Files
folkugat-web/folkugat_web/services/files.py
2025-04-27 17:00:04 +02:00

115 lines
3.6 KiB
Python

import itertools
import mimetypes
import os
import re
import uuid
from collections.abc import Iterator
from contextlib import asynccontextmanager
from pathlib import Path
import aiofiles
import magic
from fastapi import HTTPException, UploadFile
from folkugat_web.config import db
from folkugat_web.dal.sql.temes import links as links_dal
from folkugat_web.dal.sql.temes import scores as scores_dal
from folkugat_web.log import logger
async def get_mimetype(upload_file: UploadFile) -> str:
info = magic.detect_from_content(await upload_file.read(2048))
await upload_file.seek(0)
return info.mime_type
ACCEPTED_MIMETYPES = [
re.compile(r"image/.+"),
re.compile(r".+/pdf"),
]
def check_mimetype(mimetype: str) -> None:
if not any(regex.match(mimetype) for regex in ACCEPTED_MIMETYPES):
raise HTTPException(status_code=400, detail=f"Unsupported file type: {mimetype}")
def get_db_file_path(filepath: Path) -> str:
return f"{db.DB_FILES_URL}/{filepath.relative_to(db.DB_FILES_DIR)}"
async def store_file(tema_id: int, upload_file: UploadFile) -> str:
if not upload_file.size:
raise HTTPException(status_code=400, detail="Couldn't find out the size of the file")
if upload_file.size > db.FILE_MAX_SIZE:
raise HTTPException(
status_code=400,
detail=f"The uploaded file is too big (max size = {db.FILE_MAX_SIZE} bytes)",
)
mimetype = await get_mimetype(upload_file)
check_mimetype(mimetype)
extension = mimetypes.guess_extension(mimetype) or ""
filepath = create_tema_filename(tema_id=tema_id, extension=extension)
with open(filepath, "wb") as f:
_ = f.write(await upload_file.read())
return get_db_file_path(filepath)
def create_tema_filename(tema_id: int, extension: str = "") -> Path:
filename = str(uuid.uuid4().hex) + extension
filedir = db.DB_FILES_TEMA_DIR / str(tema_id)
filedir.mkdir(parents=True, exist_ok=True)
filepath = filedir / filename
return filepath
def create_tmp_filename(extension: str = "") -> Path:
filename = str(uuid.uuid4().hex) + extension
filepath = db.DB_FILES_TMP_DIR / filename
return filepath
def get_set_filename(filename: str) -> Path:
return db.DB_FILES_SET_DIR / filename
@asynccontextmanager
async def tmp_file(content: str):
input_filename = create_tmp_filename(extension=".ly")
async with aiofiles.open(input_filename, "w") as f:
_ = await f.write(content)
try:
yield input_filename
finally:
if input_filename.exists():
os.remove(input_filename)
def list_files(tema_id: str) -> list[str]:
filedir = db.DB_FILES_TEMA_DIR / str(tema_id)
return [get_db_file_path(f) for f in filedir.iterdir()]
def get_orphan_files() -> Iterator[Path]:
link_urls = {link.url for link in links_dal.get_links()}
score_pdf_urls = {score.pdf_url for score in scores_dal.get_scores() if score.pdf_url is not None}
score_img_urls = {score.img_url for score in scores_dal.get_scores() if score.img_url is not None}
score_preview_urls = {score.preview_url for score in scores_dal.get_scores() if score.preview_url is not None}
alive_urls = link_urls | score_pdf_urls | score_img_urls | score_preview_urls
return filter(
lambda p: p.is_file() and get_db_file_path(p) not in alive_urls,
itertools.chain(
db.DB_FILES_TEMA_DIR.rglob("*"),
db.DB_FILES_TMP_DIR.rglob("*"),
)
)
def clean_orphan_files():
for path in get_orphan_files():
logger.info(f"Deleting the orphan file: {path}")
os.remove(path)