diff --git a/backend/MediaCacheManager.py b/backend/MediaCacheManager.py index 9fe4c75..92b9665 100644 --- a/backend/MediaCacheManager.py +++ b/backend/MediaCacheManager.py @@ -7,7 +7,7 @@ import asyncio import traceback import hashlib import collections -from typing import Union, Optional, Callable +from typing import Union, Optional import diskcache from fastapi import Request @@ -45,19 +45,17 @@ class MediaChunkHolder(object): requesters: list[Request] = [] unique_id: str = "" info: ChunkInfo - callback: Callable = None @staticmethod def generate_id(chat_id: int, msg_id: int, start: int) -> str: return f"{chat_id}:{msg_id}:{start}" - def __init__(self, chat_id: int, msg_id: int, start: int, target_len: int, callback: Callable = None) -> None: + def __init__(self, chat_id: int, msg_id: int, start: int, target_len: int) -> None: self.unique_id = MediaChunkHolder.generate_id(chat_id, msg_id, start) self.info = ChunkInfo(hashlib.md5(self.unique_id.encode()).hexdigest(), chat_id, msg_id, start, target_len) self.mem = bytes() self.length = len(self.mem) self.waiters = collections.deque() - self.callback = callback def __repr__(self) -> str: return f"MediaChunk,{self.info},unique_id:{self.unique_id}" @@ -136,13 +134,6 @@ class MediaChunkHolder(object): except ValueError: pass - def set_done(self) -> None: - if self.callback is None: - return - callback = self.callback - self.callback = None - callback(self) - def try_clear_waiter_and_requester(self) -> bool: if not self.is_completed(): return False @@ -236,15 +227,7 @@ class MediaChunkHolderManager(object): logger.warning(f"remove chunk,{err=},{traceback.format_exc()}") def create_media_chunk_holder(self, chat_id: int, msg_id: int, start: int, target_len: int) -> MediaChunkHolder: - def holder_completed_callback(holder: MediaChunkHolder): - cache_holder = self.incompleted_chunk.pop(holder.chunk_id, None) - if cache_holder is None: - logger.warning(f"the holder not in mem, {holder}") - return - logger.info(f"cache new chunk:{holder}") - self.disk_chunk_cache.set(holder.chunk_id, holder) - - return MediaChunkHolder(chat_id, msg_id, start, target_len, callback=holder_completed_callback) + return MediaChunkHolder(chat_id, msg_id, start, target_len) def get_media_chunk(self, msg: types.Message, start: int, lru: bool = True) -> Optional[MediaChunkHolder]: res = self._get_media_chunk_cache(msg, start) @@ -276,3 +259,14 @@ class MediaChunkHolderManager(object): if dummy is None: return self._remove_pop_chunk(dummy) + + def move_media_chunk_to_disk(self, holder: MediaChunkHolder) -> bool: + cache_holder = self.incompleted_chunk.pop(holder.chunk_id, None) + if cache_holder is None: + logger.warning(f"the holder not in mem, {holder}") + return False + if not holder.is_completed(): + logger.error(f"chunk not completed, but move to disk:{holder=}") + logger.info(f"cache new chunk:{holder}") + self.disk_chunk_cache.set(holder.chunk_id, holder) + return True diff --git a/backend/TgFileSystemClient.py b/backend/TgFileSystemClient.py index 30116b1..22645d4 100644 --- a/backend/TgFileSystemClient.py +++ b/backend/TgFileSystemClient.py @@ -335,7 +335,8 @@ class TgFileSystemClient(object): else: if not media_holder.try_clear_waiter_and_requester(): logger.error("I think never run here.") - media_holder.set_done() + if not self.media_chunk_manager.move_media_chunk_to_disk(media_holder): + logger.warning(f"move to disk failed, {media_holder=}") logger.debug(f"downloaded chunk:{offset=},{target_size=},{media_holder}") finally: pass diff --git a/backend/TgFileSystemClientManager.py b/backend/TgFileSystemClientManager.py index b98f901..0d44507 100644 --- a/backend/TgFileSystemClientManager.py +++ b/backend/TgFileSystemClientManager.py @@ -19,6 +19,12 @@ class TgFileSystemClientManager(object): param: configParse.TgToFileSystemParameter clients: dict[str, TgFileSystemClient] = {} + @classmethod + def get_instance(cls): + if not hasattr(TgFileSystemClientManager, "_instance"): + TgFileSystemClientManager._instance = TgFileSystemClientManager(configParse.get_TgToFileSystemParameter()) + return TgFileSystemClientManager._instance + def __init__(self, param: configParse.TgToFileSystemParameter) -> None: self.param = param self.db = UserManager() diff --git a/backend/api.py b/backend/api.py index 2df432e..1d55d75 100644 --- a/backend/api.py +++ b/backend/api.py @@ -15,23 +15,20 @@ from pydantic import BaseModel import configParse from backend import apiutils +from backend import api_implement as api from backend.TgFileSystemClientManager import TgFileSystemClientManager logger = logging.getLogger(__file__.split("/")[-1]) -clients_mgr: TgFileSystemClientManager = None - @asynccontextmanager async def lifespan(app: FastAPI): - for handler in logging.getLogger().handlers: - if isinstance(handler, logging.handlers.TimedRotatingFileHandler): - handler.suffix = "%Y-%m-%d" - global clients_mgr - param = configParse.get_TgToFileSystemParameter() - clients_mgr = TgFileSystemClientManager(param) + clients_mgr = TgFileSystemClientManager.get_instance() + res = await clients_mgr.get_status() + logger.info(f"init clients manager:{res}") yield + app = FastAPI(lifespan=lifespan) app.add_middleware( @@ -57,6 +54,7 @@ class TgToFileListRequestBody(BaseModel): async def search_tg_file_list(body: TgToFileListRequestBody): try: param = configParse.get_TgToFileSystemParameter() + clients_mgr = TgFileSystemClientManager.get_instance() res = hints.TotalList() res_type = "msg" client = await clients_mgr.get_client_force(body.token) @@ -91,6 +89,7 @@ async def search_tg_file_list(body: TgToFileListRequestBody): @apiutils.atimeit async def get_tg_file_list(body: TgToFileListRequestBody): try: + clients_mgr = TgFileSystemClientManager.get_instance() res = hints.TotalList() res_type = "chat" client = await clients_mgr.get_client_force(body.token) @@ -138,6 +137,7 @@ async def get_tg_file_media_stream(token: str, cid: int, mid: int, request: Requ } range_header = request.headers.get("range") try: + clients_mgr = TgFileSystemClientManager.get_instance() client = await clients_mgr.get_client_force(token) msg = await client.get_message(chat_id, msg_id) file_size = msg.media.document.size @@ -189,12 +189,14 @@ async def get_tg_file_media(chat_id: int|str, msg_id: int, file_name: str, sign: @app.get("/tg/api/v1/client/login") @apiutils.atimeit async def login_new_tg_file_client(): + clients_mgr = TgFileSystemClientManager.get_instance() url = await clients_mgr.login_clients() return {"url": url} @app.get("/tg/api/v1/client/status") async def get_tg_file_client_status(request: Request): + clients_mgr = TgFileSystemClientManager.get_instance() return await clients_mgr.get_status() @@ -202,27 +204,7 @@ async def get_tg_file_client_status(request: Request): @apiutils.atimeit async def convert_tg_msg_link_media_stream(link: str): try: - link_slice = link.split("/") - if len(link_slice) < 5: - raise RuntimeError("link format invalid") - chat_id_or_name, msg_id = link_slice[-2:] - is_msg_id = msg_id.isascii() and msg_id.isdecimal() - if not is_msg_id: - raise RuntimeError("message id invalid") - msg_id = int(msg_id) - is_chat_name = chat_id_or_name.isascii() and not chat_id_or_name.isdecimal() - is_chat_id = chat_id_or_name.isascii() and chat_id_or_name.isdecimal() - if not is_chat_name and not is_chat_id: - raise RuntimeError("chat id invalid") - client = clients_mgr.get_first_client() - if client is None: - raise RuntimeError("client not ready, login first pls.") - if is_chat_id: - chat_id_or_name = int(chat_id_or_name) - msg = await client.get_message(chat_id_or_name, msg_id) - file_name = apiutils.get_message_media_name(msg) - param = configParse.get_TgToFileSystemParameter() - url = f"{param.base.exposed_url}/tg/api/v1/file/get/{utils.get_peer_id(msg.peer_id)}/{msg.id}/{file_name}?sign={client.sign}" + url = await api.link_convert(link) logger.info(f"{link}: link convert to: {url}") return Response(json.dumps({"url": url}), status_code=status.HTTP_200_OK) except Exception as err: @@ -247,6 +229,7 @@ class TgToChatListRequestBody(BaseModel): @apiutils.atimeit async def get_tg_client_chat_list(body: TgToChatListRequestBody, request: Request): try: + clients_mgr = TgFileSystemClientManager.get_instance() res = hints.TotalList() res_type = "chat" client = await clients_mgr.get_client_force(body.token) diff --git a/backend/api_implement.py b/backend/api_implement.py new file mode 100644 index 0000000..221137a --- /dev/null +++ b/backend/api_implement.py @@ -0,0 +1,39 @@ +import traceback +import logging + +from telethon import types, hints, utils + +import configParse +from backend import apiutils +from backend.TgFileSystemClientManager import TgFileSystemClientManager + + +logger = logging.getLogger(__file__.split("/")[-1]) + + +async def link_convert(link: str) -> str: + clients_mgr = TgFileSystemClientManager.get_instance() + link_slice = link.split("/") + if len(link_slice) < 5: + raise RuntimeError("link format invalid") + chat_id_or_name, msg_id = link_slice[-2:] + is_msg_id = msg_id.isascii() and msg_id.isdecimal() + if not is_msg_id: + raise RuntimeError("message id invalid") + msg_id = int(msg_id) + is_chat_name = chat_id_or_name.isascii() and not chat_id_or_name.isdecimal() + is_chat_id = chat_id_or_name.isascii() and chat_id_or_name.isdecimal() + if not is_chat_name and not is_chat_id: + raise RuntimeError("chat id invalid") + client = clients_mgr.get_first_client() + if client is None: + raise RuntimeError("client not ready, login first pls.") + if is_chat_id: + chat_id_or_name = int(chat_id_or_name) + msg = await client.get_message(chat_id_or_name, msg_id) + file_name = apiutils.get_message_media_name(msg) + param = configParse.get_TgToFileSystemParameter() + url = ( + f"{param.base.exposed_url}/tg/api/v1/file/get/{utils.get_peer_id(msg.peer_id)}/{msg.id}/{file_name}?sign={client.sign}" + ) + return url