"""File storage helpers: upload validation, on-disk naming, temp files and orphan cleanup."""

import mimetypes
import os
import re
import uuid
from collections.abc import AsyncIterator, Iterator
from contextlib import asynccontextmanager
from pathlib import Path

import aiofiles
import magic
from fastapi import HTTPException, UploadFile
from folkugat_web.config import db
from folkugat_web.dal.sql.temes import links as links_dal
from folkugat_web.dal.sql.temes import scores as scores_dal
from folkugat_web.log import logger


async def get_mimetype(upload_file: UploadFile) -> str:
    """Detect the MIME type of an upload from its first 2048 bytes.

    The read position is reset to the start afterwards, so the caller can
    still read the whole file.
    """
    info = magic.detect_from_content(await upload_file.read(2048))
    await upload_file.seek(0)
    return info.mime_type


# Patterns are matched against the *whole* MIME type (see check_mimetype).
ACCEPTED_MIMETYPES = [
    re.compile(r"image/.+"),
    re.compile(r".+/pdf"),
]


def check_mimetype(mimetype: str) -> None:
    """Reject MIME types that match none of ACCEPTED_MIMETYPES.

    Raises:
        HTTPException: 400 when the type is not accepted.
    """
    # fullmatch, not match: match() only anchors the start, which would let
    # e.g. "application/pdfx" slip through the ".+/pdf" pattern.
    if not any(regex.fullmatch(mimetype) for regex in ACCEPTED_MIMETYPES):
        raise HTTPException(status_code=400, detail=f"Unsupported file type: {mimetype}")


def get_db_file_path(filepath: Path) -> str:
    """Return the URL for a file located under db.DB_FILES_DIR.

    Raises:
        ValueError: if *filepath* is not inside db.DB_FILES_DIR
            (propagated from Path.relative_to).
    """
    return f"{db.DB_FILES_URL}/{filepath.relative_to(db.DB_FILES_DIR)}"


async def store_file(tema_id: int, upload_file: UploadFile) -> str:
    """Validate an upload and store it under the directory of *tema_id*.

    Returns:
        The URL of the stored file (see get_db_file_path).

    Raises:
        HTTPException: 400 when the size is missing/zero, exceeds
            db.FILE_MAX_SIZE, or the detected MIME type is not accepted.
    """
    # A size of None or 0 is rejected here.
    if not upload_file.size:
        raise HTTPException(status_code=400, detail="Couldn't find out the size of the file")
    if upload_file.size > db.FILE_MAX_SIZE:
        raise HTTPException(
            status_code=400,
            detail=f"The uploaded file is too big (max size = {db.FILE_MAX_SIZE} bytes)",
        )

    mimetype = await get_mimetype(upload_file)
    check_mimetype(mimetype)

    extension = mimetypes.guess_extension(mimetype) or ""
    filepath = create_tema_filename(tema_id=tema_id, extension=extension)

    # Write through aiofiles (as tmp_file does) so the disk write does not
    # block the event loop.
    async with aiofiles.open(filepath, "wb") as f:
        _ = await f.write(await upload_file.read())

    return get_db_file_path(filepath)


def create_tema_filename(tema_id: int, extension: str = "") -> Path:
    """Return a fresh, unique path under DB_FILES_DIR/tema/<tema_id>.

    The directory is created if needed; the file itself is not created.
    """
    filedir = db.DB_FILES_DIR / "tema" / str(tema_id)
    filedir.mkdir(parents=True, exist_ok=True)
    return filedir / (uuid.uuid4().hex + extension)


def create_tmp_filename(extension: str = "") -> Path:
    """Return a fresh, unique path under DB_FILES_DIR/tmp.

    The directory is created if needed; the file itself is not created.
    """
    filedir = db.DB_FILES_DIR / "tmp"
    # parents=True keeps this consistent with create_tema_filename and avoids
    # failing when DB_FILES_DIR itself does not exist yet.
    filedir.mkdir(parents=True, exist_ok=True)
    return filedir / (uuid.uuid4().hex + extension)


@asynccontextmanager
async def tmp_file(content: str) -> AsyncIterator[Path]:
    """Write *content* to a temporary ".ly" file and yield its path.

    The file is removed when the context exits, including when the write
    itself or the body of the ``async with`` raises.
    """
    input_filename = create_tmp_filename(extension=".ly")
    try:
        # The write is inside the try so a failed/partial write is cleaned up.
        async with aiofiles.open(input_filename, "w") as f:
            _ = await f.write(content)
        yield input_filename
    finally:
        # missing_ok avoids the exists()/remove() race.
        input_filename.unlink(missing_ok=True)


def list_files(tema_id: str) -> list[str]:
    """Return the URLs of the files directly inside DB_FILES_DIR/<tema_id>.

    Returns an empty list when that directory does not exist.
    """
    # NOTE(review): create_tema_filename stores files under
    # DB_FILES_DIR / "tema" / <tema_id>, but this lists DB_FILES_DIR / <tema_id>.
    # Confirm which layout callers expect before changing the path.
    filedir = db.DB_FILES_DIR / str(tema_id)
    try:
        return [get_db_file_path(f) for f in filedir.iterdir()]
    except FileNotFoundError:
        return []


def get_orphan_files() -> Iterator[Path]:
    """Lazily yield files under DB_FILES_DIR that no link or score refers to.

    A file is considered alive when its URL equals a link URL, a score PDF
    URL or a score image URL. The set of alive URLs is computed when this
    function is called; the directory walk happens as the result is consumed.
    """
    alive_urls = {link.url for link in links_dal.get_links()}
    # Single pass over the scores instead of one query per URL kind.
    for score in scores_dal.get_scores():
        if score.pdf_url is not None:
            alive_urls.add(score.pdf_url)
        if score.img_url is not None:
            alive_urls.add(score.img_url)

    return (
        path
        for path in db.DB_FILES_DIR.rglob("*")
        if path.is_file() and get_db_file_path(path) not in alive_urls
    )


def clean_orphan_files() -> None:
    """Delete every file reported by get_orphan_files, logging each one."""
    # NOTE(review): files created by create_tmp_filename also live under
    # DB_FILES_DIR ("tmp"), so they are candidates here too — confirm this
    # cannot run while a tmp_file context is active.
    for path in get_orphan_files():
        logger.info(f"Deleting the orphan file: {path}")
        os.remove(path)