Initial commit
This commit is contained in:
0
folkugat_web/services/__init__.py
Normal file
0
folkugat_web/services/__init__.py
Normal file
48
folkugat_web/services/auth.py
Normal file
48
folkugat_web/services/auth.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import datetime
|
||||
from typing import Annotated, Optional
|
||||
|
||||
import jwt
|
||||
from fastapi import Cookie, Depends, HTTPException
|
||||
from folkugat_web.config import auth as config
|
||||
from folkugat_web.log import logger
|
||||
|
||||
|
||||
def login(value):
|
||||
if value and value.lower() == config.ADMIN_PASSWORD:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def logged_in(nota_folkugat: Annotated[Optional[str], Cookie()] = None) -> bool:
|
||||
if not nota_folkugat:
|
||||
return False
|
||||
try:
|
||||
payload = jwt.decode(nota_folkugat, config.JWT_SECRET, algorithms=["HS256"])
|
||||
except Exception as exc:
|
||||
logger.error("Exception: ", exc)
|
||||
return False
|
||||
|
||||
return payload.get('user') == 'admin'
|
||||
|
||||
|
||||
LoggedIn = Annotated[bool, Depends(logged_in)]
|
||||
|
||||
|
||||
def require_login(logged_in: LoggedIn) -> bool:
|
||||
if not logged_in:
|
||||
raise HTTPException(status_code=403, detail="Must be logged in")
|
||||
return True
|
||||
|
||||
|
||||
RequireLogin = Annotated[bool, Depends(require_login)]
|
||||
|
||||
|
||||
def build_payload():
|
||||
return {
|
||||
'user': 'admin',
|
||||
'exp': datetime.datetime.now(tz=datetime.timezone.utc) + config.SESSION_DURATION
|
||||
}
|
||||
|
||||
|
||||
def build_token():
|
||||
return jwt.encode(build_payload(), config.JWT_SECRET, algorithm="HS256")
|
||||
19
folkugat_web/services/ngrams.py
Normal file
19
folkugat_web/services/ngrams.py
Normal file
@@ -0,0 +1,19 @@
|
||||
import operator
|
||||
|
||||
from folkugat_web.config import search as config
|
||||
from folkugat_web.utils import groupby
|
||||
|
||||
|
||||
def get_all_ngrams(text):
|
||||
return [(m, text[i:i+m])
|
||||
for m in range(config.MIN_NGRAM_LENGTH, len(text) + 1)
|
||||
for i in range(len(text) - m + 1)
|
||||
if m > 0]
|
||||
|
||||
def get_text_ngrams(*texts):
|
||||
texts = [word.lower() for text in texts for word in text.split()]
|
||||
word_ngrams = [ngram for ngrams in map(get_all_ngrams, texts) for ngram in ngrams]
|
||||
result = dict(groupby(word_ngrams,
|
||||
key_fn=operator.itemgetter(0),
|
||||
group_fn=lambda gr: list(set(map(operator.itemgetter(1), gr)))))
|
||||
return result
|
||||
80
folkugat_web/services/sessions.py
Normal file
80
folkugat_web/services/sessions.py
Normal file
@@ -0,0 +1,80 @@
|
||||
import datetime
|
||||
from datetime import date as Date
|
||||
from typing import Optional
|
||||
|
||||
from folkugat_web.config import date as config
|
||||
from folkugat_web.dal.sql import sessions as dal
|
||||
from folkugat_web.model import sessions as model
|
||||
from folkugat_web.model.sql import Order, OrderCol, Range
|
||||
|
||||
|
||||
def get_date_names(date: Date) -> model.DateNames:
|
||||
return model.DateNames(
|
||||
day_name=config.DAY_NAMES_CAT[date.weekday()],
|
||||
day=str(date.day),
|
||||
month_name=config.MONTH_NAMES_CAT[date.month],
|
||||
year=str(date.year),
|
||||
)
|
||||
|
||||
|
||||
def new_session() -> model.Session:
|
||||
sessions = get_sessions()
|
||||
if not sessions:
|
||||
venue = model.SessionVenue()
|
||||
else:
|
||||
venue = sessions[-1].venue
|
||||
return model.Session(venue=venue)
|
||||
|
||||
|
||||
def get_sessions() -> list[model.Session]:
|
||||
return dal.get_sessions(order_by=OrderCol(model.SessionCols.DATE, Order.ASCENDING))
|
||||
|
||||
|
||||
def get_next_sessions(limit: Optional[int] = None, offset: Optional[int] = None) -> list[model.Session]:
|
||||
return dal.get_sessions(
|
||||
date_range=Range(gte=datetime.date.today()),
|
||||
order_by=OrderCol(model.SessionCols.DATE, Order.ASCENDING),
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
)
|
||||
|
||||
|
||||
def get_sessions_history(limit: Optional[int] = None, offset: Optional[int] = None) -> list[model.Session]:
|
||||
return dal.get_sessions(
|
||||
date_range=Range(lt=datetime.date.today()),
|
||||
order_by=OrderCol(model.SessionCols.DATE, Order.DESCENDING),
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
)
|
||||
|
||||
|
||||
def get_next_session() -> Optional[model.Session]:
|
||||
return next(iter(get_next_sessions(limit=1,)), None)
|
||||
|
||||
|
||||
def get_live_session() -> Optional[model.Session]:
|
||||
return next(iter(dal.get_sessions(is_live=True)), None)
|
||||
|
||||
|
||||
def get_session(session_id: int) -> Optional[model.Session]:
|
||||
return next(iter(dal.get_sessions(session_id=session_id)), None)
|
||||
|
||||
|
||||
def insert_session(session: model.Session):
|
||||
return dal.insert_session(session)
|
||||
|
||||
|
||||
def set_session(session: model.Session):
|
||||
dal.update_session(session)
|
||||
|
||||
|
||||
def delete_session(session_id: int):
|
||||
dal.delete_session_by_id(session_id)
|
||||
|
||||
|
||||
def stop_live_sessions():
|
||||
dal.stop_live_sessions()
|
||||
|
||||
|
||||
def set_live_session(session_id: int):
|
||||
dal.set_live_session(session_id=session_id)
|
||||
0
folkugat_web/services/temes/__init__.py
Normal file
0
folkugat_web/services/temes/__init__.py
Normal file
8
folkugat_web/services/temes/query.py
Normal file
8
folkugat_web/services/temes/query.py
Normal file
@@ -0,0 +1,8 @@
|
||||
from typing import Optional
|
||||
|
||||
from folkugat_web.dal.sql.temes import query as temes_q
|
||||
from folkugat_web.model import temes as model
|
||||
|
||||
|
||||
def get_tema_by_id(tema_id: int) -> Optional[model.Tema]:
|
||||
return temes_q.get_tema_by_id(tema_id)
|
||||
57
folkugat_web/services/temes/search.py
Normal file
57
folkugat_web/services/temes/search.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import time
|
||||
|
||||
import Levenshtein
|
||||
from folkugat_web.config import search as config
|
||||
from folkugat_web.dal.sql import get_connection
|
||||
from folkugat_web.dal.sql.temes import query as temes_q
|
||||
from folkugat_web.log import logger
|
||||
from folkugat_web.model import search as search_model
|
||||
from folkugat_web.model import temes as model
|
||||
|
||||
|
||||
def get_query_word_similarity(query_word: str, text_ngrams: model.NGrams) -> search_model.SearchMatch:
|
||||
n = len(query_word)
|
||||
if n < config.MIN_NGRAM_LENGTH:
|
||||
return search_model.SearchMatch(distance=0.0, ngram='')
|
||||
ns = filter(lambda i: i >= config.MIN_NGRAM_LENGTH, range(
|
||||
n - config.QUERY_NGRAM_RANGE, n + config.QUERY_NGRAM_RANGE + 1))
|
||||
candidate_ngrams = ((m, ngram)
|
||||
for m, ngrams in map(lambda i: (i, text_ngrams.get(i, [])), ns)
|
||||
for ngram in ngrams)
|
||||
return min(search_model.SearchMatch(distance=Levenshtein.distance(query_word, ngram)/m,
|
||||
ngram=ngram)
|
||||
for m, ngram in candidate_ngrams)
|
||||
|
||||
|
||||
def get_query_similarity(query: str, ngrams: model.NGrams) -> search_model.SearchMatch:
|
||||
query_words = query.lower().split()
|
||||
word_matches = map(lambda query_word: get_query_word_similarity(query_word, ngrams), query_words)
|
||||
return search_model.SearchMatch.combine_matches(word_matches)
|
||||
|
||||
|
||||
def build_result(query: str, entry: tuple[int, model.NGrams]) -> search_model.QueryResult:
|
||||
if len(query) == 0:
|
||||
return search_model.QueryResult(
|
||||
id=entry[0],
|
||||
distance=0,
|
||||
ngram="",
|
||||
)
|
||||
match = get_query_similarity(query, entry[1])
|
||||
return search_model.QueryResult(
|
||||
id=entry[0],
|
||||
distance=match.distance,
|
||||
ngram=match.ngram,
|
||||
)
|
||||
|
||||
|
||||
def busca_temes(query: str) -> list[model.Tema]:
|
||||
t0 = time.time()
|
||||
with get_connection() as con:
|
||||
tema_id_to_ngrams = temes_q.get_tema_id_to_ngrams(con)
|
||||
search_results = (build_result(query, entry) for entry in tema_id_to_ngrams.items())
|
||||
filtered_results = filter(lambda qr: qr.distance <= config.SEARCH_DISTANCE_THRESHOLD, search_results)
|
||||
# filtered_results = filter(lambda qr: True, search_results)
|
||||
sorted_results = sorted(filtered_results, key=lambda qr: qr.distance)
|
||||
sorted_temes = list(filter(None, map(lambda qr: temes_q.get_tema_by_id(qr.id, con), sorted_results)))
|
||||
logger.info(f"Search time: { int((time.time() - t0) * 1000) } ms")
|
||||
return sorted_temes
|
||||
62
folkugat_web/services/temes/write.py
Normal file
62
folkugat_web/services/temes/write.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import dataclasses
|
||||
|
||||
from folkugat_web.dal.sql import get_connection
|
||||
from folkugat_web.dal.sql.temes import query as temes_q
|
||||
from folkugat_web.dal.sql.temes import write as temes_w
|
||||
from folkugat_web.model import temes as model
|
||||
|
||||
|
||||
def update_title(tema_id: int, title: str) -> model.Tema:
|
||||
with get_connection() as con:
|
||||
tema = temes_q.get_tema_by_id(tema_id=tema_id, con=con)
|
||||
if tema is None:
|
||||
raise ValueError(f"No tune found with tema_id = {tema_id}!")
|
||||
|
||||
new_tema = dataclasses.replace(tema, title=title)
|
||||
new_tema.compute_ngrams()
|
||||
|
||||
temes_w.update_tema(tema=new_tema, con=con)
|
||||
|
||||
return new_tema
|
||||
|
||||
|
||||
def update_lyric(tema_id: int, lyric_idx: int, lyric: model.Lyrics) -> model.Tema:
|
||||
with get_connection() as con:
|
||||
tema = temes_q.get_tema_by_id(tema_id=tema_id, con=con)
|
||||
if tema is None:
|
||||
raise ValueError(f"No tune found with tema_id = {tema_id}!")
|
||||
if lyric_idx > len(tema.lyrics):
|
||||
raise ValueError(f'Lyric index out of bounds')
|
||||
|
||||
tema.lyrics[lyric_idx] = lyric
|
||||
temes_w.update_tema(tema=tema, con=con)
|
||||
|
||||
return tema
|
||||
|
||||
|
||||
def delete_lyric(tema_id: int, lyric_idx: int) -> model.Tema:
|
||||
with get_connection() as con:
|
||||
tema = temes_q.get_tema_by_id(tema_id=tema_id, con=con)
|
||||
if tema is None:
|
||||
raise ValueError(f"No tune found with tema_id = {tema_id}!")
|
||||
if lyric_idx > len(tema.lyrics):
|
||||
raise ValueError(f'Lyric index out of bounds')
|
||||
|
||||
del tema.lyrics[lyric_idx]
|
||||
temes_w.update_tema(tema=tema, con=con)
|
||||
|
||||
return tema
|
||||
|
||||
|
||||
def add_lyric(tema_id: int) -> model.Tema:
|
||||
with get_connection() as con:
|
||||
tema = temes_q.get_tema_by_id(tema_id=tema_id, con=con)
|
||||
if tema is None:
|
||||
raise ValueError(f"No tune found with tema_id = {tema_id}!")
|
||||
tema.lyrics.append(model.Lyrics(
|
||||
title=tema.title,
|
||||
content="",
|
||||
))
|
||||
temes_w.update_tema(tema=tema, con=con)
|
||||
|
||||
return tema
|
||||
Reference in New Issue
Block a user