fix: requester been cancel unexpected & add logs

This commit is contained in:
hehesheng 2024-06-09 01:12:27 +08:00
parent d53cc83e3f
commit 50af6974da
4 changed files with 13 additions and 10 deletions

View File

@ -42,8 +42,8 @@ class ChunkInfo(object):
@functools.total_ordering @functools.total_ordering
class MediaChunkHolder(object): class MediaChunkHolder(object):
waiters: collections.deque[asyncio.Future] waiters: collections.deque[asyncio.Future]
requesters: list[Request] = [] requesters: list[Request]
unique_id: str = "" unique_id: str
info: ChunkInfo info: ChunkInfo
@staticmethod @staticmethod
@ -56,9 +56,11 @@ class MediaChunkHolder(object):
self.mem = bytes() self.mem = bytes()
self.length = len(self.mem) self.length = len(self.mem)
self.waiters = collections.deque() self.waiters = collections.deque()
self.requesters = []
def __repr__(self) -> str: def __repr__(self) -> str:
return f"MediaChunk,unique_id:{self.unique_id},{self.info},mlen:{self.length}" pl = [f"{id(req)}" for req in self.requesters]
return f"MediaChunk,unique_id:{self.unique_id},{self.info},mlen:{self.length},req:{pl}"
def __eq__(self, other: Union["MediaChunkHolder", ChunkInfo, int]): def __eq__(self, other: Union["MediaChunkHolder", ChunkInfo, int]):
if isinstance(other, int): if isinstance(other, int):
@ -110,11 +112,12 @@ class MediaChunkHolder(object):
async def is_disconneted(self) -> bool: async def is_disconneted(self) -> bool:
while self.requesters: while self.requesters:
req = self.requesters[0]
if not await req.is_disconnected():
return False
try: try:
req = self.requesters[0]
if not await req.is_disconnected():
return False
self.requesters.remove(req) self.requesters.remove(req)
logger.info(f"remove req:{id(req)=},{self}")
except Exception as err: except Exception as err:
logger.warning(f"{err=}, trace:{traceback.format_exc()}") logger.warning(f"{err=}, trace:{traceback.format_exc()}")
return False return False
@ -127,9 +130,9 @@ class MediaChunkHolder(object):
self.waiters.append(waiter) self.waiters.append(waiter)
try: try:
await waiter await waiter
except: except Exception as err:
waiter.cancel() waiter.cancel()
logger.warning("waiter cancel") logger.warning(f"waiter cancel:{err=},{traceback.format_exc()}")
try: try:
self.waiters.remove(waiter) self.waiters.remove(waiter)
except ValueError: except ValueError:

View File

@ -356,6 +356,7 @@ class TgFileSystemClient(object):
align_pos = pos align_pos = pos
align_size = min(self.SINGLE_MEDIA_SIZE, file_size - align_pos) align_size = min(self.SINGLE_MEDIA_SIZE, file_size - align_pos)
holder = self.media_chunk_manager.create_media_chunk_holder(msg.chat_id, msg.id, align_pos, align_size) holder = self.media_chunk_manager.create_media_chunk_holder(msg.chat_id, msg.id, align_pos, align_size)
logger.info(f"new holder create:{holder}")
holder.add_chunk_requester(req) holder.add_chunk_requester(req)
self.media_chunk_manager.set_media_chunk(holder) self.media_chunk_manager.set_media_chunk(holder)
self.task_queue.put_nowait((cur_task_id, self._download_media_chunk(msg, holder))) self.task_queue.put_nowait((cur_task_id, self._download_media_chunk(msg, holder)))

View File

@ -177,7 +177,6 @@ async def get_tg_file_media_stream(token: str, cid: int, mid: int, request: Requ
@app.get("/tg/api/v1/file/get/{chat_id}/{msg_id}/{file_name}") @app.get("/tg/api/v1/file/get/{chat_id}/{msg_id}/{file_name}")
@apiutils.atimeit @apiutils.atimeit
async def get_tg_file_media(chat_id: int|str, msg_id: int, file_name: str, sign: str, req: Request): async def get_tg_file_media(chat_id: int|str, msg_id: int, file_name: str, sign: str, req: Request):
logger.info(f"request: {chat_id=},{msg_id=},{file_name=},{req=},{id(req)=}")
try: try:
if isinstance(chat_id, str): if isinstance(chat_id, str):
chat_id = int(chat_id) chat_id = int(chat_id)

View File

@ -9,7 +9,7 @@ st.set_page_config(page_title="TgToolbox", page_icon="🕹️", layout="wide", i
backend_status = api.get_backend_client_status() backend_status = api.get_backend_client_status()
need_login = False need_login = False
if not backend_status["init"]: if backend_status is None or not backend_status["init"]:
st.status("Server not ready") st.status("Server not ready")
time.sleep(0.5) time.sleep(0.5)
st.rerun() st.rerun()