Tune editor
This commit is contained in:
58
folkugat_web/services/files.py
Normal file
58
folkugat_web/services/files.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import mimetypes
|
||||
import re
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
import magic
|
||||
from fastapi import HTTPException, UploadFile
|
||||
from folkugat_web.config import db
|
||||
|
||||
|
||||
async def get_mimetype(upload_file: UploadFile) -> str:
|
||||
info = magic.detect_from_content(await upload_file.read(2048))
|
||||
await upload_file.seek(0)
|
||||
return info.mime_type
|
||||
|
||||
|
||||
ACCEPTED_MIMETYPES = [
|
||||
re.compile(r"image/.+"),
|
||||
re.compile(r".+/pdf"),
|
||||
]
|
||||
|
||||
|
||||
def check_mimetype(mimetype: str) -> None:
|
||||
if not any(regex.match(mimetype) for regex in ACCEPTED_MIMETYPES):
|
||||
raise HTTPException(status_code=400, detail=f"Unsupported file type: {mimetype}")
|
||||
|
||||
|
||||
def get_db_file_path(filepath: Path) -> str:
|
||||
return f"{db.DB_FILES_URL}/{filepath.relative_to(db.DB_FILES_DIR)}"
|
||||
|
||||
|
||||
async def store_file(tema_id: int, upload_file: UploadFile) -> str:
|
||||
if not upload_file.size:
|
||||
raise HTTPException(status_code=400, detail="Couldn't find out the size of the file")
|
||||
if upload_file.size > db.FILE_MAX_SIZE:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=f"The uploaded file is too big (max size = {db.FILE_MAX_SIZE} bytes)",
|
||||
)
|
||||
|
||||
mimetype = await get_mimetype(upload_file)
|
||||
check_mimetype(mimetype)
|
||||
|
||||
extension = mimetypes.guess_extension(mimetype) or ""
|
||||
filename = str(uuid.uuid4().hex) + extension
|
||||
filedir = db.DB_FILES_DIR / str(tema_id)
|
||||
filedir.mkdir(exist_ok=True)
|
||||
filepath = filedir / filename
|
||||
|
||||
with open(filepath, "wb") as f:
|
||||
f.write(await upload_file.read())
|
||||
|
||||
return get_db_file_path(filepath)
|
||||
|
||||
|
||||
def list_files(tema_id: str) -> list[str]:
|
||||
filedir = db.DB_FILES_DIR / str(tema_id)
|
||||
return [get_db_file_path(f) for f in filedir.iterdir()]
|
||||
@@ -1,4 +1,7 @@
|
||||
import functools
|
||||
import time
|
||||
import typing
|
||||
from collections.abc import Callable, Iterable, Iterator
|
||||
|
||||
import Levenshtein
|
||||
from folkugat_web.config import search as config
|
||||
@@ -18,9 +21,10 @@ def get_query_word_similarity(query_word: str, text_ngrams: model.NGrams) -> sea
|
||||
candidate_ngrams = ((m, ngram)
|
||||
for m, ngrams in map(lambda i: (i, text_ngrams.get(i, [])), ns)
|
||||
for ngram in ngrams)
|
||||
return min(search_model.SearchMatch(distance=Levenshtein.distance(query_word, ngram)/m,
|
||||
ngram=ngram)
|
||||
for m, ngram in candidate_ngrams)
|
||||
return min((search_model.SearchMatch(distance=Levenshtein.distance(query_word, ngram)/m,
|
||||
ngram=ngram)
|
||||
for m, ngram in candidate_ngrams),
|
||||
default=search_model.SearchMatch(distance=float("inf"), ngram=""))
|
||||
|
||||
|
||||
def get_query_similarity(query: str, ngrams: model.NGrams) -> search_model.SearchMatch:
|
||||
@@ -44,14 +48,24 @@ def build_result(query: str, entry: tuple[int, model.NGrams]) -> search_model.Qu
|
||||
)
|
||||
|
||||
|
||||
def busca_temes(query: str) -> list[model.Tema]:
|
||||
T = typing.TypeVar("T")
|
||||
|
||||
|
||||
def _thread(it: Iterable[T], *funcs: Callable[[Iterable], Iterable]) -> Iterable:
|
||||
return functools.reduce(lambda i, fn: fn(i), funcs, it)
|
||||
|
||||
|
||||
def busca_temes(query: str, hidden: bool = False, limit: int = 20, offset: int = 0) -> list[model.Tema]:
|
||||
t0 = time.time()
|
||||
with get_connection() as con:
|
||||
tema_id_to_ngrams = temes_q.get_tema_id_to_ngrams(con)
|
||||
search_results = (build_result(query, entry) for entry in tema_id_to_ngrams.items())
|
||||
filtered_results = filter(lambda qr: qr.distance <= config.SEARCH_DISTANCE_THRESHOLD, search_results)
|
||||
# filtered_results = filter(lambda qr: True, search_results)
|
||||
sorted_results = sorted(filtered_results, key=lambda qr: qr.distance)
|
||||
sorted_temes = list(filter(None, map(lambda qr: temes_q.get_tema_by_id(qr.id, con), sorted_results)))
|
||||
result = _thread(
|
||||
temes_q.get_tema_id_to_ngrams(con).items(),
|
||||
lambda tema_id_to_ngrams: (build_result(query, entry) for entry in tema_id_to_ngrams),
|
||||
lambda results: filter(lambda qr: qr.distance <= config.SEARCH_DISTANCE_THRESHOLD, results),
|
||||
lambda results: sorted(results, key=lambda qr: qr.distance),
|
||||
lambda results: filter(None, map(lambda qr: temes_q.get_tema_by_id(qr.id, con), results)),
|
||||
lambda results: filter(lambda t: hidden or not t.hidden, results),
|
||||
)
|
||||
result = list(result)[offset:offset + limit]
|
||||
logger.info(f"Search time: { int((time.time() - t0) * 1000) } ms")
|
||||
return sorted_temes
|
||||
return result
|
||||
|
||||
@@ -6,6 +6,14 @@ from folkugat_web.dal.sql.temes import write as temes_w
|
||||
from folkugat_web.model import temes as model
|
||||
|
||||
|
||||
def create_tema(title: str = "") -> model.Tema:
|
||||
return temes_w.insert_tema(tema=model.Tema(title=title))
|
||||
|
||||
|
||||
def delete_tema(tema_id: int) -> None:
|
||||
tema = temes_w.delete_tema(tema_id=tema_id)
|
||||
|
||||
|
||||
def update_title(tema_id: int, title: str) -> model.Tema:
|
||||
with get_connection() as con:
|
||||
tema = temes_q.get_tema_by_id(tema_id=tema_id, con=con)
|
||||
@@ -98,3 +106,53 @@ def add_link(tema_id: int) -> model.Tema:
|
||||
temes_w.update_tema(tema=tema, con=con)
|
||||
|
||||
return tema
|
||||
|
||||
|
||||
def update_property(tema_id: int, property_id: int, prop: model.Property) -> model.Tema:
|
||||
with get_connection() as con:
|
||||
tema = temes_q.get_tema_by_id(tema_id=tema_id, con=con)
|
||||
if tema is None:
|
||||
raise ValueError(f"No tune found with tema_id = {tema_id}!")
|
||||
|
||||
tema.properties.replace(property_id, prop)
|
||||
temes_w.update_tema(tema=tema, con=con)
|
||||
|
||||
return tema
|
||||
|
||||
|
||||
def delete_property(tema_id: int, property_id: int) -> model.Tema:
|
||||
with get_connection() as con:
|
||||
tema = temes_q.get_tema_by_id(tema_id=tema_id, con=con)
|
||||
if tema is None:
|
||||
raise ValueError(f"No tune found with tema_id = {tema_id}!")
|
||||
|
||||
tema.properties.delete(property_id)
|
||||
temes_w.update_tema(tema=tema, con=con)
|
||||
|
||||
return tema
|
||||
|
||||
|
||||
def add_property(tema_id: int) -> model.Tema:
|
||||
with get_connection() as con:
|
||||
tema = temes_q.get_tema_by_id(tema_id=tema_id, con=con)
|
||||
if tema is None:
|
||||
raise ValueError(f"No tune found with tema_id = {tema_id}!")
|
||||
tema.properties.append(model.Property(
|
||||
id=None,
|
||||
field=model.PropertyField.AUTOR,
|
||||
value="",
|
||||
))
|
||||
temes_w.update_tema(tema=tema, con=con)
|
||||
|
||||
return tema
|
||||
|
||||
|
||||
def set_visibility(tema_id: int, hidden: bool) -> model.Tema:
|
||||
with get_connection() as con:
|
||||
tema = temes_q.get_tema_by_id(tema_id=tema_id, con=con)
|
||||
if tema is None:
|
||||
raise ValueError(f"No tune found with tema_id = {tema_id}!")
|
||||
tema.hidden = hidden
|
||||
temes_w.update_tema(tema=tema, con=con)
|
||||
|
||||
return tema
|
||||
|
||||
Reference in New Issue
Block a user