diff --git a/backend/MediaCacheManager.py b/backend/MediaCacheManager.py index 20fdd39..67260ae 100644 --- a/backend/MediaCacheManager.py +++ b/backend/MediaCacheManager.py @@ -3,6 +3,7 @@ import logging import bisect import collections import asyncio +import traceback import collections from typing import Union, Optional @@ -17,7 +18,6 @@ class MediaChunkHolder(object): waiters: collections.deque[asyncio.Future] requester: list[Request] = [] chunk_id: int = 0 - is_done: bool = False def __init__(self, chat_id: int, msg_id: int, start: int, target_len: int, mem: Optional[bytes] = None) -> None: self.chat_id = chat_id @@ -53,11 +53,6 @@ class MediaChunkHolder(object): def is_completed(self) -> bool: return self.length >= self.target_len - - def set_done(self) -> None: - # self.is_done = True - # self.notify_waiters() - self.requester.clear() def notify_waiters(self) -> None: while self.waiters: @@ -69,15 +64,15 @@ class MediaChunkHolder(object): self.mem = mem self.length = len(self.mem) if self.length > self.target_len: - raise RuntimeWarning( - f"MeidaChunk Overflow:start:{self.start},len:{self.length},tlen:{self.target_len}") + logger.warning(RuntimeWarning( + f"MeidaChunk Overflow:start:{self.start},len:{self.length},tlen:{self.target_len}")) def append_chunk_mem(self, mem: bytes) -> None: self.mem = self.mem + mem self.length = len(self.mem) if self.length > self.target_len: - raise RuntimeWarning( - f"MeidaChunk Overflow:start:{self.start},len:{self.length},tlen:{self.target_len}") + logger.warning(RuntimeWarning( + f"MeidaChunk Overflow:start:{self.start},len:{self.length},tlen:{self.target_len}")) self.notify_waiters() def add_chunk_requester(self, req: Request) -> None: @@ -85,15 +80,18 @@ class MediaChunkHolder(object): async def is_disconneted(self) -> bool: while self.requester: - res = await self.requester[0].is_disconnected() - if res: - self.requester.pop(0) - continue - return res + req = self.requester[0] + if not await req.is_disconnected(): + return False + try: + self.requester.remove(req) + except Exception as err: + logger.warning(f"{err=}, trace:{traceback.format_exc()}") + return False return True async def wait_chunk_update(self) -> None: - if self.is_done: + if self.is_completed(): return waiter = asyncio.Future() self.waiters.append(waiter) diff --git a/backend/TgFileSystemClient.py b/backend/TgFileSystemClient.py index 97acf8f..4112d36 100644 --- a/backend/TgFileSystemClient.py +++ b/backend/TgFileSystemClient.py @@ -21,7 +21,7 @@ logger = logging.getLogger(__file__.split("/")[-1]) class TgFileSystemClient(object): MAX_WORKER_ROUTINE = 4 - SINGLE_NET_CHUNK_SIZE = 256 * 1024 # 256kb + SINGLE_NET_CHUNK_SIZE = 512 * 1024 # 512kb SINGLE_MEDIA_SIZE = 5 * 1024 * 1024 # 5mb api_id: int api_hash: str @@ -245,16 +245,19 @@ class TgFileSystemClient(object): if remain_size <= 0: media_holder.append_chunk_mem( chunk[:len(chunk)+remain_size]) + else: + media_holder.append_chunk_mem(chunk) + if media_holder.is_completed(): break - media_holder.append_chunk_mem(chunk) + if await media_holder.is_disconneted(): + raise asyncio.CancelledError("all requester canceled.") except asyncio.CancelledError as err: - logger.warning(f"cancel holder:{media_holder}") + logger.info(f"cancel holder:{media_holder}") self.media_chunk_manager.cancel_media_chunk(media_holder) except Exception as err: logger.error( f"_download_media_chunk err:{err=},{offset=},{target_size=},{media_holder},\r\n{err=}\r\n{traceback.format_exc()}") finally: - media_holder.set_done() logger.debug( f"downloaded chunk:{time.time()}.{offset=},{target_size=},{media_holder}") @@ -279,7 +282,7 @@ class TgFileSystemClient(object): msg.chat_id, msg.id, align_pos, align_size) holder.add_chunk_requester(req) self.media_chunk_manager.set_media_chunk(holder) - await self.task_queue.put((cur_task_id, self._download_media_chunk(msg, holder))) + self.task_queue.put_nowait((cur_task_id, self._download_media_chunk(msg, holder))) elif not cache_chunk.is_completed(): # yield return completed part # await untill completed or pos > end diff --git a/start.py b/start.py index 81a3199..6a53aa7 100644 --- a/start.py +++ b/start.py @@ -58,4 +58,4 @@ if __name__ == "__main__": if ret == 0: asyncio.get_event_loop().run_until_complete(run_web_server()) sys.exit(0) - uvicorn.run(backendapp, host="0.0.0.0", port=param.base.port) + uvicorn.run(backendapp, host="0.0.0.0", port=param.base.port, app_dir="backend")