diff --git a/.gitignore b/.gitignore index 9b36762..5b89af8 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ __pycache__ log cacheTest tmp +logs diff --git a/backend/TgFileSystemClient.py b/backend/TgFileSystemClient.py index fa3523d..de145b4 100644 --- a/backend/TgFileSystemClient.py +++ b/backend/TgFileSystemClient.py @@ -8,6 +8,7 @@ import os import functools import collections import traceback +import logging from collections import OrderedDict from typing import Union, Optional @@ -18,6 +19,7 @@ import configParse from backend import apiutils from backend.UserManager import UserManager +logger = logging.getLogger(__file__.split("/")[-1]) class TgFileSystemClient(object): @functools.total_ordering @@ -312,7 +314,7 @@ class TgFileSystemClient(object): try: t.cancel() except Exception as err: - print(f"{err=}") + logger.error(f"{err=}") async def _cache_whitelist_chat2(self): for chat_id in self.client_param.whitelist_chat: @@ -361,7 +363,7 @@ class TgFileSystemClient(object): task = await self.task_queue.get() await task[1] except Exception as err: - print(f"{err=}") + logger.error(f"{err=}") finally: self.task_queue.task_done() @@ -413,7 +415,6 @@ class TgFileSystemClient(object): async def _download_media_chunk(self, msg: types.Message, media_holder: MediaChunkHolder) -> None: try: - flag = False offset = media_holder.start + media_holder.length target_size = media_holder.target_len - media_holder.length remain_size = target_size @@ -426,25 +427,21 @@ class TgFileSystemClient(object): chunk[:len(chunk)+remain_size]) break media_holder.append_chunk_mem(chunk) - if await media_holder.is_disconneted() and not flag: - flag = True - # print(f"cancel trigger, requester len: {len(media_holder.requester)}, {media_holder=}") - # raise asyncio.CancelledError(f"disconneted,cancel:{media_holder=}") except asyncio.CancelledError as err: - # print(f"cancel holder:{media_holder}") + logger.warning(f"cancel holder:{media_holder}") self.media_chunk_manager.cancel_media_chunk(media_holder) except Exception as err: - print( + logger.error( f"_download_media_chunk err:{err=},{offset=},{target_size=},{media_holder},\r\n{traceback.format_exc()}") finally: media_holder.set_done() - # print( - # f"downloaded chunk:{time.time()}.{offset=},{target_size=},{media_holder}") + logger.debug( + f"downloaded chunk:{time.time()}.{offset=},{target_size=},{media_holder}") async def streaming_get_iter(self, msg: types.Message, start: int, end: int, req: Request): try: - # print( - # f"new steaming request:{msg.chat_id=},{msg.id=},[{start}:{end}]") + logger.debug( + f"new steaming request:{msg.chat_id=},{msg.id=},[{start}:{end}]") cur_task_id = self._get_unique_task_id() pos = start while not await req.is_disconnected() and pos <= end: @@ -476,8 +473,6 @@ class TgFileSystemClient(object): continue need_len = min(cache_chunk.length - offset, end - pos + 1) - # print( - # f"return missed {need_len} bytes:[{pos}:{pos+need_len}].{cache_chunk=}") pos = pos + need_len yield cache_chunk.mem[offset:offset+need_len] else: @@ -486,13 +481,11 @@ class TgFileSystemClient(object): raise RuntimeError( f"lru cache missed!{pos=},{cache_chunk=}") need_len = min(cache_chunk.length - offset, end - pos + 1) - # print( - # f"return hited {need_len} bytes:[{pos}:{pos+need_len}].{cache_chunk=}") pos = pos + need_len yield cache_chunk.mem[offset:offset+need_len] except Exception as err: traceback.print_exc() - print(f"stream iter:{err=}") + logger.error(f"stream iter:{err=}") finally: async def _cancel_task_by_id(task_id: int): for _ in range(self.task_queue.qsize()): @@ -501,7 +494,7 @@ class TgFileSystemClient(object): if task[0] != task_id: self.task_queue.put_nowait(task) await self.client.loop.create_task(_cancel_task_by_id(cur_task_id)) - # print(f"yield quit,{msg.chat_id=},{msg.id=},[{start}:{end}]") + logger.debug(f"yield quit,{msg.chat_id=},{msg.id=},[{start}:{end}]") def __enter__(self): raise NotImplementedError diff --git a/backend/TgFileSystemClientManager.py b/backend/TgFileSystemClientManager.py index 1b45a09..0bbf08f 100644 --- a/backend/TgFileSystemClientManager.py +++ b/backend/TgFileSystemClientManager.py @@ -3,11 +3,13 @@ import asyncio import time import hashlib import os +import logging from backend.TgFileSystemClient import TgFileSystemClient from backend.UserManager import UserManager import configParse +logger = logging.getLogger(__file__.split("/")[-1]) class TgFileSystemClientManager(object): MAX_MANAGE_CLIENTS: int = 10 diff --git a/backend/UserManager.py b/backend/UserManager.py index a28f0f1..6c51d4a 100644 --- a/backend/UserManager.py +++ b/backend/UserManager.py @@ -1,11 +1,13 @@ import os from enum import Enum, IntEnum, unique, auto import sqlite3 +import logging import datetime from pydantic import BaseModel from telethon import types +logger = logging.getLogger(__file__.split("/")[-1]) class UserUpdateParam(BaseModel): client_id: str @@ -124,7 +126,7 @@ class UserManager(object): file_name = attr.file_name msg_type = UserManager.MessageTypeEnum.FILE.value except Exception as err: - print(f"{err=}") + logger.error(f"{err=}") insert_data = (unique_id, user_id, chat_id, msg_id, msg_type, msg_ctx, mime_type, file_name, msg_js, date_time) execute_script = "INSERT INTO message (unique_id, user_id, chat_id, msg_id, msg_type, msg_ctx, mime_type, file_name, msg_js, date_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" @@ -132,7 +134,7 @@ class UserManager(object): self.cur.execute(execute_script, insert_data) self.con.commit() except Exception as err: - print(f"{err=}") + logger.error(f"{err=}") @unique class ColumnEnum(IntEnum): diff --git a/backend/api.py b/backend/api.py index a9beb32..9a6af52 100644 --- a/backend/api.py +++ b/backend/api.py @@ -1,6 +1,7 @@ import asyncio import json import os +import logging import uvicorn from fastapi import FastAPI, status, Request @@ -14,6 +15,8 @@ import configParse from backend import apiutils from backend.TgFileSystemClientManager import TgFileSystemClientManager +logger = logging.getLogger(__file__.split("/")[-1]) + clients_mgr: TgFileSystemClientManager = None @@ -63,7 +66,7 @@ async def search_tg_file_list(body: TgToFileListRequestBody): } return Response(json.dumps(response_dict), status_code=status.HTTP_200_OK) except Exception as err: - print(f"{err=}") + logger.error(f"{err=}") return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND) @@ -96,7 +99,7 @@ async def get_tg_file_list(body: TgToFileListRequestBody): } return Response(json.dumps(response_dict), status_code=status.HTTP_200_OK) except Exception as err: - print(f"{err=}") + logger.error(f"{err=}") return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND) @@ -145,7 +148,7 @@ async def get_tg_file_media_stream(token: str, cid: int, mid: int, request: Requ status_code=status_code, ) except Exception as err: - print(f"{err=}") + logger.error(f"{err=}") return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND) @@ -195,7 +198,7 @@ async def get_tg_client_chat_list(body: TgToChatListRequestBody, request: Reques } return Response(json.dumps(response_dict), status_code=status.HTTP_200_OK) except Exception as err: - print(f"{err=}") + logger.error(f"{err=}") return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND) if __name__ == "__main__": diff --git a/backend/apiutils.py b/backend/apiutils.py index bf4b9b3..3ad2207 100644 --- a/backend/apiutils.py +++ b/backend/apiutils.py @@ -1,4 +1,5 @@ import time +import logging from fastapi import status, HTTPException from telethon import types @@ -6,6 +7,7 @@ from functools import wraps import configParse +logger = logging.getLogger(__file__.split("/")[-1]) def get_range_header(range_header: str, file_size: int) -> tuple[int, int]: def _invalid_range(): @@ -37,13 +39,13 @@ def get_message_media_name(msg: types.Message) -> str: def timeit_sec(func): @wraps(func) def timeit_wrapper(*args, **kwargs): - print( + logger.debug( f'Function called {func.__name__}{args} {kwargs}') start_time = time.perf_counter() result = func(*args, **kwargs) end_time = time.perf_counter() total_time = end_time - start_time - print( + logger.debug( f'Function quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds') return result return timeit_wrapper @@ -52,13 +54,13 @@ def timeit(func): if configParse.get_TgToFileSystemParameter().base.timeit_enable: @wraps(func) def timeit_wrapper(*args, **kwargs): - print( + logger.debug( f'Function called {func.__name__}{args} {kwargs}') start_time = time.perf_counter() result = func(*args, **kwargs) end_time = time.perf_counter() total_time = end_time - start_time - print( + logger.debug( f'Function quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds') return result return timeit_wrapper @@ -69,13 +71,13 @@ def atimeit(func): if configParse.get_TgToFileSystemParameter().base.timeit_enable: @wraps(func) async def timeit_wrapper(*args, **kwargs): - print( + logger.debug( f'AFunction called {func.__name__}{args} {kwargs}') start_time = time.perf_counter() result = await func(*args, **kwargs) end_time = time.perf_counter() total_time = end_time - start_time - print( + logger.debug( f'AFunction quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds') return result return timeit_wrapper diff --git a/logging_config.yaml b/logging_config.yaml new file mode 100644 index 0000000..ddda5a9 --- /dev/null +++ b/logging_config.yaml @@ -0,0 +1,24 @@ +version: 1 +disable_existing_loggers: false +formatters: + standard: + format: '%(asctime)s [%(name)s][%(levelname)s]:%(message)s' +handlers: + console: + level: INFO + class: logging.StreamHandler + formatter: standard + timed_rotating_file: + class: logging.handlers.TimedRotatingFileHandler + filename: logs/app.log + when: midnight + interval: 1 + backupCount: 7 + level: INFO + formatter: standard + encoding: utf-8 +loggers: + '': + handlers: [console, timed_rotating_file] + level: DEBUG + propagate: true diff --git a/start.py b/start.py index edc0af1..0e4a1fc 100644 --- a/start.py +++ b/start.py @@ -1,12 +1,24 @@ import asyncio import os import sys +import yaml +import logging import uvicorn import configParse from backend import backendapp +if not os.path.exists(os.path.dirname(__file__) + '/logs'): + os.mkdir(os.path.dirname(__file__) + '/logs') +with open('logging_config.yaml', 'r') as f: + logging.config.dictConfig(yaml.safe_load(f.read())) +for handler in logging.getLogger().handlers: + if isinstance(handler, logging.handlers.TimedRotatingFileHandler): + handler.suffix = "%Y-%m-%d" + +logger = logging.getLogger(__file__.split("/")[-1]) + if __name__ == "__main__": param = configParse.get_TgToFileSystemParameter() async def run_web_server(): @@ -14,11 +26,11 @@ if __name__ == "__main__": proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await proc.communicate() - print(f'[{cmd!r} exited with {proc.returncode}]') + logger.info(f'[{cmd!r} exited with {proc.returncode}]') if stdout: - print(f'[stdout]\n{stdout.decode()}') + logger.info(f'[stdout]\n{stdout.decode()}') if stderr: - print(f'[stderr]\n{stderr.decode()}') + logger.info(f'[stderr]\n{stderr.decode()}') if param.web.enable: ret = os.fork() if ret == 0: diff --git a/test.py b/test.py index b7af657..ce0887c 100644 --- a/test.py +++ b/test.py @@ -1,14 +1,136 @@ +import logging.handlers import time import asyncio import json +import rsa +import pickle +import base64 +from datetime import datetime +import logging import os +import yaml from telethon import TelegramClient, utils, types +import diskcache from backend.UserManager import UserManager +from backend import apiutils import configParse +with open('logging_config.yaml', 'r') as f: + logging.config.dictConfig(yaml.safe_load(f.read())) +for handler in logging.getLogger().handlers: + if isinstance(handler, logging.handlers.TimedRotatingFileHandler): + handler.suffix = "%Y-%m-%d" +logger = logging.getLogger(__file__.split("/")[-1]) + +logger.debug('This is a debug message') +logger.info('This is an info message') +logger.warning('This is a warning message') +logger.error('This is an error message') +logger.critical('This is a critical message') + +exit(0) + +# class TestClass(object): +# int_value: int = 1 +# float_value: float = 2.0 +# bool_value: bool = True +# bytes_value: bytes = b'Man! What can i say!' + +# src_obj = TestClass() +# with open('tmp', 'wb') as f: +# src_obj.int_value = 10000000000000 +# import random +# src_obj.bytes_value = random.randbytes(5*1024*1024) +# pickle.dump(src_obj, f) +# test_bytes = random.randbytes(5*1024*1024) + + +# with open('tmp', 'rb') as f: +# test_bytes = f.read() + +# @apiutils.timeit_sec +# def pickle_loads_test(loop) -> TestClass: +# obj_cls: TestClass|None = None +# for _ in range(loop): +# obj_cls = pickle.loads(obj_bytes) +# return obj_cls + +# @apiutils.timeit_sec +# def pickle_dumps_test(loop) -> bytes: +# obj_bytes: bytes|None = None +# for _ in range(loop): +# obj_bytes = pickle.dumps(obj_cls) +# return obj_bytes + +# for i in range(10): +# print(f"loop:{i}") +# test_obj = pickle_loads_test(test_bytes, 1000) +# pickle_dumps_test(test_obj, 1000) + +# exit(0) + +# cache = diskcache.Cache("./cacheTest", size_limit=2**30, eviction_policy='least-recently-used') +# random_key = random.randbytes(1000) +# @apiutils.timeit_sec +# def test_db_write_cache(): +# for i in range(1000): +# cache.add(int(random_key[i]), test_bytes, expire=300) +# @apiutils.timeit_sec +# def test_db_read_cache(): +# for i in range(1000): +# exist = cache.touch(int(random_key[i]), expire=300) +# if exist: +# cache.get(int(random_key[i])) +# test_db_write_cache() +# test_db_read_cache() + +# exit(0) + +# db = UserManager() +# search_cur = db.con.cursor() +# update_cur = db.con.cursor() +# res = search_cur.execute("SELECT * FROM message") +# cnt = 0 +# for row in res: +# (unique_id, date_time, msg_js) = (row[0], row[-1], row[-2]) +# msg_dic = json.loads(msg_js) +# date_time_str = msg_dic['date'] +# if date_time is not None or date_time_str is None: +# continue +# date = datetime.fromisoformat(date_time_str) +# ts = int(date.timestamp() * 1_000) * 1_000_000 +# try: +# update_cur.execute(f"UPDATE message SET date_time = {ts} WHERE unique_id == '{unique_id}'") +# except Exception as err: +# print(f"{err=}") +# if cnt % 1000 == 0: +# db.con.commit() +# print(cnt) +# cnt += 1 +# db.con.commit() +# print(cnt) +# exit(0) + +# pubkey, prikey = rsa.newkeys(1024) +# print(pubkey) +# print(prikey) +# print() +# enc_bytes = rsa.encrypt("token=anonnnnnnn1435145nnnnnnn;cid=-1001216816802;mid=95056;t=2000000000000".encode('utf-8'), pubkey) +# print(enc_bytes) +# print(len(enc_bytes)) +# b64enc_str = base64.b64encode(enc_bytes) +# print(b64enc_str.decode('utf-8')) +# print(len(b64enc_str)) +# dec_bytes = base64.b64decode(b64enc_str) +# # print(dec_bytes)s +# origin_str = rsa.decrypt(dec_bytes, prikey) +# print(origin_str) +# print(len(origin_str.decode('utf-8'))) +# exit(0) + param = configParse.get_TgToFileSystemParameter() # Remember to use your own values from my.telegram.org! api_id = param.tgApi.api_id @@ -42,28 +164,32 @@ async def main(client: TelegramClient): username = me.username print(username) print(me.phone) + + msg = await client.get_messages(1216816802, ids=[99334]) + # client.download_media(msg, ) + # print(path) # client.get_entity - i = 0 - async for msg in client.iter_messages('pitaogo'): - print(f'{msg.id=} ,{msg.message=}, {msg.media=}') - i += 1 - if i >= 10: - break + # i = 0 + # async for msg in client.iter_messages('pitaogo'): + # print(f'{msg.id=} ,{msg.message=}, {msg.media=}') + # i += 1 + # if i >= 10: + # break # You can print all the dialogs/conversations that you are part of: - peer_type_list = [] - async for dialog in client.iter_dialogs(): - real_id, peer_type = utils.resolve_id(dialog.id) - if peer_type in peer_type_list: - continue - peer_type_list.append(peer_type) - print(f'{dialog.name} has ID {dialog.id} real_id {real_id} type {peer_type}') - i = 0 - async for msg in client.iter_messages(real_id): - print(f'{msg.id=}, {msg.message=}, {msg.media=}') - i += 1 - if i >= 10: - break + # peer_type_list = [] + # async for dialog in client.iter_dialogs(): + # real_id, peer_type = utils.resolve_id(dialog.id) + # if peer_type in peer_type_list: + # continue + # peer_type_list.append(peer_type) + # print(f'{dialog.name} has ID {dialog.id} real_id {real_id} type {peer_type}') + # i = 0 + # async for msg in client.iter_messages(real_id): + # print(f'{msg.id=}, {msg.message=}, {msg.media=}') + # i += 1 + # if i >= 10: + # break # test_res = await client.get_input_entity(dialog.id) # print(test_res) # await client.send_message(-1001150067822, "test message from python")