"""Helpers to store uploaded tema files on disk and to clean up unreferenced ones."""

import mimetypes
import os
import re
import uuid
from collections.abc import Iterator
from pathlib import Path

import magic
from fastapi import HTTPException, UploadFile
from folkugat_web.config import db
from folkugat_web.dal.sql.temes import links as links_dal
from folkugat_web.dal.sql.temes import scores as scores_dal
from folkugat_web.log import logger

# Number of leading bytes of an upload handed to ``magic`` for type detection.
_SNIFF_BYTES = 2048


async def get_mimetype(upload_file: UploadFile) -> str:
    """Return the MIME type detected from the first bytes of *upload_file*.

    The upload is rewound to position 0 afterwards so that it can be read
    again from the start.
    """
    header = await upload_file.read(_SNIFF_BYTES)
    info = magic.detect_from_content(header)
    await upload_file.seek(0)
    return info.mime_type


ACCEPTED_MIMETYPES = [
    re.compile(r"image/.+"),
    re.compile(r".+/pdf"),
]


def check_mimetype(mimetype: str) -> None:
    """Validate *mimetype* against ``ACCEPTED_MIMETYPES``.

    Raises:
        HTTPException: with status 400 when no accepted pattern matches.
    """
    # ``fullmatch`` rather than ``match``: the patterns describe the whole
    # type, and ``match`` would also accept anything that merely starts
    # with e.g. ``<something>/pdf``.
    if not any(regex.fullmatch(mimetype) for regex in ACCEPTED_MIMETYPES):
        raise HTTPException(status_code=400, detail=f"Unsupported file type: {mimetype}")


def get_db_file_path(filepath: Path) -> str:
    """Return the URL for *filepath*, which must be located under ``db.DB_FILES_DIR``.

    ``Path.relative_to`` raises ``ValueError`` when *filepath* is not inside
    ``db.DB_FILES_DIR``.
    """
    return f"{db.DB_FILES_URL}/{filepath.relative_to(db.DB_FILES_DIR)}"


async def store_file(tema_id: int, upload_file: UploadFile) -> str:
    """Validate *upload_file* and store it in the directory of tema *tema_id*.

    Returns:
        The URL (as built by ``get_db_file_path``) of the stored file.

    Raises:
        HTTPException: with status 400 when the upload has no (or zero)
            size, exceeds ``db.FILE_MAX_SIZE``, or has an unsupported type.
    """
    if not upload_file.size:
        raise HTTPException(status_code=400, detail="Couldn't find out the size of the file")
    if upload_file.size > db.FILE_MAX_SIZE:
        raise HTTPException(
            status_code=400,
            detail=f"The uploaded file is too big (max size = {db.FILE_MAX_SIZE} bytes)",
        )

    mimetype = await get_mimetype(upload_file)
    check_mimetype(mimetype)

    extension = mimetypes.guess_extension(mimetype) or ""
    filepath = create_tema_filename(tema_id=tema_id, extension=extension)

    # Read the upload before touching the destination so that a failed read
    # does not leave an empty file behind.
    content = await upload_file.read()
    with open(filepath, "wb") as f:
        _ = f.write(content)

    return get_db_file_path(filepath)


def _new_file_path(subdir: str, extension: str) -> Path:
    """Return a fresh, uniquely named path inside ``db.DB_FILES_DIR / subdir``.

    The directory is created when missing; the file itself is not created.
    """
    filedir = db.DB_FILES_DIR / subdir
    # ``parents=True`` so this also works when DB_FILES_DIR does not exist yet.
    filedir.mkdir(parents=True, exist_ok=True)
    return filedir / (uuid.uuid4().hex + extension)


def create_tema_filename(tema_id: int, extension: str = "") -> Path:
    """Return a new unique file path in the directory of tema *tema_id*."""
    return _new_file_path(str(tema_id), extension)


def create_tmp_filename(extension: str = "") -> Path:
    """Return a new unique file path in the ``tmp`` directory."""
    return _new_file_path("tmp", extension)


def list_files(tema_id: str) -> list[str]:
    """Return the URLs of all entries in the directory of tema *tema_id*.

    A tema whose directory does not exist yet has no files, so an empty
    list is returned in that case.
    """
    # NOTE(review): the sibling functions annotate ``tema_id`` as ``int``;
    # the value is passed through ``str()`` here, so both work at runtime.
    filedir = db.DB_FILES_DIR / str(tema_id)
    try:
        return [get_db_file_path(f) for f in filedir.iterdir()]
    except FileNotFoundError:
        return []


def get_orphan_files() -> Iterator[Path]:
    """Lazily yield the files under ``db.DB_FILES_DIR`` that nothing references.

    A file is considered referenced when its URL equals the URL of a link
    or the ``pdf_url`` / ``img_url`` of a score. The whole tree is searched,
    so this includes files in the ``tmp`` directory.
    """
    alive_urls = {link.url for link in links_dal.get_links()}
    # Single pass over the scores to collect both kinds of URL.
    for score in scores_dal.get_scores():
        if score.pdf_url is not None:
            alive_urls.add(score.pdf_url)
        if score.img_url is not None:
            alive_urls.add(score.img_url)

    return (
        path
        for path in db.DB_FILES_DIR.rglob("*")
        if path.is_file() and get_db_file_path(path) not in alive_urls
    )


def clean_orphan_files() -> None:
    """Delete every file reported by ``get_orphan_files``."""
    # Materialise the listing first: removing entries while the directory
    # walk is still in progress has unspecified results.
    for path in list(get_orphan_files()):
        logger.info(f"Deleting the orphan file: {path}")
        os.remove(path)