diff --git a/flake.nix b/flake.nix index 8b0cb8b..55de1c8 100644 --- a/flake.nix +++ b/flake.nix @@ -49,10 +49,12 @@ # Python dependencies python pythonEnv - pyright + basedpyright black # Project dependencies nodePackages_latest.tailwindcss + # Project tools + sqlite ]; shellHook = '' diff --git a/folkugat_web/api/auth.py b/folkugat_web/api/auth.py index 523184c..28911cd 100644 --- a/folkugat_web/api/auth.py +++ b/folkugat_web/api/auth.py @@ -1,4 +1,4 @@ -from typing import Annotated, Optional +from typing import Annotated from fastapi import Cookie, Form, Request from folkugat_web.api import router @@ -8,13 +8,13 @@ from folkugat_web.services import auth as service @router.get("/api/nota") -def nota_input(request: Request, value: Optional[str] = None): +def nota_input(request: Request, value: str | None = None): return nota.input(request, value) @router.post("/api/nota") -def login(request: Request, value: Annotated[Optional[str], Form()] = None, - nota_folkugat: Annotated[Optional[str], Cookie()] = None): +def login(request: Request, value: Annotated[str | None, Form()] = None, + nota_folkugat: Annotated[str | None, Cookie()] = None): logged_in = service.logged_in(nota_folkugat) new_login = service.login(value) if new_login and not logged_in: diff --git a/folkugat_web/api/sessions/editor.py b/folkugat_web/api/sessions/editor.py index a6d8fef..c63e6c7 100644 --- a/folkugat_web/api/sessions/editor.py +++ b/folkugat_web/api/sessions/editor.py @@ -24,8 +24,8 @@ def modify_session( date: Annotated[datetime.date, Form()], start_time: Annotated[datetime.time, Form()], end_time: Annotated[datetime.time, Form()], - venue_name: Annotated[Optional[str], Form()] = None, - venue_url: Annotated[Optional[str], Form()] = None, + venue_name: Annotated[str | None, Form()] = None, + venue_url: Annotated[str | None, Form()] = None, ): session_date = sessions.model.Session(id=session_id, date=date, start_time=start_time, end_time=end_time, venue=sessions.model.SessionVenue(name=venue_name, url=venue_url)) diff --git a/folkugat_web/api/tema/index.py b/folkugat_web/api/tema/index.py index a79ec30..b6984d2 100644 --- a/folkugat_web/api/tema/index.py +++ b/folkugat_web/api/tema/index.py @@ -1,10 +1,9 @@ -from typing import Annotated, Optional +from typing import Annotated from fastapi import Request, UploadFile from fastapi.params import File, Form, Param from fastapi.responses import HTMLResponse from folkugat_web.api import router -from folkugat_web.config import db from folkugat_web.fragments import tema, temes from folkugat_web.model import temes as model from folkugat_web.services import auth, files @@ -78,7 +77,7 @@ def set_lyric( lyric: Annotated[str, Form()], ): new_lyric = model.Lyrics(id=lyric_id, title=title, content=lyric.strip()) - temes_w.update_lyric(tema_id=tema_id, lyric_id=lyric_id, lyric=new_lyric) + _ = temes_w.update_lyric(tema_id=tema_id, lyric_id=lyric_id, lyric=new_lyric) return tema.lyric(request=request, logged_in=logged_in, tema_id=tema_id, lyric_id=lyric_id) @@ -105,7 +104,7 @@ def delete_lyric( tema_id: int, lyric_id: int, ): - temes_w.delete_lyric(tema_id=tema_id, lyric_id=lyric_id) + _ = temes_w.delete_lyric(tema_id=tema_id, lyric_id=lyric_id) return HTMLResponse() @@ -121,9 +120,9 @@ async def set_link( tema_id: int, link_id: int, content_type: Annotated[model.ContentType, Form()], - title: Annotated[Optional[str], Form()] = None, - url: Annotated[Optional[str], Form()] = None, - upload_file: Annotated[Optional[UploadFile], File()] = None, + title: Annotated[str | None, Form()] = None, + url: Annotated[str | None, Form()] = None, + upload_file: Annotated[UploadFile | None, File()] = None, ): if upload_file: url = await files.store_file(tema_id=tema_id, upload_file=upload_file) @@ -136,7 +135,7 @@ async def set_link( url=url or '', title=title or '', ) - new_tema = temes_w.update_link(tema_id=tema_id, link_id=link_id, link=new_link) + _ = temes_w.update_link(tema_id=tema_id, link_id=link_id, link=new_link) return tema.link(request=request, logged_in=logged_in, tema_id=tema_id, link_id=link_id) @@ -164,7 +163,7 @@ def delete_link( tema_id: int, link_id: int, ): - temes_w.delete_link(tema_id=tema_id, link_id=link_id) + _tema = temes_w.delete_link(tema_id=tema_id, link_id=link_id) return HTMLResponse( headers={ "HX-Trigger": f"reload-tema-{tema_id}-score" @@ -219,7 +218,7 @@ def set_property( value: Annotated[str, Form()], ): new_property = model.Property(id=property_id, field=field, value=value.strip()) - temes_w.update_property(tema_id=tema_id, property_id=property_id, prop=new_property) + _ = temes_w.update_property(tema_id=tema_id, property_id=property_id, prop=new_property) return tema.property_(request=request, logged_in=logged_in, tema_id=tema_id, property_id=property_id) @@ -243,7 +242,7 @@ def add_property( @router.delete("/api/tema/{tema_id}/property/{property_id}") def delete_property(_: auth.RequireLogin, tema_id: int, property_id: int): - temes_w.delete_property(tema_id=tema_id, property_id=property_id) + _tema = temes_w.delete_property(tema_id=tema_id, property_id=property_id) return HTMLResponse() diff --git a/folkugat_web/api/temes/index.py b/folkugat_web/api/temes/index.py index cd5183e..ce37fe5 100644 --- a/folkugat_web/api/temes/index.py +++ b/folkugat_web/api/temes/index.py @@ -1,4 +1,4 @@ -from typing import Annotated, Optional +from typing import Annotated from fastapi import Request from fastapi.params import Param diff --git a/folkugat_web/dal/sql/__init__.py b/folkugat_web/dal/sql/__init__.py index 3960367..b77bf36 100644 --- a/folkugat_web/dal/sql/__init__.py +++ b/folkugat_web/dal/sql/__init__.py @@ -1 +1,3 @@ -from ._connection import get_connection, Connection +from ._connection import Connection, get_connection + +__all__ = ["get_connection", "Connection"] diff --git a/folkugat_web/dal/sql/_connection.py b/folkugat_web/dal/sql/_connection.py index 3038d09..d546ead 100644 --- a/folkugat_web/dal/sql/_connection.py +++ b/folkugat_web/dal/sql/_connection.py @@ -8,7 +8,7 @@ Connection = sqlite3.Connection @contextmanager -def get_connection(con: Optional[Connection] = None) -> Iterator[Connection]: +def get_connection(con: Connection | None = None) -> Iterator[Connection]: if con: yield con else: diff --git a/folkugat_web/dal/sql/ddl.py b/folkugat_web/dal/sql/ddl.py index e68475c..50acfd7 100644 --- a/folkugat_web/dal/sql/ddl.py +++ b/folkugat_web/dal/sql/ddl.py @@ -1,10 +1,11 @@ -from folkugat_web.dal.sql import get_connection, sessions +from folkugat_web.dal.sql import get_connection from folkugat_web.dal.sql.playlists import ddl as playlists_ddl +from folkugat_web.dal.sql.sessions import ddl as sessions_ddl from folkugat_web.dal.sql.temes import ddl as temes_ddl def create_db(): with get_connection() as con: - sessions.create_db(con) + sessions_ddl.create_db(con) temes_ddl.create_db(con) playlists_ddl.create_db(con) diff --git a/folkugat_web/dal/sql/playlists/_conversion.py b/folkugat_web/dal/sql/playlists/conversion.py similarity index 50% rename from folkugat_web/dal/sql/playlists/_conversion.py rename to folkugat_web/dal/sql/playlists/conversion.py index 787d4d3..663c6af 100644 --- a/folkugat_web/dal/sql/playlists/_conversion.py +++ b/folkugat_web/dal/sql/playlists/conversion.py @@ -1,7 +1,18 @@ +from typing import TypedDict + from folkugat_web.model import playlists as model +PlaylistRowTuple = tuple[int, int, int, int | None] -def playlist_entry_to_row(tema_in_set: model.PlaylistEntry) -> dict: + +class PlaylistRowDict(TypedDict): + id: int | None + session_id: int + set_id: int + tema_id: int | None + + +def playlist_entry_to_row(tema_in_set: model.PlaylistEntry) -> PlaylistRowDict: return { 'id': tema_in_set.id, 'session_id': tema_in_set.session_id, @@ -10,7 +21,7 @@ def playlist_entry_to_row(tema_in_set: model.PlaylistEntry) -> dict: } -def row_to_playlist_entry(row: tuple) -> model.PlaylistEntry: +def row_to_playlist_entry(row: PlaylistRowTuple) -> model.PlaylistEntry: return model.PlaylistEntry( id=row[0], session_id=row[1], diff --git a/folkugat_web/dal/sql/playlists/ddl.py b/folkugat_web/dal/sql/playlists/ddl.py index 3c5efeb..e641f4c 100644 --- a/folkugat_web/dal/sql/playlists/ddl.py +++ b/folkugat_web/dal/sql/playlists/ddl.py @@ -1,9 +1,7 @@ -from typing import Optional - from folkugat_web.dal.sql import Connection, get_connection -def create_db(con: Optional[Connection] = None): +def create_db(con: Connection | None = None): with get_connection(con) as con: create_playlists_table(con) @@ -11,7 +9,7 @@ def create_db(con: Optional[Connection] = None): def drop_playlists_table(con: Connection): query = "DROP TABLE IF EXISTS playlists" cur = con.cursor() - cur.execute(query) + _ = cur.execute(query) def create_playlists_table(con: Connection): @@ -24,4 +22,4 @@ def create_playlists_table(con: Connection): ) """ cur = con.cursor() - cur.execute(query) + _ = cur.execute(query) diff --git a/folkugat_web/dal/sql/playlists/query.py b/folkugat_web/dal/sql/playlists/query.py index 0c8bbdd..f8562d7 100644 --- a/folkugat_web/dal/sql/playlists/query.py +++ b/folkugat_web/dal/sql/playlists/query.py @@ -1,19 +1,27 @@ from collections.abc import Iterator -from typing import Optional +from typing import TypedDict from folkugat_web.dal.sql import Connection, get_connection +from folkugat_web.dal.sql.sessions import conversion as sessions_conversion from folkugat_web.model import playlists as model +from folkugat_web.model.sessions import Session -from ._conversion import row_to_playlist_entry +from . import conversion + + +class QueryData(TypedDict, total=False): + id: int + set_id: int + session_id: int def _filter_clause( - entry_id: Optional[int] = None, - set_id: Optional[int] = None, - session_id: Optional[int] = None, -) -> tuple[str, dict]: - filter_clauses = [] - query_data = {} + entry_id: int | None = None, + set_id: int | None = None, + session_id: int | None = None, +) -> tuple[str, QueryData]: + filter_clauses: list[str] = [] + query_data: QueryData = {} if entry_id is not None: filter_clauses.append("id = :id") @@ -29,10 +37,10 @@ def _filter_clause( def get_playlist_entries( - entry_id: Optional[int] = None, - set_id: Optional[int] = None, - session_id: Optional[int] = None, - con: Optional[Connection] = None, + entry_id: int | None = None, + set_id: int | None = None, + session_id: int | None = None, + con: Connection | None = None, ) -> Iterator[model.PlaylistEntry]: filter_clause, data = _filter_clause(entry_id=entry_id, set_id=set_id, session_id=session_id) query = f""" @@ -43,5 +51,18 @@ def get_playlist_entries( """ with get_connection(con) as con: cur = con.cursor() - cur.execute(query, data) - return map(row_to_playlist_entry, cur.fetchall()) + _ = cur.execute(query, data) + return map(conversion.row_to_playlist_entry, cur.fetchall()) + + +def get_tune_sessions(tema_ids: list[int], con: Connection | None = None) -> Iterator[Session]: + query = """ + SELECT p.tema_id, s.id, s.date, s.start_time, s.end_time, s.venue_name, s.venue_url, s.is_live + FROM playlists p JOIN sessions s ON p.session_id = s.id + WHERE p.tema_id IN :tema_ids + """ + data = dict(tema_ids=tuple(tema_ids)) + with get_connection(con) as con: + cur = con.cursor() + _ = cur.execute(query, data) + return map(sessions_conversion.row_to_session, cur.fetchall()) diff --git a/folkugat_web/dal/sql/playlists/write.py b/folkugat_web/dal/sql/playlists/write.py index 84fcc47..e8135a6 100644 --- a/folkugat_web/dal/sql/playlists/write.py +++ b/folkugat_web/dal/sql/playlists/write.py @@ -1,12 +1,10 @@ -from typing import Optional - from folkugat_web.dal.sql import Connection, get_connection from folkugat_web.model import playlists as model -from ._conversion import playlist_entry_to_row, row_to_playlist_entry +from . import conversion -def insert_playlist_entry(pl_entry: model.PlaylistEntry, con: Optional[Connection] = None) -> model.PlaylistEntry: +def insert_playlist_entry(pl_entry: model.PlaylistEntry, con: Connection | None = None) -> model.PlaylistEntry: query = """ INSERT INTO playlists (id, session_id, set_id, tema_id) @@ -14,15 +12,15 @@ def insert_playlist_entry(pl_entry: model.PlaylistEntry, con: Optional[Connectio (:id, :session_id, :set_id, :tema_id) RETURNING * """ - data = playlist_entry_to_row(pl_entry) + data = conversion.playlist_entry_to_row(pl_entry) with get_connection(con) as con: cur = con.cursor() - cur.execute(query, data) - row = cur.fetchone() - return row_to_playlist_entry(row) + _ = cur.execute(query, data) + row: conversion.PlaylistRowTuple = cur.fetchone() + return conversion.row_to_playlist_entry(row) -def update_playlist_entry(entry: model.PlaylistEntry, con: Optional[Connection] = None): +def update_playlist_entry(entry: model.PlaylistEntry, con: Connection | None = None): query = """ UPDATE playlists SET @@ -30,14 +28,14 @@ def update_playlist_entry(entry: model.PlaylistEntry, con: Optional[Connection] WHERE id = :id """ - data = playlist_entry_to_row(entry) + data = conversion.playlist_entry_to_row(entry) with get_connection(con) as con: cur = con.cursor() - cur.execute(query, data) + _ = cur.execute(query, data) return -def delete_playlist_entry(entry_id: int, con: Optional[Connection] = None): +def delete_playlist_entry(entry_id: int, con: Connection | None = None): query = """ DELETE FROM playlists WHERE id = :id @@ -45,11 +43,11 @@ def delete_playlist_entry(entry_id: int, con: Optional[Connection] = None): data = dict(id=entry_id) with get_connection(con) as con: cur = con.cursor() - cur.execute(query, data) + _ = cur.execute(query, data) return -def delete_playlist_set(session_id: int, set_id: int, con: Optional[Connection] = None): +def delete_playlist_set(session_id: int, set_id: int, con: Connection | None = None): query = """ DELETE FROM playlists WHERE session_id = :session_id AND set_id = :set_id @@ -57,5 +55,5 @@ def delete_playlist_set(session_id: int, set_id: int, con: Optional[Connection] data = dict(session_id=session_id, set_id=set_id) with get_connection(con) as con: cur = con.cursor() - cur.execute(query, data) + _ = cur.execute(query, data) return diff --git a/folkugat_web/dal/sql/sessions.py b/folkugat_web/dal/sql/sessions.py deleted file mode 100644 index 55a9eae..0000000 --- a/folkugat_web/dal/sql/sessions.py +++ /dev/null @@ -1,194 +0,0 @@ -import datetime -from typing import Optional - -from folkugat_web.dal.sql import Connection, get_connection -from folkugat_web.model import sessions as model -from folkugat_web.model.sql import OrderCol, Range -from folkugat_web.typing import OptionalListOrValue as OLV - - -def create_db(con: Optional[Connection] = None): - with get_connection(con) as con: - create_sessions_table(con) - - -def drop_sessions_table(con: Connection): - query = "DROP TABLE IF EXISTS sessions" - cur = con.cursor() - cur.execute(query) - - -def create_sessions_table(con: Connection): - query = """ - CREATE TABLE IF NOT EXISTS sessions ( - id INTEGER PRIMARY KEY, - date TEXT NOT NULL, - start_time TEXT NOT NULL, - end_time TEXT NOT NULL, - venue_name TEXT, - venue_url TEXT, - is_live BOOLEAN DEFAULT false - ) - """ - cur = con.cursor() - cur.execute(query) - - -def _session_to_row(sessio: model.Session) -> dict: - return { - 'id': sessio.id, - 'date': sessio.date, - 'start_time': sessio.start_time.isoformat(), - 'end_time': sessio.end_time.isoformat(), - 'venue_name': sessio.venue.name, - 'venue_url': sessio.venue.url, - 'is_live': sessio.is_live, - } - - -def _row_to_session(row: tuple) -> model.Session: - return model.Session( - id=row[0], - date=datetime.date.fromisoformat(row[1]), - start_time=datetime.time.fromisoformat(row[2]), - end_time=datetime.time.fromisoformat(row[3]), - venue=model.SessionVenue( - name=row[4], - url=row[5], - ), - is_live=row[6], - ) - - -def insert_session(session: model.Session, con: Optional[Connection] = None): - query = """ - INSERT INTO sessions - (id, date, start_time, end_time, venue_name, venue_url, is_live) - VALUES - (:id, :date, :start_time, :end_time, :venue_name, :venue_url, :is_live) - RETURNING * - """ - data = _session_to_row(session) - with get_connection(con) as con: - cur = con.cursor() - cur.execute(query, data) - row = cur.fetchone() - return _row_to_session(row) - - -def update_session(session: model.Session, con: Optional[Connection] = None): - query = """ - UPDATE sessions SET - date = :date, start_time = :start_time, end_time = :end_time, - venue_name = :venue_name, venue_url = :venue_url, is_live = :is_live - WHERE id = :id - """ - data = _session_to_row(session) - with get_connection(con) as con: - cur = con.cursor() - cur.execute(query, data) - - -def _filter_clause(session_id: Optional[int] = None, - date_range: Optional[Range[datetime.date]] = None, - is_live: Optional[bool] = None) -> tuple[str, dict]: - filter_clauses = [] - filter_data = {} - - if session_id is not None: - filter_clauses.append(f"id = :session_id") - filter_data["session_id"] = session_id - - if date_range: - if ub := date_range.upper_bound(): - operator = "<=" if ub[1] else "<" - filter_clauses.append(f"date {operator} :date_ub") - filter_data["date_ub"] = ub[0] - if lb := date_range.lower_bound(): - operator = ">=" if lb[1] else ">" - filter_clauses.append(f"date {operator} :date_lb") - filter_data["date_lb"] = lb[0] - - if is_live is not None: - filter_clauses.append(f"is_live = :is_live") - filter_data["is_live"] = is_live - - if filter_clauses: - filter_clause_str = " AND ".join(filter_clauses) - filter_clause = f"WHERE {filter_clause_str}" - return filter_clause, filter_data - else: - return "", {} - - -def _order_clause(order_by: OLV[OrderCol[model.SessionCols]]) -> str: - if not order_by: - return "" - if not isinstance(order_by, list): - order_by = [order_by] - - order_clauses = [f"{ocol.column.value} {ocol.order.value}" for ocol in order_by] - order_clauses_str = " ".join(order_clauses) - return f"ORDER BY {order_clauses_str}" - - -def get_sessions(session_id: Optional[int] = None, - date_range: Optional[Range[datetime.date]] = None, - is_live: Optional[bool] = None, - order_by: OLV[OrderCol[model.SessionCols]] = None, - limit: Optional[int] = None, offset: Optional[int] = None, - con: Optional[Connection] = None) -> list[model.Session]: - clauses = [] - filter_clause, data = _filter_clause(session_id=session_id, date_range=date_range, is_live=is_live) - if filter_clause: - clauses.append(filter_clause) - if order_clause := _order_clause(order_by=order_by): - clauses.append(order_clause) - if limit is not None: - clauses.append("LIMIT :limit") - data["limit"] = limit - if offset is not None: - clauses.append("OFFSET :offset") - data["offset"] = offset - - clauses_str = " ".join(clauses) - query = f""" - SELECT id, date, start_time, end_time, venue_name, venue_url, is_live - FROM sessions - {clauses_str} - """ - with get_connection(con) as con: - cur = con.cursor() - cur.execute(query, data) - return list(map(_row_to_session, cur.fetchall())) - - -def delete_session_by_id(session_id: int, con: Optional[Connection] = None): - query = """ - DELETE FROM sessions - WHERE id = :id - """ - data = dict(id=session_id) - with get_connection(con) as con: - cur = con.cursor() - cur.execute(query, data) - - -def stop_live_sessions(con: Optional[Connection] = None): - query = """ - UPDATE sessions SET is_live = false WHERE is_live = true - """ - with get_connection(con) as con: - cur = con.cursor() - cur.execute(query) - - -def set_live_session(session_id: int, con: Optional[Connection] = None): - query = """ - UPDATE sessions SET is_live = true WHERE id = :id - """ - data = dict(id=session_id) - with get_connection(con) as con: - stop_live_sessions(con=con) - cur = con.cursor() - cur.execute(query, data) diff --git a/folkugat_web/dal/sql/sessions/__init__.py b/folkugat_web/dal/sql/sessions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/folkugat_web/dal/sql/sessions/conversion.py b/folkugat_web/dal/sql/sessions/conversion.py new file mode 100644 index 0000000..b66bef6 --- /dev/null +++ b/folkugat_web/dal/sql/sessions/conversion.py @@ -0,0 +1,42 @@ +import datetime +from typing import TypedDict + +from folkugat_web.model import sessions as model + +SessionRowTuple = tuple[int, str, str, str, str | None, str | None, bool] + + +class SessionRowDict(TypedDict): + id: int | None + date: str + start_time: str + end_time: str + venue_name: str | None + venue_url: str | None + is_live: bool + + +def session_to_row(sessio: model.Session) -> SessionRowDict: + return { + 'id': sessio.id, + 'date': sessio.date.isoformat(), + 'start_time': sessio.start_time.isoformat(), + 'end_time': sessio.end_time.isoformat(), + 'venue_name': sessio.venue.name, + 'venue_url': sessio.venue.url, + 'is_live': sessio.is_live, + } + + +def row_to_session(row: SessionRowTuple) -> model.Session: + return model.Session( + id=row[0], + date=datetime.date.fromisoformat(row[1]), + start_time=datetime.time.fromisoformat(row[2]), + end_time=datetime.time.fromisoformat(row[3]), + venue=model.SessionVenue( + name=row[4], + url=row[5], + ), + is_live=row[6], + ) diff --git a/folkugat_web/dal/sql/sessions/ddl.py b/folkugat_web/dal/sql/sessions/ddl.py new file mode 100644 index 0000000..800506f --- /dev/null +++ b/folkugat_web/dal/sql/sessions/ddl.py @@ -0,0 +1,28 @@ +from folkugat_web.dal.sql import Connection, get_connection + + +def create_db(con: Connection | None = None): + with get_connection(con) as con: + create_sessions_table(con) + + +def drop_sessions_table(con: Connection): + query = "DROP TABLE IF EXISTS sessions" + cur = con.cursor() + _ = cur.execute(query) + + +def create_sessions_table(con: Connection): + query = """ + CREATE TABLE IF NOT EXISTS sessions ( + id INTEGER PRIMARY KEY, + date TEXT NOT NULL, + start_time TEXT NOT NULL, + end_time TEXT NOT NULL, + venue_name TEXT, + venue_url TEXT, + is_live BOOLEAN DEFAULT false + ) + """ + cur = con.cursor() + _ = cur.execute(query) diff --git a/folkugat_web/dal/sql/sessions/query.py b/folkugat_web/dal/sql/sessions/query.py new file mode 100644 index 0000000..b91a9d2 --- /dev/null +++ b/folkugat_web/dal/sql/sessions/query.py @@ -0,0 +1,92 @@ +import datetime +from typing import TypedDict + +from folkugat_web.dal.sql import Connection, get_connection +from folkugat_web.model import sessions as model +from folkugat_web.model.sql import OrderCol, Range +from folkugat_web.typing import OptionalListOrValue as OLV + +from . import conversion + + +class QueryData(TypedDict, total=False): + session_id: int | None + date_ub: datetime.date + date_lb: datetime.date + is_live: bool + limit: int + offset: int + + +def _filter_clause(session_id: int | None = None, + date_range: Range[datetime.date] | None = None, + is_live: bool | None = None) -> tuple[str, QueryData]: + filter_clauses: list[str] = [] + filter_data: QueryData = {} + + if session_id is not None: + filter_clauses.append(f"id = :session_id") + filter_data["session_id"] = session_id + + if date_range: + if ub := date_range.upper_bound(): + operator = "<=" if ub[1] else "<" + filter_clauses.append(f"date {operator} :date_ub") + filter_data["date_ub"] = ub[0] + if lb := date_range.lower_bound(): + operator = ">=" if lb[1] else ">" + filter_clauses.append(f"date {operator} :date_lb") + filter_data["date_lb"] = lb[0] + + if is_live is not None: + filter_clauses.append(f"is_live = :is_live") + filter_data["is_live"] = is_live + + if filter_clauses: + filter_clause_str = " AND ".join(filter_clauses) + filter_clause = f"WHERE {filter_clause_str}" + return filter_clause, filter_data + else: + return "", {} + + +def _order_clause(order_by: OLV[OrderCol[model.SessionCols]]) -> str: + if not order_by: + return "" + if not isinstance(order_by, list): + order_by = [order_by] + + order_clauses = [f"{ocol.column.value} {ocol.order.value}" for ocol in order_by] + order_clauses_str = " ".join(order_clauses) + return f"ORDER BY {order_clauses_str}" + + +def get_sessions(session_id: int | None = None, + date_range: Range[datetime.date] | None = None, + is_live: bool | None = None, + order_by: OLV[OrderCol[model.SessionCols]] = None, + limit: int | None = None, offset: int | None = None, + con: Connection | None = None) -> list[model.Session]: + clauses: list[str] = [] + filter_clause, data = _filter_clause(session_id=session_id, date_range=date_range, is_live=is_live) + if filter_clause: + clauses.append(filter_clause) + if order_clause := _order_clause(order_by=order_by): + clauses.append(order_clause) + if limit is not None: + clauses.append("LIMIT :limit") + data["limit"] = limit + if offset is not None: + clauses.append("OFFSET :offset") + data["offset"] = offset + + clauses_str = " ".join(clauses) + query = f""" + SELECT id, date, start_time, end_time, venue_name, venue_url, is_live + FROM sessions + {clauses_str} + """ + with get_connection(con) as con: + cur = con.cursor() + _ = cur.execute(query, data) + return list(map(conversion.row_to_session, cur.fetchall())) diff --git a/folkugat_web/dal/sql/sessions/write.py b/folkugat_web/dal/sql/sessions/write.py new file mode 100644 index 0000000..2f2015f --- /dev/null +++ b/folkugat_web/dal/sql/sessions/write.py @@ -0,0 +1,64 @@ +from folkugat_web.dal.sql import Connection, get_connection +from folkugat_web.model import sessions as model + +from . import conversion + + +def insert_session(session: model.Session, con: Connection | None = None): + query = """ + INSERT INTO sessions + (id, date, start_time, end_time, venue_name, venue_url, is_live) + VALUES + (:id, :date, :start_time, :end_time, :venue_name, :venue_url, :is_live) + RETURNING * + """ + data = conversion.session_to_row(session) + with get_connection(con) as con: + cur = con.cursor() + _ = cur.execute(query, data) + row: conversion.SessionRowTuple = cur.fetchone() + return conversion.row_to_session(row) + + +def update_session(session: model.Session, con: Connection | None = None): + query = """ + UPDATE sessions SET + date = :date, start_time = :start_time, end_time = :end_time, + venue_name = :venue_name, venue_url = :venue_url, is_live = :is_live + WHERE id = :id + """ + data = conversion.session_to_row(session) + with get_connection(con) as con: + cur = con.cursor() + _ = cur.execute(query, data) + + +def delete_session_by_id(session_id: int, con: Connection | None = None): + query = """ + DELETE FROM sessions + WHERE id = :id + """ + data = dict(id=session_id) + with get_connection(con) as con: + cur = con.cursor() + _ = cur.execute(query, data) + + +def stop_live_sessions(con: Connection | None = None): + query = """ + UPDATE sessions SET is_live = false WHERE is_live = true + """ + with get_connection(con) as con: + cur = con.cursor() + _ = cur.execute(query) + + +def set_live_session(session_id: int, con: Connection | None = None): + query = """ + UPDATE sessions SET is_live = true WHERE id = :id + """ + data = dict(id=session_id) + with get_connection(con) as con: + stop_live_sessions(con=con) + cur = con.cursor() + _ = cur.execute(query, data) diff --git a/folkugat_web/dal/sql/temes/_conversion.py b/folkugat_web/dal/sql/temes/conversion.py similarity index 71% rename from folkugat_web/dal/sql/temes/_conversion.py rename to folkugat_web/dal/sql/temes/conversion.py index fa3f502..89c713c 100644 --- a/folkugat_web/dal/sql/temes/_conversion.py +++ b/folkugat_web/dal/sql/temes/conversion.py @@ -1,11 +1,28 @@ import datetime import json +from typing import TypedDict from folkugat_web.model import IndexedList +from folkugat_web.model import search as search_model from folkugat_web.model import temes as model +TemaRowTuple = tuple[int, str, str, str, str, str, str, str, str, int] -def tema_to_row(tema: model.Tema) -> dict: + +class TemaRowDict(TypedDict): + id: int | None + title: str + properties: str + links: str + lyrics: str + alternatives: str + ngrams: str + modification_date: str + creation_date: str + hidden: int + + +def tema_to_row(tema: model.Tema) -> TemaRowDict: return { 'id': tema.id, 'title': tema.title, @@ -20,11 +37,11 @@ def tema_to_row(tema: model.Tema) -> dict: } -def cell_to_ngrams(cell: str) -> model.NGrams: +def cell_to_ngrams(cell: str) -> search_model.NGrams: return {int(n): ngrams_ for n, ngrams_ in json.loads(cell).items()} -def row_to_tema(row: tuple) -> model.Tema: +def row_to_tema(row: TemaRowTuple) -> model.Tema: return model.Tema( id=row[0], title=row[1], diff --git a/folkugat_web/dal/sql/temes/ddl.py b/folkugat_web/dal/sql/temes/ddl.py index f30b9e5..5de2754 100644 --- a/folkugat_web/dal/sql/temes/ddl.py +++ b/folkugat_web/dal/sql/temes/ddl.py @@ -1,23 +1,15 @@ -from typing import Optional - -from folkugat_web import data from folkugat_web.dal.sql import Connection, get_connection -from .write import insert_tema - -def create_db(con: Optional[Connection] = None): +def create_db(con: Connection | None = None): with get_connection(con) as con: create_temes_table(con) - # for tema in data.TEMES: - # insert_tema(tema, con) - def drop_temes_table(con: Connection): query = "DROP TABLE IF EXISTS temes" cur = con.cursor() - cur.execute(query) + _ = cur.execute(query) def create_temes_table(con: Connection): @@ -36,4 +28,4 @@ def create_temes_table(con: Connection): ) """ cur = con.cursor() - cur.execute(query) + _ = cur.execute(query) diff --git a/folkugat_web/dal/sql/temes/query.py b/folkugat_web/dal/sql/temes/query.py index 44b2470..1af93f5 100644 --- a/folkugat_web/dal/sql/temes/query.py +++ b/folkugat_web/dal/sql/temes/query.py @@ -1,14 +1,13 @@ -from typing import Optional - from folkugat_web.dal.sql import Connection, get_connection +from folkugat_web.model import search as search_model from folkugat_web.model import temes as model -from ._conversion import cell_to_ngrams, row_to_tema +from . import conversion -TEMA_ID_TO_NGRAMS_CACHE = None +_tema_id_to_ngrams_cache: dict[int, search_model.NGrams] | None = None -def get_tema_by_id(tema_id: int, con: Optional[Connection] = None) -> Optional[model.Tema]: +def get_tema_by_id(tema_id: int, con: Connection | None = None) -> model.Tema | None: query = """ SELECT id, title, properties, links, lyrics, alternatives, ngrams, @@ -19,30 +18,30 @@ def get_tema_by_id(tema_id: int, con: Optional[Connection] = None) -> Optional[m data = dict(id=tema_id) with get_connection(con) as con: cur = con.cursor() - cur.execute(query, data) - row = cur.fetchone() - return row_to_tema(row) if row else None + _ = cur.execute(query, data) + row: conversion.TemaRowTuple = cur.fetchone() + return conversion.row_to_tema(row) if row else None def evict_tema_id_to_ngrams_cache(): - global TEMA_ID_TO_NGRAMS_CACHE - TEMA_ID_TO_NGRAMS_CACHE = None + global _tema_id_to_ngrams_cache + _tema_id_to_ngrams_cache = None -def get_tema_id_to_ngrams(con: Optional[Connection] = None) -> dict[int, model.NGrams]: - global TEMA_ID_TO_NGRAMS_CACHE - if TEMA_ID_TO_NGRAMS_CACHE is None: - TEMA_ID_TO_NGRAMS_CACHE = _get_tema_id_to_ngrams(con) - return TEMA_ID_TO_NGRAMS_CACHE +def get_tema_id_to_ngrams(con: Connection | None = None) -> dict[int, search_model.NGrams]: + global _tema_id_to_ngrams_cache + if _tema_id_to_ngrams_cache is None: + _tema_id_to_ngrams_cache = _get_tema_id_to_ngrams(con) + return _tema_id_to_ngrams_cache -def _get_tema_id_to_ngrams(con: Optional[Connection] = None) -> dict[int, model.NGrams]: +def _get_tema_id_to_ngrams(con: Connection | None = None) -> dict[int, search_model.NGrams]: query = """ SELECT id, ngrams FROM temes """ with get_connection(con) as con: cur = con.cursor() - cur.execute(query) - rows = cur.fetchall() - return {id_: cell_to_ngrams(ng) for id_, ng in rows} + _ = cur.execute(query) + rows: list[tuple[int, str]] = cur.fetchall() + return {id_: conversion.cell_to_ngrams(ng) for id_, ng in rows} diff --git a/folkugat_web/dal/sql/temes/write.py b/folkugat_web/dal/sql/temes/write.py index 80157d4..b7f33cd 100644 --- a/folkugat_web/dal/sql/temes/write.py +++ b/folkugat_web/dal/sql/temes/write.py @@ -1,13 +1,11 @@ -from typing import Optional - from folkugat_web.dal.sql import Connection, get_connection from folkugat_web.model import temes as model -from ._conversion import row_to_tema, tema_to_row +from . import conversion from .query import evict_tema_id_to_ngrams_cache -def insert_tema(tema: model.Tema, con: Optional[Connection] = None) -> model.Tema: +def insert_tema(tema: model.Tema, con: Connection | None = None) -> model.Tema: query = """ INSERT INTO temes (id, title, properties, links, lyrics, alternatives, ngrams, @@ -17,16 +15,16 @@ def insert_tema(tema: model.Tema, con: Optional[Connection] = None) -> model.Tem :creation_date, :modification_date, :hidden) RETURNING * """ - data = tema_to_row(tema) + data = conversion.tema_to_row(tema) with get_connection(con) as con: cur = con.cursor() - cur.execute(query, data) - row = cur.fetchone() + _ = cur.execute(query, data) + row: conversion.TemaRowTuple = cur.fetchone() evict_tema_id_to_ngrams_cache() - return row_to_tema(row) + return conversion.row_to_tema(row) -def update_tema(tema: model.Tema, con: Optional[Connection] = None): +def update_tema(tema: model.Tema, con: Connection | None = None): query = """ UPDATE temes SET @@ -36,15 +34,15 @@ def update_tema(tema: model.Tema, con: Optional[Connection] = None): WHERE id = :id """ - data = tema_to_row(tema) + data = conversion.tema_to_row(tema) with get_connection(con) as con: cur = con.cursor() - cur.execute(query, data) + _ = cur.execute(query, data) evict_tema_id_to_ngrams_cache() return -def delete_tema(tema_id: int, con: Optional[Connection] = None): +def delete_tema(tema_id: int, con: Connection | None = None): query = """ DELETE FROM temes WHERE id = :id @@ -52,6 +50,6 @@ def delete_tema(tema_id: int, con: Optional[Connection] = None): data = dict(id=tema_id) with get_connection(con) as con: cur = con.cursor() - cur.execute(query, data) + _ = cur.execute(query, data) evict_tema_id_to_ngrams_cache() return diff --git a/folkugat_web/fragments/nota.py b/folkugat_web/fragments/nota.py index 7c7ed22..6bc6c6a 100644 --- a/folkugat_web/fragments/nota.py +++ b/folkugat_web/fragments/nota.py @@ -1,8 +1,9 @@ +from fastapi import Request from folkugat_web.config import nota as config from folkugat_web.templates import templates -def input(request, value=None): +def input(request: Request, value: str | None = None): return templates.TemplateResponse( "fragments/nota/input.html", { @@ -12,7 +13,7 @@ def input(request, value=None): ) -def footer(request, value, logged_in): +def footer(request: Request, value: str | None, logged_in: bool): response = templates.TemplateResponse( "fragments/nota/footer.html", { @@ -24,7 +25,7 @@ def footer(request, value, logged_in): return response -def nota(request): +def nota(request: Request): return templates.TemplateResponse( "fragments/nota/nota.html", { diff --git a/folkugat_web/fragments/sessio.py b/folkugat_web/fragments/sessio.py index 06efcdf..f2de843 100644 --- a/folkugat_web/fragments/sessio.py +++ b/folkugat_web/fragments/sessio.py @@ -147,7 +147,7 @@ def busca_tema( ) -def set_tema(request: Request, logged_in: bool, session_id: int, set_id: int, entry_id: int, tema_id: Optional[int]): +def set_tema(request: Request, logged_in: bool, session_id: int, set_id: int, entry_id: int, tema_id: int | None): playlists_service.set_tema(session_id=session_id, set_id=set_id, entry_id=entry_id, tema_id=tema_id) tema_entry = playlists_service.get_tema(entry_id=entry_id) playlists_service.add_tema_to_tema_in_set(tema_entry) diff --git a/folkugat_web/fragments/sessions.py b/folkugat_web/fragments/sessions.py index 1f349cf..f8cf5a0 100644 --- a/folkugat_web/fragments/sessions.py +++ b/folkugat_web/fragments/sessions.py @@ -59,7 +59,7 @@ def sessions_historial(request: Request, limit: int, logged_in: bool): def _sessions_list(request: Request, sessions: list[model.Session], has_more_sessions: bool, paging_config: config.PagingConfig, list_id: str, get_sessions_url: str, - limit: int, logged_in: bool, next_session_id: Optional[int] = None): + limit: int, logged_in: bool, next_session_id: int | None = None): if has_more_sessions: more_sessions = limit + paging_config.step else: @@ -87,7 +87,7 @@ def _sessions_list(request: Request, sessions: list[model.Session], has_more_ses ) -def sessions_editor_row(request: Request, session_date: Optional[model.Session] = None, session_id: Optional[int] = None): +def sessions_editor_row(request: Request, session_date: model.Session | None = None, session_id: int | None = None): if session_date is None: if session_id is None: raise ValueError("Must either give session or session_id") @@ -115,7 +115,7 @@ def sessions_editor_row_editing(request, session_id: int): ) -def sessions_editor_insert_row(request, session_date: Optional[model.Session] = None): +def sessions_editor_insert_row(request, session_date: model.Session | None = None): session_date = service.insert_session(session_date or service.new_session()) return sessions_editor_row(request, session_date=session_date) diff --git a/folkugat_web/fragments/tema.py b/folkugat_web/fragments/tema.py index 6da443e..e577a93 100644 --- a/folkugat_web/fragments/tema.py +++ b/folkugat_web/fragments/tema.py @@ -7,7 +7,7 @@ from folkugat_web.services.temes.links import guess_link_type from folkugat_web.templates import templates -def title(request: Request, logged_in: bool, tema: Optional[model.Tema] = None, tema_id: Optional[int] = None): +def title(request: Request, logged_in: bool, tema: model.Tema | None = None, tema_id: int | None = None): if tema is None: if tema_id is None: raise ValueError("Either 'tema' or 'tema_id' must be given!") diff --git a/folkugat_web/model/__init__.py b/folkugat_web/model/__init__.py index 08424dc..adac041 100644 --- a/folkugat_web/model/__init__.py +++ b/folkugat_web/model/__init__.py @@ -1 +1,3 @@ from ._base import IndexedList + +__all__ = ["IndexedList"] diff --git a/folkugat_web/model/_base.py b/folkugat_web/model/_base.py index 06f1f99..69b3a3d 100644 --- a/folkugat_web/model/_base.py +++ b/folkugat_web/model/_base.py @@ -1,16 +1,19 @@ import dataclasses -from typing import Optional, TypeVar +from typing import TypeVar + +from typing_extensions import override @dataclasses.dataclass class WithId: - id: Optional[int] + id: int | None T = TypeVar("T", bound=WithId) class IndexedList(list[T]): + @override def append(self, _item: T) -> None: if _item.id is None: _item.id = max((i.id or 0 for i in self), default=0) + 1 diff --git a/folkugat_web/model/playlists.py b/folkugat_web/model/playlists.py index 0857c75..5076b75 100644 --- a/folkugat_web/model/playlists.py +++ b/folkugat_web/model/playlists.py @@ -1,6 +1,6 @@ import dataclasses from collections.abc import Iterator -from typing import Optional, Self +from typing import Self from folkugat_web.model.temes import Tema from folkugat_web.utils import groupby @@ -8,17 +8,17 @@ from folkugat_web.utils import groupby @dataclasses.dataclass class PlaylistEntry: - id: Optional[int] + id: int | None session_id: int set_id: int - tema_id: Optional[int] + tema_id: int | None @dataclasses.dataclass class TemaInSet: - id: Optional[int] - tema_id: Optional[int] - tema: Optional[Tema] + id: int | None + tema_id: int | None + tema: Tema | None def to_playlist_entry(self, session_id: int, set_id: int) -> PlaylistEntry: return PlaylistEntry( @@ -38,7 +38,7 @@ class Set: id: int temes: list[TemaInSet] - def to_playlist_entries(self, session_id) -> Iterator[PlaylistEntry]: + def to_playlist_entries(self, session_id: int) -> Iterator[PlaylistEntry]: for tema_in_set in self.temes: yield tema_in_set.to_playlist_entry( session_id=session_id, diff --git a/folkugat_web/model/search.py b/folkugat_web/model/search.py index d46a5f3..05ccd90 100644 --- a/folkugat_web/model/search.py +++ b/folkugat_web/model/search.py @@ -1,4 +1,8 @@ import dataclasses +from collections.abc import Iterable +from typing import Self + +NGrams = dict[int, list[str]] @dataclasses.dataclass(order=True) @@ -7,13 +11,14 @@ class SearchMatch: ngram: str @classmethod - def combine_matches(cls, matches): + def combine_matches(cls, matches: Iterable[Self]) -> Self: ngrams, distances = zip(*((match.ngram, match.distance) for match in matches)) return cls( ngram=', '.join(ngrams), - distance=sum(distances)/len(distances) + distance=sum(distances)/len(distances) ) + @dataclasses.dataclass class QueryResult: id: int diff --git a/folkugat_web/model/sessions.py b/folkugat_web/model/sessions.py index cdb9c99..4af7594 100644 --- a/folkugat_web/model/sessions.py +++ b/folkugat_web/model/sessions.py @@ -1,7 +1,6 @@ import dataclasses import datetime import enum -from typing import Optional DEFAULT_START_TIME = datetime.time(20, 30) DEFAULT_END_TIME = datetime.time(22, 30) @@ -9,13 +8,13 @@ DEFAULT_END_TIME = datetime.time(22, 30) @dataclasses.dataclass class SessionVenue: - name: Optional[str] = None - url: Optional[str] = None + name: str | None = None + url: str | None = None @dataclasses.dataclass class Session: - id: Optional[int] = None + id: int | None = None date: datetime.date = dataclasses.field(default_factory=datetime.date.today) start_time: datetime.time = DEFAULT_START_TIME end_time: datetime.time = DEFAULT_END_TIME diff --git a/folkugat_web/model/sql.py b/folkugat_web/model/sql.py index 2a8cc36..a477352 100644 --- a/folkugat_web/model/sql.py +++ b/folkugat_web/model/sql.py @@ -1,10 +1,9 @@ from __future__ import annotations import dataclasses -import datetime import enum from abc import abstractmethod -from typing import Generic, Optional, Protocol, TypeVar +from typing import Generic, Protocol, TypeVar class Comparable(Protocol): @@ -30,12 +29,12 @@ T = TypeVar("T", bound=Comparable) @dataclasses.dataclass class Range(Generic[T]): - gt: Optional[T] = None - gte: Optional[T] = None - lt: Optional[T] = None - lte: Optional[T] = None + gt: T | None = None + gte: T | None = None + lt: T | None = None + lte: T | None = None - def lower_bound(self) -> Optional[tuple[T, bool]]: + def lower_bound(self) -> tuple[T, bool] | None: if self.gt is None and self.gte is None: return None elif self.gt is not None and self.gte is not None: @@ -47,7 +46,7 @@ class Range(Generic[T]): elif self.gte is not None: return self.gte, True - def upper_bound(self) -> Optional[tuple[T, bool]]: + def upper_bound(self) -> tuple[T, bool] | None: if self.lt is None and self.lte is None: return None elif self.lt is not None and self.lte is not None: diff --git a/folkugat_web/model/temes.py b/folkugat_web/model/temes.py index 64c9ee7..0a3c0fc 100644 --- a/folkugat_web/model/temes.py +++ b/folkugat_web/model/temes.py @@ -1,14 +1,14 @@ import dataclasses import datetime import enum -from typing import Optional +from typing import Self +from folkugat_web.model.search import NGrams +from folkugat_web.model.sessions import Session from folkugat_web.services import ngrams from ._base import IndexedList, WithId -NGrams = dict[int, list[str]] - class ContentType(enum.Enum): PARTITURA = "partitura" @@ -28,7 +28,7 @@ class LinkType(enum.Enum): @dataclasses.dataclass class Link(WithId): content_type: ContentType - link_type: Optional[LinkType] + link_type: LinkType | None url: str title: str = "" @@ -93,7 +93,7 @@ class Lyrics(WithId): ) @classmethod - def from_dict(cls, d): + def from_dict(cls, d) -> Self: return cls( id=d["id"], title=d["title"], @@ -103,7 +103,7 @@ class Lyrics(WithId): @dataclasses.dataclass class Tema: - id: Optional[int] = None + id: int | None = None # Info title: str = "" properties: IndexedList[Property] = dataclasses.field(default_factory=IndexedList) @@ -116,6 +116,8 @@ class Tema: # Other info modification_date: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) creation_date: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) + # Stats + played: list[Session] = dataclasses.field(default_factory=list) def compute_ngrams(self): self.ngrams = ngrams.get_text_ngrams(self.title, *self.alternatives) @@ -124,5 +126,5 @@ class Tema: self.compute_ngrams() return self - def score(self) -> Optional[Link]: + def score(self) -> Link | None: return next(filter(lambda l: l.content_type is ContentType.PARTITURA and l.url.startswith('/'), self.links), None) diff --git a/folkugat_web/services/auth.py b/folkugat_web/services/auth.py index 7d28a69..557c01e 100644 --- a/folkugat_web/services/auth.py +++ b/folkugat_web/services/auth.py @@ -7,13 +7,13 @@ from folkugat_web.config import auth as config from folkugat_web.log import logger -def login(value): +def login(value: str | None) -> bool: if value and value.lower() == config.ADMIN_PASSWORD: return True return False -def logged_in(nota_folkugat: Annotated[Optional[str], Cookie()] = None) -> bool: +def logged_in(nota_folkugat: Annotated[str | None, Cookie()] = None) -> bool: if not nota_folkugat: return False try: diff --git a/folkugat_web/services/ngrams.py b/folkugat_web/services/ngrams.py index e2cea61..f3cf125 100644 --- a/folkugat_web/services/ngrams.py +++ b/folkugat_web/services/ngrams.py @@ -1,19 +1,21 @@ -import operator - from folkugat_web.config import search as config +from folkugat_web.model.search import NGrams from folkugat_web.utils import groupby -def get_all_ngrams(text): +def get_all_ngrams(text: str) -> list[tuple[int, str]]: return [(m, text[i:i+m]) for m in range(config.MIN_NGRAM_LENGTH, len(text) + 1) for i in range(len(text) - m + 1) if m > 0] -def get_text_ngrams(*texts): - texts = [word.lower() for text in texts for word in text.split()] - word_ngrams = [ngram for ngrams in map(get_all_ngrams, texts) for ngram in ngrams] - result = dict(groupby(word_ngrams, - key_fn=operator.itemgetter(0), - group_fn=lambda gr: list(set(map(operator.itemgetter(1), gr))))) + +def get_text_ngrams(*texts: str) -> NGrams: + lower_texts = [word.lower() for text in texts for word in text.split()] + word_ngrams = [ngram for ngrams in map(get_all_ngrams, lower_texts) for ngram in ngrams] + result = dict(groupby( + word_ngrams, + key_fn=lambda x: x[0], + group_fn=lambda gr: list(set(map(lambda x: x[1], gr))), + )) return result diff --git a/folkugat_web/services/playlists.py b/folkugat_web/services/playlists.py index 94ed929..c9ef46d 100644 --- a/folkugat_web/services/playlists.py +++ b/folkugat_web/services/playlists.py @@ -25,14 +25,14 @@ def add_tema_to_tema_in_set(tema_in_set: playlists.TemaInSet) -> playlists.TemaI return tema_in_set -def get_playlist(session_id: int, con: Optional[Connection] = None) -> playlists.Playlist: +def get_playlist(session_id: int, con: Connection | None = None) -> playlists.Playlist: return playlists.Playlist.from_playlist_entries( session_id=session_id, entries=list(query.get_playlist_entries(session_id=session_id, con=con)) ) -def add_set(session_id: int, con: Optional[Connection] = None) -> playlists.Set: +def add_set(session_id: int, con: Connection | None = None) -> playlists.Set: with get_connection(con) as con: curr_playlist = get_playlist(session_id=session_id, con=con) new_set_id = max([set_entry.id for set_entry in curr_playlist.sets], default=0) + 1 @@ -41,7 +41,7 @@ def add_set(session_id: int, con: Optional[Connection] = None) -> playlists.Set: return playlists.Set.from_playlist_entries(set_id=inserted_entry.set_id, entries=[inserted_entry]) -def get_set(session_id: int, set_id: int, con: Optional[Connection] = None) -> Optional[playlists.Set]: +def get_set(session_id: int, set_id: int, con: Connection | None = None) -> playlists.Set | None: entries = list(query.get_playlist_entries(session_id=session_id, set_id=set_id, con=con)) if entries: return playlists.Set.from_playlist_entries(set_id=set_id, entries=entries) @@ -49,30 +49,30 @@ def get_set(session_id: int, set_id: int, con: Optional[Connection] = None) -> O return None -def delete_set(session_id: int, set_id: int, con: Optional[Connection] = None): +def delete_set(session_id: int, set_id: int, con: Connection | None = None): write.delete_playlist_set(session_id=session_id, set_id=set_id, con=con) -def add_tema(session_id: int, set_id: int, con: Optional[Connection] = None) -> playlists.TemaInSet: +def add_tema(session_id: int, set_id: int, con: Connection | None = None) -> playlists.TemaInSet: with get_connection(con) as con: new_entry = playlists.PlaylistEntry(id=None, session_id=session_id, set_id=set_id, tema_id=None) inserted_entry = write.insert_playlist_entry(new_entry) return playlists.TemaInSet.from_playlist_entry(inserted_entry) -def get_tema(entry_id: int, con: Optional[Connection] = None) -> playlists.TemaInSet: +def get_tema(entry_id: int, con: Connection | None = None) -> playlists.TemaInSet: with get_connection(con) as con: entry = next(query.get_playlist_entries(entry_id=entry_id)) return playlists.TemaInSet.from_playlist_entry(entry) -def delete_tema(entry_id: int, con: Optional[Connection] = None): +def delete_tema(entry_id: int, con: Connection | None = None): with get_connection(con) as con: write.delete_playlist_entry(entry_id=entry_id, con=con) -def set_tema(session_id: int, set_id: int, entry_id: int, tema_id: Optional[int], - con: Optional[Connection] = None): +def set_tema(session_id: int, set_id: int, entry_id: int, tema_id: int | None, + con: Connection | None = None): with get_connection(con) as con: new_entry = playlists.PlaylistEntry(id=entry_id, session_id=session_id, set_id=set_id, tema_id=tema_id) write.update_playlist_entry(entry=new_entry, con=con) diff --git a/folkugat_web/services/sessions.py b/folkugat_web/services/sessions.py index 8079d3a..0a99a61 100644 --- a/folkugat_web/services/sessions.py +++ b/folkugat_web/services/sessions.py @@ -1,9 +1,8 @@ import datetime from datetime import date as Date -from typing import Optional from folkugat_web.config import date as config -from folkugat_web.dal.sql import sessions as dal +from folkugat_web.dal.sql.sessions import query, write from folkugat_web.model import sessions as model from folkugat_web.model.sql import Order, OrderCol, Range @@ -27,11 +26,11 @@ def new_session() -> model.Session: def get_sessions() -> list[model.Session]: - return dal.get_sessions(order_by=OrderCol(model.SessionCols.DATE, Order.ASCENDING)) + return query.get_sessions(order_by=OrderCol(model.SessionCols.DATE, Order.ASCENDING)) -def get_next_sessions(limit: Optional[int] = None, offset: Optional[int] = None) -> list[model.Session]: - return dal.get_sessions( +def get_next_sessions(limit: int | None = None, offset: int | None = None) -> list[model.Session]: + return query.get_sessions( date_range=Range(gte=datetime.date.today()), order_by=OrderCol(model.SessionCols.DATE, Order.ASCENDING), limit=limit, @@ -39,8 +38,8 @@ def get_next_sessions(limit: Optional[int] = None, offset: Optional[int] = None) ) -def get_sessions_history(limit: Optional[int] = None, offset: Optional[int] = None) -> list[model.Session]: - return dal.get_sessions( +def get_sessions_history(limit: int | None = None, offset: int | None = None) -> list[model.Session]: + return query.get_sessions( date_range=Range(lt=datetime.date.today()), order_by=OrderCol(model.SessionCols.DATE, Order.DESCENDING), limit=limit, @@ -48,33 +47,33 @@ def get_sessions_history(limit: Optional[int] = None, offset: Optional[int] = No ) -def get_next_session() -> Optional[model.Session]: +def get_next_session() -> model.Session | None: return next(iter(get_next_sessions(limit=1,)), None) -def get_live_session() -> Optional[model.Session]: - return next(iter(dal.get_sessions(is_live=True)), None) +def get_live_session() -> model.Session | None: + return next(iter(query.get_sessions(is_live=True)), None) -def get_session(session_id: int) -> Optional[model.Session]: - return next(iter(dal.get_sessions(session_id=session_id)), None) +def get_session(session_id: int) -> model.Session | None: + return next(iter(query.get_sessions(session_id=session_id)), None) def insert_session(session: model.Session): - return dal.insert_session(session) + return write.insert_session(session) def set_session(session: model.Session): - dal.update_session(session) + write.update_session(session) def delete_session(session_id: int): - dal.delete_session_by_id(session_id) + write.delete_session_by_id(session_id) def stop_live_sessions(): - dal.stop_live_sessions() + write.stop_live_sessions() def set_live_session(session_id: int): - dal.set_live_session(session_id=session_id) + write.set_live_session(session_id=session_id) diff --git a/folkugat_web/services/temes/links.py b/folkugat_web/services/temes/links.py index 35aea64..f7a2b75 100644 --- a/folkugat_web/services/temes/links.py +++ b/folkugat_web/services/temes/links.py @@ -24,7 +24,7 @@ LINK_RES = { } -def guess_link_type(url: str) -> Optional[model.LinkType]: +def guess_link_type(url: str) -> model.LinkType | None: for link_type, regexes in LINK_RES.items(): for regex in regexes: if regex.match(url): diff --git a/folkugat_web/services/temes/query.py b/folkugat_web/services/temes/query.py index f274a00..890ae04 100644 --- a/folkugat_web/services/temes/query.py +++ b/folkugat_web/services/temes/query.py @@ -4,5 +4,5 @@ from folkugat_web.dal.sql.temes import query as temes_q from folkugat_web.model import temes as model -def get_tema_by_id(tema_id: int) -> Optional[model.Tema]: +def get_tema_by_id(tema_id: int) -> model.Tema | None: return temes_q.get_tema_by_id(tema_id) diff --git a/folkugat_web/services/temes/search.py b/folkugat_web/services/temes/search.py index 7d2654c..2c9fe17 100644 --- a/folkugat_web/services/temes/search.py +++ b/folkugat_web/services/temes/search.py @@ -1,7 +1,7 @@ import functools import time import typing -from collections.abc import Callable, Iterable, Iterator +from collections.abc import Callable, Iterable import Levenshtein from folkugat_web.config import search as config @@ -12,7 +12,7 @@ from folkugat_web.model import search as search_model from folkugat_web.model import temes as model -def get_query_word_similarity(query_word: str, text_ngrams: model.NGrams) -> search_model.SearchMatch: +def get_query_word_similarity(query_word: str, text_ngrams: search_model.NGrams) -> search_model.SearchMatch: n = len(query_word) if n < config.MIN_NGRAM_LENGTH: return search_model.SearchMatch(distance=0.0, ngram='') @@ -27,13 +27,13 @@ def get_query_word_similarity(query_word: str, text_ngrams: model.NGrams) -> sea default=search_model.SearchMatch(distance=float("inf"), ngram="")) -def get_query_similarity(query: str, ngrams: model.NGrams) -> search_model.SearchMatch: +def get_query_similarity(query: str, ngrams: search_model.NGrams) -> search_model.SearchMatch: query_words = query.lower().split() word_matches = map(lambda query_word: get_query_word_similarity(query_word, ngrams), query_words) return search_model.SearchMatch.combine_matches(word_matches) -def build_result(query: str, entry: tuple[int, model.NGrams]) -> search_model.QueryResult: +def build_result(query: str, entry: tuple[int, search_model.NGrams]) -> search_model.QueryResult: if len(query) == 0: return search_model.QueryResult( id=entry[0], @@ -51,6 +51,7 @@ def build_result(query: str, entry: tuple[int, model.NGrams]) -> search_model.Qu T = typing.TypeVar("T") +@typing.no_type_check def _thread(it: Iterable[T], *funcs: Callable[[Iterable], Iterable]) -> Iterable: return functools.reduce(lambda i, fn: fn(i), funcs, it) diff --git a/folkugat_web/utils.py b/folkugat_web/utils.py index fa3699e..b6ffc1d 100644 --- a/folkugat_web/utils.py +++ b/folkugat_web/utils.py @@ -1,6 +1,22 @@ import itertools +from collections.abc import Callable, Iterable, Iterator +from typing import Protocol, Self, TypeVar -def groupby(it, key_fn=lambda x: x, group_fn=lambda x: x): +class SupportsLessThan(Protocol): + def __lt__(self, other: Self) -> bool: + raise NotImplementedError() + + +T = TypeVar("T") +KeyT = TypeVar("KeyT", bound=SupportsLessThan) +GroupT = TypeVar("GroupT") + + +def groupby( + it: Iterable[T], + key_fn: Callable[[T], KeyT], + group_fn: Callable[[Iterable[T]], GroupT] = lambda x: x, +) -> Iterator[tuple[KeyT, GroupT]]: for k, g in itertools.groupby(sorted(it, key=key_fn), key=key_fn): yield k, group_fn(g)