Afegir cartells a les jams (i esquelet per a slow jams i notes)
This commit is contained in:
@@ -3,7 +3,7 @@ import mimetypes
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
from collections.abc import Iterator
|
||||
from collections.abc import Iterable, Iterator
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
|
||||
@@ -11,6 +11,7 @@ import aiofiles
|
||||
import magic
|
||||
from fastapi import HTTPException, UploadFile
|
||||
from folkugat_web.config import db
|
||||
from folkugat_web.dal.sql.sessions import query as sessions_dal
|
||||
from folkugat_web.dal.sql.temes import links as links_dal
|
||||
from folkugat_web.dal.sql.temes import scores as scores_dal
|
||||
from folkugat_web.log import logger
|
||||
@@ -22,22 +23,16 @@ async def get_mimetype(upload_file: UploadFile) -> str:
|
||||
return info.mime_type
|
||||
|
||||
|
||||
ACCEPTED_MIMETYPES = [
|
||||
re.compile(r"image/.+"),
|
||||
re.compile(r".+/pdf"),
|
||||
]
|
||||
IMAGE_MIMETYPE = re.compile(r"image/.+")
|
||||
PDF_MIMETYPE = re.compile(r".+/pdf")
|
||||
|
||||
|
||||
def check_mimetype(mimetype: str) -> None:
|
||||
if not any(regex.match(mimetype) for regex in ACCEPTED_MIMETYPES):
|
||||
def check_mimetype(mimetype: str, accepted_mimetypes: Iterable[re.Pattern[str]]) -> None:
|
||||
if not any(regex.match(mimetype) for regex in accepted_mimetypes):
|
||||
raise HTTPException(status_code=400, detail=f"Unsupported file type: {mimetype}")
|
||||
|
||||
|
||||
def get_db_file_path(filepath: Path) -> str:
|
||||
return f"{db.DB_FILES_URL}/{filepath.relative_to(db.DB_FILES_DIR)}"
|
||||
|
||||
|
||||
async def store_file(tema_id: int, upload_file: UploadFile) -> str:
|
||||
def check_upload_file_size(upload_file: UploadFile) -> None:
|
||||
if not upload_file.size:
|
||||
raise HTTPException(status_code=400, detail="Couldn't find out the size of the file")
|
||||
if upload_file.size > db.FILE_MAX_SIZE:
|
||||
@@ -46,8 +41,15 @@ async def store_file(tema_id: int, upload_file: UploadFile) -> str:
|
||||
detail=f"The uploaded file is too big (max size = {db.FILE_MAX_SIZE} bytes)",
|
||||
)
|
||||
|
||||
|
||||
def get_db_file_path(filepath: Path) -> str:
|
||||
return f"{db.DB_FILES_URL}/{filepath.relative_to(db.DB_FILES_DIR)}"
|
||||
|
||||
|
||||
async def store_tema_file(tema_id: int, upload_file: UploadFile) -> str:
|
||||
check_upload_file_size(upload_file)
|
||||
mimetype = await get_mimetype(upload_file)
|
||||
check_mimetype(mimetype)
|
||||
check_mimetype(mimetype, [IMAGE_MIMETYPE, PDF_MIMETYPE])
|
||||
|
||||
extension = mimetypes.guess_extension(mimetype) or ""
|
||||
filepath = create_tema_filename(tema_id=tema_id, extension=extension)
|
||||
@@ -58,6 +60,20 @@ async def store_file(tema_id: int, upload_file: UploadFile) -> str:
|
||||
return get_db_file_path(filepath)
|
||||
|
||||
|
||||
async def store_session_cartell(session_id: int, upload_file: UploadFile) -> str:
|
||||
check_upload_file_size(upload_file)
|
||||
mimetype = await get_mimetype(upload_file)
|
||||
check_mimetype(mimetype, [IMAGE_MIMETYPE])
|
||||
|
||||
extension = mimetypes.guess_extension(mimetype) or ""
|
||||
filepath = create_cartell_filename(session_id=session_id, extension=extension)
|
||||
|
||||
with open(filepath, "wb") as f:
|
||||
_ = f.write(await upload_file.read())
|
||||
|
||||
return get_db_file_path(filepath)
|
||||
|
||||
|
||||
def create_tema_filename(tema_id: int, extension: str = "") -> Path:
|
||||
filename = str(uuid.uuid4().hex) + extension
|
||||
filedir = db.DB_FILES_TEMA_DIR / str(tema_id)
|
||||
@@ -66,6 +82,14 @@ def create_tema_filename(tema_id: int, extension: str = "") -> Path:
|
||||
return filepath
|
||||
|
||||
|
||||
def create_cartell_filename(session_id: int, extension: str = "") -> Path:
|
||||
filename = str(uuid.uuid4().hex) + extension
|
||||
filedir = db.DB_FILES_SESSION_DIR / str(session_id) / "cartell"
|
||||
filedir.mkdir(parents=True, exist_ok=True)
|
||||
filepath = filedir / filename
|
||||
return filepath
|
||||
|
||||
|
||||
def create_tmp_filename(extension: str = "") -> Path:
|
||||
filename = str(uuid.uuid4().hex) + extension
|
||||
filepath = db.DB_FILES_TMP_DIR / filename
|
||||
@@ -94,11 +118,13 @@ def list_files(tema_id: str) -> list[str]:
|
||||
|
||||
|
||||
def get_orphan_files() -> Iterator[Path]:
|
||||
link_urls = {link.url for link in links_dal.get_links()}
|
||||
score_pdf_urls = {score.pdf_url for score in scores_dal.get_scores() if score.pdf_url is not None}
|
||||
score_img_urls = {score.img_url for score in scores_dal.get_scores() if score.img_url is not None}
|
||||
score_preview_urls = {score.preview_url for score in scores_dal.get_scores() if score.preview_url is not None}
|
||||
alive_urls = link_urls | score_pdf_urls | score_img_urls | score_preview_urls
|
||||
alive_urls = (
|
||||
{link.url for link in links_dal.get_links()}
|
||||
| {score.pdf_url for score in scores_dal.get_scores() if score.pdf_url is not None}
|
||||
| {score.img_url for score in scores_dal.get_scores() if score.img_url is not None}
|
||||
| {score.preview_url for score in scores_dal.get_scores() if score.preview_url is not None}
|
||||
| {session.cartell_url for session in sessions_dal.get_sessions() if session.cartell_url}
|
||||
)
|
||||
return filter(
|
||||
lambda p: p.is_file() and get_db_file_path(p) not in alive_urls,
|
||||
itertools.chain(
|
||||
|
||||
Reference in New Issue
Block a user