TgToFileSystem/backend/apiutils.py

230 lines
7.0 KiB
Python

import time
import logging
from fastapi import status, HTTPException
from telethon import types, utils
from functools import wraps
import configParse
logger = logging.getLogger(__file__.split("/")[-1])
def get_range_header(range_header: str, file_size: int) -> tuple[int, int]:
def _invalid_range():
return HTTPException(
status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
detail=f"Invalid request range (Range:{range_header!r})",
)
try:
h = range_header.replace("bytes=", "").split("-")
start = int(h[0]) if h[0] != "" else 0
end = int(h[1]) if h[1] != "" else file_size - 1
except ValueError:
raise _invalid_range()
if start > end or start < 0 or end > file_size - 1:
raise _invalid_range()
return start, end
def _get_message_media_document_kind_and_names(document: types.MessageMediaDocument) -> tuple[str, str]:
"""Gets kind and possible names for :tl:`DocumentAttribute`."""
kind = "document"
possible_names = []
for attr in document.attributes:
if isinstance(attr, types.DocumentAttributeFilename):
possible_names.insert(0, attr.file_name)
elif isinstance(attr, types.DocumentAttributeAudio):
kind = "audio"
if attr.performer and attr.title:
possible_names.append("{} - {}".format(attr.performer, attr.title))
elif attr.performer:
possible_names.append(attr.performer)
elif attr.title:
possible_names.append(attr.title)
elif attr.voice:
kind = "voice"
return kind, possible_names
def get_message_media_name(msg: types.Message) -> str:
if msg.media is None:
return ""
match type(msg.media):
case types.MessageMediaPhoto:
return f"{msg.media.photo.id}.jpg"
case types.MessageMediaDocument:
kind, possible_names = _get_message_media_document_kind_and_names(msg.media.document)
try:
name = None if possible_names is None else next(x for x in possible_names if x)
except StopIteration:
name = None
if name:
return name
extension = utils.get_extension(msg.media)
peer_id = utils.get_peer_id(msg.peer_id)
return f"{kind}_{peer_id}-{msg.id}{extension}"
case _:
return ""
def _get_message_media_valid_photo(msg: types.Message) -> types.Photo | None:
if msg.media is None:
return None
photo = msg.media
if isinstance(photo, types.MessageMediaPhoto):
photo = photo.photo
if not isinstance(photo, types.Photo):
return None
return photo
def _sort_message_media_photo_thumbs(thumbs: list[any]) -> list[any]:
def sort_thumbs(thumb):
if isinstance(thumb, types.PhotoStrippedSize):
return 1, len(thumb.bytes)
if isinstance(thumb, types.PhotoCachedSize):
return 1, len(thumb.bytes)
if isinstance(thumb, types.PhotoSize):
return 1, thumb.size
if isinstance(thumb, types.PhotoSizeProgressive):
return 1, max(thumb.sizes)
if isinstance(thumb, types.VideoSize):
return 2, thumb.size
# Empty size or invalid should go last
return 0, 0
thumbs = list(sorted(thumbs), key=sort_thumbs)
for i in reversed(range(len(thumbs))):
if isinstance(thumbs[i], types.PhotoPathSize):
thumbs.pop(i)
return thumbs
def _get_message_media_photo_file_last_photo_size(thumbs: list[any]):
thumbs = _sort_message_media_photo_thumbs(thumbs)
size = thumbs[-1] if thumbs else None
if not size or isinstance(size, types.PhotoSizeEmpty):
return None
return size
def get_message_media_photo_file_name(msg: types.Message) -> str:
photo = _get_message_media_valid_photo(msg)
if not photo:
return ""
size = _get_message_media_photo_file_last_photo_size(photo.sizes + (photo.video_sizes or []))
if not size:
return ""
if isinstance(size, types.VideoSize):
return f"{photo.id}.mp4"
return f"{photo.id}.jpg"
def get_message_media_photo_file_size(msg: types.Message) -> int:
photo = _get_message_media_valid_photo(msg)
if not photo:
return 0
size = _get_message_media_photo_file_last_photo_size(photo.sizes + (photo.video_sizes or []))
if not size:
return 0
if isinstance(size, types.PhotoStrippedSize):
return len(utils.stripped_photo_to_jpg(size.bytes))
elif isinstance(size, types.PhotoCachedSize):
return len(size.bytes)
if isinstance(size, types.PhotoSizeProgressive):
return max(size.sizes)
return size.size
def get_message_media_name_from_dict(msg: dict[str, any]) -> str:
doc = None
try:
doc = msg["media"]["document"]
except:
pass
file_name = None
if doc is not None:
for attr in doc["attributes"]:
file_name = attr.get("file_name")
if file_name != "" and file_name is not None:
break
if file_name == "" or file_name is None:
file_name = "unknown.tmp"
return file_name
def get_message_chat_id_from_dict(msg: dict[str, any]) -> int:
try:
return msg["peer_id"]["channel_id"]
except:
pass
return 0
def get_message_msg_id_from_dict(msg: dict[str, any]) -> int:
try:
return msg["id"]
except:
pass
return 0
def timeit_sec(func):
@wraps(func)
def timeit_wrapper(*args, **kwargs):
logger.debug(f"Function called {func.__name__}{args} {kwargs}")
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
total_time = end_time - start_time
logger.debug(f"Function quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds")
return result
return timeit_wrapper
def timeit(func):
if configParse.get_TgToFileSystemParameter().base.timeit_enable:
@wraps(func)
def timeit_wrapper(*args, **kwargs):
logger.debug(f"Function called {func.__name__}{args} {kwargs}")
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
total_time = end_time - start_time
logger.debug(f"Function quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds")
return result
return timeit_wrapper
return func
def atimeit(func):
if configParse.get_TgToFileSystemParameter().base.timeit_enable:
@wraps(func)
async def timeit_wrapper(*args, **kwargs):
logger.debug(f"AFunction called {func.__name__}{args} {kwargs}")
start_time = time.perf_counter()
result = await func(*args, **kwargs)
end_time = time.perf_counter()
total_time = end_time - start_time
logger.debug(f"AFunction quited {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds")
return result
return timeit_wrapper
return func