feat: add logger

This commit is contained in:
hehesheng 2024-05-27 23:27:44 +08:00
parent c2858bce08
commit 884a22b434
9 changed files with 218 additions and 53 deletions

1
.gitignore vendored
View File

@ -10,3 +10,4 @@ __pycache__
log log
cacheTest cacheTest
tmp tmp
logs

View File

@ -8,6 +8,7 @@ import os
import functools import functools
import collections import collections
import traceback import traceback
import logging
from collections import OrderedDict from collections import OrderedDict
from typing import Union, Optional from typing import Union, Optional
@ -18,6 +19,7 @@ import configParse
from backend import apiutils from backend import apiutils
from backend.UserManager import UserManager from backend.UserManager import UserManager
logger = logging.getLogger(__file__.split("/")[-1])
class TgFileSystemClient(object): class TgFileSystemClient(object):
@functools.total_ordering @functools.total_ordering
@ -312,7 +314,7 @@ class TgFileSystemClient(object):
try: try:
t.cancel() t.cancel()
except Exception as err: except Exception as err:
print(f"{err=}") logger.error(f"{err=}")
async def _cache_whitelist_chat2(self): async def _cache_whitelist_chat2(self):
for chat_id in self.client_param.whitelist_chat: for chat_id in self.client_param.whitelist_chat:
@ -361,7 +363,7 @@ class TgFileSystemClient(object):
task = await self.task_queue.get() task = await self.task_queue.get()
await task[1] await task[1]
except Exception as err: except Exception as err:
print(f"{err=}") logger.error(f"{err=}")
finally: finally:
self.task_queue.task_done() self.task_queue.task_done()
@ -413,7 +415,6 @@ class TgFileSystemClient(object):
async def _download_media_chunk(self, msg: types.Message, media_holder: MediaChunkHolder) -> None: async def _download_media_chunk(self, msg: types.Message, media_holder: MediaChunkHolder) -> None:
try: try:
flag = False
offset = media_holder.start + media_holder.length offset = media_holder.start + media_holder.length
target_size = media_holder.target_len - media_holder.length target_size = media_holder.target_len - media_holder.length
remain_size = target_size remain_size = target_size
@ -426,25 +427,21 @@ class TgFileSystemClient(object):
chunk[:len(chunk)+remain_size]) chunk[:len(chunk)+remain_size])
break break
media_holder.append_chunk_mem(chunk) media_holder.append_chunk_mem(chunk)
if await media_holder.is_disconneted() and not flag:
flag = True
# print(f"cancel trigger, requester len: {len(media_holder.requester)}, {media_holder=}")
# raise asyncio.CancelledError(f"disconneted,cancel:{media_holder=}")
except asyncio.CancelledError as err: except asyncio.CancelledError as err:
# print(f"cancel holder:{media_holder}") logger.warning(f"cancel holder:{media_holder}")
self.media_chunk_manager.cancel_media_chunk(media_holder) self.media_chunk_manager.cancel_media_chunk(media_holder)
except Exception as err: except Exception as err:
print( logger.error(
f"_download_media_chunk err:{err=},{offset=},{target_size=},{media_holder},\r\n{traceback.format_exc()}") f"_download_media_chunk err:{err=},{offset=},{target_size=},{media_holder},\r\n{traceback.format_exc()}")
finally: finally:
media_holder.set_done() media_holder.set_done()
# print( logger.debug(
# f"downloaded chunk:{time.time()}.{offset=},{target_size=},{media_holder}") f"downloaded chunk:{time.time()}.{offset=},{target_size=},{media_holder}")
async def streaming_get_iter(self, msg: types.Message, start: int, end: int, req: Request): async def streaming_get_iter(self, msg: types.Message, start: int, end: int, req: Request):
try: try:
# print( logger.debug(
# f"new steaming request:{msg.chat_id=},{msg.id=},[{start}:{end}]") f"new steaming request:{msg.chat_id=},{msg.id=},[{start}:{end}]")
cur_task_id = self._get_unique_task_id() cur_task_id = self._get_unique_task_id()
pos = start pos = start
while not await req.is_disconnected() and pos <= end: while not await req.is_disconnected() and pos <= end:
@ -476,8 +473,6 @@ class TgFileSystemClient(object):
continue continue
need_len = min(cache_chunk.length - need_len = min(cache_chunk.length -
offset, end - pos + 1) offset, end - pos + 1)
# print(
# f"return missed {need_len} bytes:[{pos}:{pos+need_len}].{cache_chunk=}")
pos = pos + need_len pos = pos + need_len
yield cache_chunk.mem[offset:offset+need_len] yield cache_chunk.mem[offset:offset+need_len]
else: else:
@ -486,13 +481,11 @@ class TgFileSystemClient(object):
raise RuntimeError( raise RuntimeError(
f"lru cache missed!{pos=},{cache_chunk=}") f"lru cache missed!{pos=},{cache_chunk=}")
need_len = min(cache_chunk.length - offset, end - pos + 1) need_len = min(cache_chunk.length - offset, end - pos + 1)
# print(
# f"return hited {need_len} bytes:[{pos}:{pos+need_len}].{cache_chunk=}")
pos = pos + need_len pos = pos + need_len
yield cache_chunk.mem[offset:offset+need_len] yield cache_chunk.mem[offset:offset+need_len]
except Exception as err: except Exception as err:
traceback.print_exc() traceback.print_exc()
print(f"stream iter:{err=}") logger.error(f"stream iter:{err=}")
finally: finally:
async def _cancel_task_by_id(task_id: int): async def _cancel_task_by_id(task_id: int):
for _ in range(self.task_queue.qsize()): for _ in range(self.task_queue.qsize()):
@ -501,7 +494,7 @@ class TgFileSystemClient(object):
if task[0] != task_id: if task[0] != task_id:
self.task_queue.put_nowait(task) self.task_queue.put_nowait(task)
await self.client.loop.create_task(_cancel_task_by_id(cur_task_id)) await self.client.loop.create_task(_cancel_task_by_id(cur_task_id))
# print(f"yield quit,{msg.chat_id=},{msg.id=},[{start}:{end}]") logger.debug(f"yield quit,{msg.chat_id=},{msg.id=},[{start}:{end}]")
def __enter__(self): def __enter__(self):
raise NotImplementedError raise NotImplementedError

View File

@ -3,11 +3,13 @@ import asyncio
import time import time
import hashlib import hashlib
import os import os
import logging
from backend.TgFileSystemClient import TgFileSystemClient from backend.TgFileSystemClient import TgFileSystemClient
from backend.UserManager import UserManager from backend.UserManager import UserManager
import configParse import configParse
logger = logging.getLogger(__file__.split("/")[-1])
class TgFileSystemClientManager(object): class TgFileSystemClientManager(object):
MAX_MANAGE_CLIENTS: int = 10 MAX_MANAGE_CLIENTS: int = 10

View File

@ -1,11 +1,13 @@
import os import os
from enum import Enum, IntEnum, unique, auto from enum import Enum, IntEnum, unique, auto
import sqlite3 import sqlite3
import logging
import datetime import datetime
from pydantic import BaseModel from pydantic import BaseModel
from telethon import types from telethon import types
logger = logging.getLogger(__file__.split("/")[-1])
class UserUpdateParam(BaseModel): class UserUpdateParam(BaseModel):
client_id: str client_id: str
@ -124,7 +126,7 @@ class UserManager(object):
file_name = attr.file_name file_name = attr.file_name
msg_type = UserManager.MessageTypeEnum.FILE.value msg_type = UserManager.MessageTypeEnum.FILE.value
except Exception as err: except Exception as err:
print(f"{err=}") logger.error(f"{err=}")
insert_data = (unique_id, user_id, chat_id, msg_id, insert_data = (unique_id, user_id, chat_id, msg_id,
msg_type, msg_ctx, mime_type, file_name, msg_js, date_time) msg_type, msg_ctx, mime_type, file_name, msg_js, date_time)
execute_script = "INSERT INTO message (unique_id, user_id, chat_id, msg_id, msg_type, msg_ctx, mime_type, file_name, msg_js, date_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" execute_script = "INSERT INTO message (unique_id, user_id, chat_id, msg_id, msg_type, msg_ctx, mime_type, file_name, msg_js, date_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
@ -132,7 +134,7 @@ class UserManager(object):
self.cur.execute(execute_script, insert_data) self.cur.execute(execute_script, insert_data)
self.con.commit() self.con.commit()
except Exception as err: except Exception as err:
print(f"{err=}") logger.error(f"{err=}")
@unique @unique
class ColumnEnum(IntEnum): class ColumnEnum(IntEnum):

View File

@ -1,6 +1,7 @@
import asyncio import asyncio
import json import json
import os import os
import logging
import uvicorn import uvicorn
from fastapi import FastAPI, status, Request from fastapi import FastAPI, status, Request
@ -14,6 +15,8 @@ import configParse
from backend import apiutils from backend import apiutils
from backend.TgFileSystemClientManager import TgFileSystemClientManager from backend.TgFileSystemClientManager import TgFileSystemClientManager
logger = logging.getLogger(__file__.split("/")[-1])
clients_mgr: TgFileSystemClientManager = None clients_mgr: TgFileSystemClientManager = None
@ -63,7 +66,7 @@ async def search_tg_file_list(body: TgToFileListRequestBody):
} }
return Response(json.dumps(response_dict), status_code=status.HTTP_200_OK) return Response(json.dumps(response_dict), status_code=status.HTTP_200_OK)
except Exception as err: except Exception as err:
print(f"{err=}") logger.error(f"{err=}")
return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND) return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND)
@ -96,7 +99,7 @@ async def get_tg_file_list(body: TgToFileListRequestBody):
} }
return Response(json.dumps(response_dict), status_code=status.HTTP_200_OK) return Response(json.dumps(response_dict), status_code=status.HTTP_200_OK)
except Exception as err: except Exception as err:
print(f"{err=}") logger.error(f"{err=}")
return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND) return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND)
@ -145,7 +148,7 @@ async def get_tg_file_media_stream(token: str, cid: int, mid: int, request: Requ
status_code=status_code, status_code=status_code,
) )
except Exception as err: except Exception as err:
print(f"{err=}") logger.error(f"{err=}")
return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND) return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND)
@ -195,7 +198,7 @@ async def get_tg_client_chat_list(body: TgToChatListRequestBody, request: Reques
} }
return Response(json.dumps(response_dict), status_code=status.HTTP_200_OK) return Response(json.dumps(response_dict), status_code=status.HTTP_200_OK)
except Exception as err: except Exception as err:
print(f"{err=}") logger.error(f"{err=}")
return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND) return Response(json.dumps({"detail": f"{err=}"}), status_code=status.HTTP_404_NOT_FOUND)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,4 +1,5 @@
import time import time
import logging
from fastapi import status, HTTPException from fastapi import status, HTTPException
from telethon import types from telethon import types
@ -6,6 +7,7 @@ from functools import wraps
import configParse import configParse
logger = logging.getLogger(__file__.split("/")[-1])
def get_range_header(range_header: str, file_size: int) -> tuple[int, int]: def get_range_header(range_header: str, file_size: int) -> tuple[int, int]:
def _invalid_range(): def _invalid_range():
@ -37,13 +39,13 @@ def get_message_media_name(msg: types.Message) -> str:
def timeit_sec(func):
    """Wrap ``func`` so every call is traced at DEBUG level with its duration.

    Two records are sent to the module ``logger``: one before the call and
    one after it returns, the latter including the elapsed wall-clock time
    measured with ``time.perf_counter``. If ``func`` raises, the exception
    propagates unchanged and no exit record is logged.

    Args:
        func: The synchronous callable to wrap.

    Returns:
        A wrapper with the same metadata as ``func`` (via ``functools.wraps``)
        that returns whatever ``func`` returns.
    """
    @wraps(func)
    def timeit_wrapper(*args, **kwargs):
        # Pass values as logger arguments instead of pre-formatting with an
        # f-string: the text of args/kwargs is then only built when a handler
        # actually emits the record.
        logger.debug('Function called %s%s %s', func.__name__, args, kwargs)
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        total_time = time.perf_counter() - start_time
        logger.debug('Function quited %s%s %s Took %.4f seconds',
                     func.__name__, args, kwargs, total_time)
        return result
    return timeit_wrapper
@ -52,13 +54,13 @@ def timeit(func):
if configParse.get_TgToFileSystemParameter().base.timeit_enable: if configParse.get_TgToFileSystemParameter().base.timeit_enable:
@wraps(func) @wraps(func)
def timeit_wrapper(*args, **kwargs): def timeit_wrapper(*args, **kwargs):
print( logger.debug(
f'Function called {func.__name__}{args} {kwargs}') f'Function called {func.__name__}{args} {kwargs}')
start_time = time.perf_counter() start_time = time.perf_counter()
result = func(*args, **kwargs) result = func(*args, **kwargs)
end_time = time.perf_counter() end_time = time.perf_counter()
total_time = end_time - start_time total_time = end_time - start_time
print( logger.debug(
f'Function quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds') f'Function quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds')
return result return result
return timeit_wrapper return timeit_wrapper
@ -69,13 +71,13 @@ def atimeit(func):
if configParse.get_TgToFileSystemParameter().base.timeit_enable: if configParse.get_TgToFileSystemParameter().base.timeit_enable:
@wraps(func) @wraps(func)
async def timeit_wrapper(*args, **kwargs): async def timeit_wrapper(*args, **kwargs):
print( logger.debug(
f'AFunction called {func.__name__}{args} {kwargs}') f'AFunction called {func.__name__}{args} {kwargs}')
start_time = time.perf_counter() start_time = time.perf_counter()
result = await func(*args, **kwargs) result = await func(*args, **kwargs)
end_time = time.perf_counter() end_time = time.perf_counter()
total_time = end_time - start_time total_time = end_time - start_time
print( logger.debug(
f'AFunction quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds') f'AFunction quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds')
return result return result
return timeit_wrapper return timeit_wrapper

24
logging_config.yaml Normal file
View File

@ -0,0 +1,24 @@
# Logging configuration in the dictionary schema consumed by
# logging.config.dictConfig.
version: 1
# Loggers created before this configuration is applied stay enabled.
disable_existing_loggers: false
formatters:
  standard:
    format: '%(asctime)s [%(name)s][%(levelname)s]:%(message)s'
handlers:
  console:
    level: INFO
    class: logging.StreamHandler
    formatter: standard
  timed_rotating_file:
    class: logging.handlers.TimedRotatingFileHandler
    # Relative path: it is resolved against the process working directory
    # unless the loader rewrites it, and the directory has to exist before
    # the handler is constructed.
    filename: logs/app.log
    # Roll over once a day at midnight and keep the seven newest backups.
    when: midnight
    interval: 1
    backupCount: 7
    level: INFO
    formatter: standard
    encoding: utf-8
# The root logger is configured through the dedicated "root" key; "propagate"
# is omitted because it does not apply to the root logger.
root:
  handlers: [console, timed_rotating_file]
  # NOTE(review): the root logger accepts DEBUG, but both handlers above
  # filter at INFO, so DEBUG records are currently dropped. Lower a handler
  # level if debug output is wanted.
  level: DEBUG

View File

@ -1,12 +1,24 @@
import asyncio import asyncio
import os import os
import sys import sys
import yaml
import logging
import uvicorn import uvicorn
import configParse import configParse
from backend import backendapp from backend import backendapp
# `import logging` alone does not guarantee that these submodules are loaded;
# import them explicitly instead of relying on another package doing it.
import logging.config
import logging.handlers


def _configure_logging(config_name: str = 'logging_config.yaml') -> None:
    """Apply the YAML logging configuration stored next to this script.

    The config file, the ``logs`` directory and any relative handler
    ``filename`` are all resolved against the directory that contains this
    file, so start-up no longer depends on the current working directory.

    Args:
        config_name: File name of the dictConfig-style YAML document,
            relative to this script's directory.

    Raises:
        OSError: If the config file cannot be read or a log directory cannot
            be created.
        ValueError: If ``logging.config.dictConfig`` rejects the configuration.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(base_dir, config_name), 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(os.path.join(base_dir, 'logs'), exist_ok=True)

    # Anchor relative log file paths at the script directory and make sure
    # their parent directories exist before the handlers are constructed.
    for handler_cfg in (config.get('handlers') or {}).values():
        filename = handler_cfg.get('filename')
        if not filename:
            continue
        if not os.path.isabs(filename):
            filename = os.path.join(base_dir, filename)
            handler_cfg['filename'] = filename
        os.makedirs(os.path.dirname(filename), exist_ok=True)

    logging.config.dictConfig(config)

    # Name rotated files with a plain calendar date.
    for handler in logging.getLogger().handlers:
        if isinstance(handler, logging.handlers.TimedRotatingFileHandler):
            handler.suffix = "%Y-%m-%d"


_configure_logging()

# basename() yields the same name as splitting on "/" on POSIX and also
# handles other path separators.
logger = logging.getLogger(os.path.basename(__file__))
if __name__ == "__main__": if __name__ == "__main__":
param = configParse.get_TgToFileSystemParameter() param = configParse.get_TgToFileSystemParameter()
async def run_web_server(): async def run_web_server():
@ -14,11 +26,11 @@ if __name__ == "__main__":
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE) stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate() stdout, stderr = await proc.communicate()
print(f'[{cmd!r} exited with {proc.returncode}]') logger.info(f'[{cmd!r} exited with {proc.returncode}]')
if stdout: if stdout:
print(f'[stdout]\n{stdout.decode()}') logger.info(f'[stdout]\n{stdout.decode()}')
if stderr: if stderr:
print(f'[stderr]\n{stderr.decode()}') logger.info(f'[stderr]\n{stderr.decode()}')
if param.web.enable: if param.web.enable:
ret = os.fork() ret = os.fork()
if ret == 0: if ret == 0:

164
test.py
View File

@ -1,14 +1,136 @@
import logging.handlers
import time import time
import asyncio import asyncio
import json import json
import rsa
import pickle
import base64
from datetime import datetime
import logging
import os import os
import yaml
from telethon import TelegramClient, utils, types from telethon import TelegramClient, utils, types
import diskcache
from backend.UserManager import UserManager from backend.UserManager import UserManager
from backend import apiutils
import configParse import configParse
# `import logging` / `import logging.handlers` do not guarantee that the
# `config` submodule is loaded; import it explicitly.
import logging.config


def _configure_logging(config_path: str = 'logging_config.yaml') -> None:
    """Load the YAML logging configuration and apply it with dictConfig.

    Args:
        config_path: Path of the dictConfig-style YAML document; a relative
            path is resolved against the current working directory.

    Raises:
        OSError: If the config file cannot be read or a log directory cannot
            be created.
        ValueError: If ``logging.config.dictConfig`` rejects the configuration.
    """
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    # File-based handlers open their target when they are constructed (unless
    # configured with delay), so the directory must exist first.
    for handler_cfg in (config.get('handlers') or {}).values():
        log_dir = os.path.dirname(handler_cfg.get('filename') or '')
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

    logging.config.dictConfig(config)

    # Name rotated files with a plain calendar date.
    for handler in logging.getLogger().handlers:
        if isinstance(handler, logging.handlers.TimedRotatingFileHandler):
            handler.suffix = "%Y-%m-%d"


_configure_logging()
logger = logging.getLogger(os.path.basename(__file__))

# Emit one record per level to check which ones reach the handlers.
logger.debug('This is a debug message')
logger.info('This is an info message')
logger.warning('This is a warning message')
logger.error('This is an error message')
logger.critical('This is a critical message')

# Stop here: everything below this point is unreachable while this early exit
# is in place. SystemExit is raised directly because the `exit` helper is
# provided by the `site` module and is not guaranteed to exist.
raise SystemExit(0)
# class TestClass(object):
# int_value: int = 1
# float_value: float = 2.0
# bool_value: bool = True
# bytes_value: bytes = b'Man! What can i say!'
# src_obj = TestClass()
# with open('tmp', 'wb') as f:
# src_obj.int_value = 10000000000000
# import random
# src_obj.bytes_value = random.randbytes(5*1024*1024)
# pickle.dump(src_obj, f)
# test_bytes = random.randbytes(5*1024*1024)
# with open('tmp', 'rb') as f:
# test_bytes = f.read()
# @apiutils.timeit_sec
# def pickle_loads_test(loop) -> TestClass:
# obj_cls: TestClass|None = None
# for _ in range(loop):
# obj_cls = pickle.loads(obj_bytes)
# return obj_cls
# @apiutils.timeit_sec
# def pickle_dumps_test(loop) -> bytes:
# obj_bytes: bytes|None = None
# for _ in range(loop):
# obj_bytes = pickle.dumps(obj_cls)
# return obj_bytes
# for i in range(10):
# print(f"loop:{i}")
# test_obj = pickle_loads_test(test_bytes, 1000)
# pickle_dumps_test(test_obj, 1000)
# exit(0)
# cache = diskcache.Cache("./cacheTest", size_limit=2**30, eviction_policy='least-recently-used')
# random_key = random.randbytes(1000)
# @apiutils.timeit_sec
# def test_db_write_cache():
# for i in range(1000):
# cache.add(int(random_key[i]), test_bytes, expire=300)
# @apiutils.timeit_sec
# def test_db_read_cache():
# for i in range(1000):
# exist = cache.touch(int(random_key[i]), expire=300)
# if exist:
# cache.get(int(random_key[i]))
# test_db_write_cache()
# test_db_read_cache()
# exit(0)
# db = UserManager()
# search_cur = db.con.cursor()
# update_cur = db.con.cursor()
# res = search_cur.execute("SELECT * FROM message")
# cnt = 0
# for row in res:
# (unique_id, date_time, msg_js) = (row[0], row[-1], row[-2])
# msg_dic = json.loads(msg_js)
# date_time_str = msg_dic['date']
# if date_time is not None or date_time_str is None:
# continue
# date = datetime.fromisoformat(date_time_str)
# ts = int(date.timestamp() * 1_000) * 1_000_000
# try:
# update_cur.execute(f"UPDATE message SET date_time = {ts} WHERE unique_id == '{unique_id}'")
# except Exception as err:
# print(f"{err=}")
# if cnt % 1000 == 0:
# db.con.commit()
# print(cnt)
# cnt += 1
# db.con.commit()
# print(cnt)
# exit(0)
# pubkey, prikey = rsa.newkeys(1024)
# print(pubkey)
# print(prikey)
# print()
# enc_bytes = rsa.encrypt("token=anonnnnnnn1435145nnnnnnn;cid=-1001216816802;mid=95056;t=2000000000000".encode('utf-8'), pubkey)
# print(enc_bytes)
# print(len(enc_bytes))
# b64enc_str = base64.b64encode(enc_bytes)
# print(b64enc_str.decode('utf-8'))
# print(len(b64enc_str))
# dec_bytes = base64.b64decode(b64enc_str)
# # print(dec_bytes)s
# origin_str = rsa.decrypt(dec_bytes, prikey)
# print(origin_str)
# print(len(origin_str.decode('utf-8')))
# exit(0)
param = configParse.get_TgToFileSystemParameter() param = configParse.get_TgToFileSystemParameter()
# Remember to use your own values from my.telegram.org! # Remember to use your own values from my.telegram.org!
api_id = param.tgApi.api_id api_id = param.tgApi.api_id
@ -43,27 +165,31 @@ async def main(client: TelegramClient):
print(username) print(username)
print(me.phone) print(me.phone)
msg = await client.get_messages(1216816802, ids=[99334])
# client.download_media(msg, )
# print(path)
# client.get_entity # client.get_entity
i = 0 # i = 0
async for msg in client.iter_messages('pitaogo'): # async for msg in client.iter_messages('pitaogo'):
print(f'{msg.id=} ,{msg.message=}, {msg.media=}') # print(f'{msg.id=} ,{msg.message=}, {msg.media=}')
i += 1 # i += 1
if i >= 10: # if i >= 10:
break # break
# You can print all the dialogs/conversations that you are part of: # You can print all the dialogs/conversations that you are part of:
peer_type_list = [] # peer_type_list = []
async for dialog in client.iter_dialogs(): # async for dialog in client.iter_dialogs():
real_id, peer_type = utils.resolve_id(dialog.id) # real_id, peer_type = utils.resolve_id(dialog.id)
if peer_type in peer_type_list: # if peer_type in peer_type_list:
continue # continue
peer_type_list.append(peer_type) # peer_type_list.append(peer_type)
print(f'{dialog.name} has ID {dialog.id} real_id {real_id} type {peer_type}') # print(f'{dialog.name} has ID {dialog.id} real_id {real_id} type {peer_type}')
i = 0 # i = 0
async for msg in client.iter_messages(real_id): # async for msg in client.iter_messages(real_id):
print(f'{msg.id=}, {msg.message=}, {msg.media=}') # print(f'{msg.id=}, {msg.message=}, {msg.media=}')
i += 1 # i += 1
if i >= 10: # if i >= 10:
break # break
# test_res = await client.get_input_entity(dialog.id) # test_res = await client.get_input_entity(dialog.id)
# print(test_res) # print(test_res)
# await client.send_message(-1001150067822, "test message from python") # await client.send_message(-1001150067822, "test message from python")