Use basedpyright refactor

This commit is contained in:
marc
2025-03-22 23:06:34 +01:00
parent ac54453b7f
commit 2f7c7c2429
41 changed files with 480 additions and 381 deletions

View File

@@ -49,10 +49,12 @@
# Python dependencies # Python dependencies
python python
pythonEnv pythonEnv
pyright basedpyright
black black
# Project dependencies # Project dependencies
nodePackages_latest.tailwindcss nodePackages_latest.tailwindcss
# Project tools
sqlite
]; ];
shellHook = '' shellHook = ''

View File

@@ -1,4 +1,4 @@
from typing import Annotated, Optional from typing import Annotated
from fastapi import Cookie, Form, Request from fastapi import Cookie, Form, Request
from folkugat_web.api import router from folkugat_web.api import router
@@ -8,13 +8,13 @@ from folkugat_web.services import auth as service
@router.get("/api/nota") @router.get("/api/nota")
def nota_input(request: Request, value: Optional[str] = None): def nota_input(request: Request, value: str | None = None):
return nota.input(request, value) return nota.input(request, value)
@router.post("/api/nota") @router.post("/api/nota")
def login(request: Request, value: Annotated[Optional[str], Form()] = None, def login(request: Request, value: Annotated[str | None, Form()] = None,
nota_folkugat: Annotated[Optional[str], Cookie()] = None): nota_folkugat: Annotated[str | None, Cookie()] = None):
logged_in = service.logged_in(nota_folkugat) logged_in = service.logged_in(nota_folkugat)
new_login = service.login(value) new_login = service.login(value)
if new_login and not logged_in: if new_login and not logged_in:

View File

@@ -24,8 +24,8 @@ def modify_session(
date: Annotated[datetime.date, Form()], date: Annotated[datetime.date, Form()],
start_time: Annotated[datetime.time, Form()], start_time: Annotated[datetime.time, Form()],
end_time: Annotated[datetime.time, Form()], end_time: Annotated[datetime.time, Form()],
venue_name: Annotated[Optional[str], Form()] = None, venue_name: Annotated[str | None, Form()] = None,
venue_url: Annotated[Optional[str], Form()] = None, venue_url: Annotated[str | None, Form()] = None,
): ):
session_date = sessions.model.Session(id=session_id, date=date, start_time=start_time, end_time=end_time, session_date = sessions.model.Session(id=session_id, date=date, start_time=start_time, end_time=end_time,
venue=sessions.model.SessionVenue(name=venue_name, url=venue_url)) venue=sessions.model.SessionVenue(name=venue_name, url=venue_url))

View File

@@ -1,10 +1,9 @@
from typing import Annotated, Optional from typing import Annotated
from fastapi import Request, UploadFile from fastapi import Request, UploadFile
from fastapi.params import File, Form, Param from fastapi.params import File, Form, Param
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
from folkugat_web.api import router from folkugat_web.api import router
from folkugat_web.config import db
from folkugat_web.fragments import tema, temes from folkugat_web.fragments import tema, temes
from folkugat_web.model import temes as model from folkugat_web.model import temes as model
from folkugat_web.services import auth, files from folkugat_web.services import auth, files
@@ -78,7 +77,7 @@ def set_lyric(
lyric: Annotated[str, Form()], lyric: Annotated[str, Form()],
): ):
new_lyric = model.Lyrics(id=lyric_id, title=title, content=lyric.strip()) new_lyric = model.Lyrics(id=lyric_id, title=title, content=lyric.strip())
temes_w.update_lyric(tema_id=tema_id, lyric_id=lyric_id, lyric=new_lyric) _ = temes_w.update_lyric(tema_id=tema_id, lyric_id=lyric_id, lyric=new_lyric)
return tema.lyric(request=request, logged_in=logged_in, tema_id=tema_id, lyric_id=lyric_id) return tema.lyric(request=request, logged_in=logged_in, tema_id=tema_id, lyric_id=lyric_id)
@@ -105,7 +104,7 @@ def delete_lyric(
tema_id: int, tema_id: int,
lyric_id: int, lyric_id: int,
): ):
temes_w.delete_lyric(tema_id=tema_id, lyric_id=lyric_id) _ = temes_w.delete_lyric(tema_id=tema_id, lyric_id=lyric_id)
return HTMLResponse() return HTMLResponse()
@@ -121,9 +120,9 @@ async def set_link(
tema_id: int, tema_id: int,
link_id: int, link_id: int,
content_type: Annotated[model.ContentType, Form()], content_type: Annotated[model.ContentType, Form()],
title: Annotated[Optional[str], Form()] = None, title: Annotated[str | None, Form()] = None,
url: Annotated[Optional[str], Form()] = None, url: Annotated[str | None, Form()] = None,
upload_file: Annotated[Optional[UploadFile], File()] = None, upload_file: Annotated[UploadFile | None, File()] = None,
): ):
if upload_file: if upload_file:
url = await files.store_file(tema_id=tema_id, upload_file=upload_file) url = await files.store_file(tema_id=tema_id, upload_file=upload_file)
@@ -136,7 +135,7 @@ async def set_link(
url=url or '', url=url or '',
title=title or '', title=title or '',
) )
new_tema = temes_w.update_link(tema_id=tema_id, link_id=link_id, link=new_link) _ = temes_w.update_link(tema_id=tema_id, link_id=link_id, link=new_link)
return tema.link(request=request, logged_in=logged_in, tema_id=tema_id, link_id=link_id) return tema.link(request=request, logged_in=logged_in, tema_id=tema_id, link_id=link_id)
@@ -164,7 +163,7 @@ def delete_link(
tema_id: int, tema_id: int,
link_id: int, link_id: int,
): ):
temes_w.delete_link(tema_id=tema_id, link_id=link_id) _tema = temes_w.delete_link(tema_id=tema_id, link_id=link_id)
return HTMLResponse( return HTMLResponse(
headers={ headers={
"HX-Trigger": f"reload-tema-{tema_id}-score" "HX-Trigger": f"reload-tema-{tema_id}-score"
@@ -219,7 +218,7 @@ def set_property(
value: Annotated[str, Form()], value: Annotated[str, Form()],
): ):
new_property = model.Property(id=property_id, field=field, value=value.strip()) new_property = model.Property(id=property_id, field=field, value=value.strip())
temes_w.update_property(tema_id=tema_id, property_id=property_id, prop=new_property) _ = temes_w.update_property(tema_id=tema_id, property_id=property_id, prop=new_property)
return tema.property_(request=request, logged_in=logged_in, tema_id=tema_id, property_id=property_id) return tema.property_(request=request, logged_in=logged_in, tema_id=tema_id, property_id=property_id)
@@ -243,7 +242,7 @@ def add_property(
@router.delete("/api/tema/{tema_id}/property/{property_id}") @router.delete("/api/tema/{tema_id}/property/{property_id}")
def delete_property(_: auth.RequireLogin, tema_id: int, property_id: int): def delete_property(_: auth.RequireLogin, tema_id: int, property_id: int):
temes_w.delete_property(tema_id=tema_id, property_id=property_id) _tema = temes_w.delete_property(tema_id=tema_id, property_id=property_id)
return HTMLResponse() return HTMLResponse()

View File

@@ -1,4 +1,4 @@
from typing import Annotated, Optional from typing import Annotated
from fastapi import Request from fastapi import Request
from fastapi.params import Param from fastapi.params import Param

View File

@@ -1 +1,3 @@
from ._connection import get_connection, Connection from ._connection import Connection, get_connection
__all__ = ["get_connection", "Connection"]

View File

@@ -8,7 +8,7 @@ Connection = sqlite3.Connection
@contextmanager @contextmanager
def get_connection(con: Optional[Connection] = None) -> Iterator[Connection]: def get_connection(con: Connection | None = None) -> Iterator[Connection]:
if con: if con:
yield con yield con
else: else:

View File

@@ -1,10 +1,11 @@
from folkugat_web.dal.sql import get_connection, sessions from folkugat_web.dal.sql import get_connection
from folkugat_web.dal.sql.playlists import ddl as playlists_ddl from folkugat_web.dal.sql.playlists import ddl as playlists_ddl
from folkugat_web.dal.sql.sessions import ddl as sessions_ddl
from folkugat_web.dal.sql.temes import ddl as temes_ddl from folkugat_web.dal.sql.temes import ddl as temes_ddl
def create_db(): def create_db():
with get_connection() as con: with get_connection() as con:
sessions.create_db(con) sessions_ddl.create_db(con)
temes_ddl.create_db(con) temes_ddl.create_db(con)
playlists_ddl.create_db(con) playlists_ddl.create_db(con)

View File

@@ -1,7 +1,18 @@
from typing import TypedDict
from folkugat_web.model import playlists as model from folkugat_web.model import playlists as model
PlaylistRowTuple = tuple[int, int, int, int | None]
def playlist_entry_to_row(tema_in_set: model.PlaylistEntry) -> dict:
class PlaylistRowDict(TypedDict):
id: int | None
session_id: int
set_id: int
tema_id: int | None
def playlist_entry_to_row(tema_in_set: model.PlaylistEntry) -> PlaylistRowDict:
return { return {
'id': tema_in_set.id, 'id': tema_in_set.id,
'session_id': tema_in_set.session_id, 'session_id': tema_in_set.session_id,
@@ -10,7 +21,7 @@ def playlist_entry_to_row(tema_in_set: model.PlaylistEntry) -> dict:
} }
def row_to_playlist_entry(row: tuple) -> model.PlaylistEntry: def row_to_playlist_entry(row: PlaylistRowTuple) -> model.PlaylistEntry:
return model.PlaylistEntry( return model.PlaylistEntry(
id=row[0], id=row[0],
session_id=row[1], session_id=row[1],

View File

@@ -1,9 +1,7 @@
from typing import Optional
from folkugat_web.dal.sql import Connection, get_connection from folkugat_web.dal.sql import Connection, get_connection
def create_db(con: Optional[Connection] = None): def create_db(con: Connection | None = None):
with get_connection(con) as con: with get_connection(con) as con:
create_playlists_table(con) create_playlists_table(con)
@@ -11,7 +9,7 @@ def create_db(con: Optional[Connection] = None):
def drop_playlists_table(con: Connection): def drop_playlists_table(con: Connection):
query = "DROP TABLE IF EXISTS playlists" query = "DROP TABLE IF EXISTS playlists"
cur = con.cursor() cur = con.cursor()
cur.execute(query) _ = cur.execute(query)
def create_playlists_table(con: Connection): def create_playlists_table(con: Connection):
@@ -24,4 +22,4 @@ def create_playlists_table(con: Connection):
) )
""" """
cur = con.cursor() cur = con.cursor()
cur.execute(query) _ = cur.execute(query)

View File

@@ -1,19 +1,27 @@
from collections.abc import Iterator from collections.abc import Iterator
from typing import Optional from typing import TypedDict
from folkugat_web.dal.sql import Connection, get_connection from folkugat_web.dal.sql import Connection, get_connection
from folkugat_web.dal.sql.sessions import conversion as sessions_conversion
from folkugat_web.model import playlists as model from folkugat_web.model import playlists as model
from folkugat_web.model.sessions import Session
from ._conversion import row_to_playlist_entry from . import conversion
class QueryData(TypedDict, total=False):
id: int
set_id: int
session_id: int
def _filter_clause( def _filter_clause(
entry_id: Optional[int] = None, entry_id: int | None = None,
set_id: Optional[int] = None, set_id: int | None = None,
session_id: Optional[int] = None, session_id: int | None = None,
) -> tuple[str, dict]: ) -> tuple[str, QueryData]:
filter_clauses = [] filter_clauses: list[str] = []
query_data = {} query_data: QueryData = {}
if entry_id is not None: if entry_id is not None:
filter_clauses.append("id = :id") filter_clauses.append("id = :id")
@@ -29,10 +37,10 @@ def _filter_clause(
def get_playlist_entries( def get_playlist_entries(
entry_id: Optional[int] = None, entry_id: int | None = None,
set_id: Optional[int] = None, set_id: int | None = None,
session_id: Optional[int] = None, session_id: int | None = None,
con: Optional[Connection] = None, con: Connection | None = None,
) -> Iterator[model.PlaylistEntry]: ) -> Iterator[model.PlaylistEntry]:
filter_clause, data = _filter_clause(entry_id=entry_id, set_id=set_id, session_id=session_id) filter_clause, data = _filter_clause(entry_id=entry_id, set_id=set_id, session_id=session_id)
query = f""" query = f"""
@@ -43,5 +51,18 @@ def get_playlist_entries(
""" """
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query, data) _ = cur.execute(query, data)
return map(row_to_playlist_entry, cur.fetchall()) return map(conversion.row_to_playlist_entry, cur.fetchall())
def get_tune_sessions(tema_ids: list[int], con: Connection | None = None) -> Iterator[Session]:
query = """
SELECT p.tema_id, s.id, s.date, s.start_time, s.end_time, s.venue_name, s.venue_url, s.is_live
FROM playlists p JOIN sessions s ON p.session_id = s.id
WHERE p.tema_id IN :tema_ids
"""
data = dict(tema_ids=tuple(tema_ids))
with get_connection(con) as con:
cur = con.cursor()
_ = cur.execute(query, data)
return map(sessions_conversion.row_to_session, cur.fetchall())

View File

@@ -1,12 +1,10 @@
from typing import Optional
from folkugat_web.dal.sql import Connection, get_connection from folkugat_web.dal.sql import Connection, get_connection
from folkugat_web.model import playlists as model from folkugat_web.model import playlists as model
from ._conversion import playlist_entry_to_row, row_to_playlist_entry from . import conversion
def insert_playlist_entry(pl_entry: model.PlaylistEntry, con: Optional[Connection] = None) -> model.PlaylistEntry: def insert_playlist_entry(pl_entry: model.PlaylistEntry, con: Connection | None = None) -> model.PlaylistEntry:
query = """ query = """
INSERT INTO playlists INSERT INTO playlists
(id, session_id, set_id, tema_id) (id, session_id, set_id, tema_id)
@@ -14,15 +12,15 @@ def insert_playlist_entry(pl_entry: model.PlaylistEntry, con: Optional[Connectio
(:id, :session_id, :set_id, :tema_id) (:id, :session_id, :set_id, :tema_id)
RETURNING * RETURNING *
""" """
data = playlist_entry_to_row(pl_entry) data = conversion.playlist_entry_to_row(pl_entry)
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query, data) _ = cur.execute(query, data)
row = cur.fetchone() row: conversion.PlaylistRowTuple = cur.fetchone()
return row_to_playlist_entry(row) return conversion.row_to_playlist_entry(row)
def update_playlist_entry(entry: model.PlaylistEntry, con: Optional[Connection] = None): def update_playlist_entry(entry: model.PlaylistEntry, con: Connection | None = None):
query = """ query = """
UPDATE playlists UPDATE playlists
SET SET
@@ -30,14 +28,14 @@ def update_playlist_entry(entry: model.PlaylistEntry, con: Optional[Connection]
WHERE WHERE
id = :id id = :id
""" """
data = playlist_entry_to_row(entry) data = conversion.playlist_entry_to_row(entry)
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query, data) _ = cur.execute(query, data)
return return
def delete_playlist_entry(entry_id: int, con: Optional[Connection] = None): def delete_playlist_entry(entry_id: int, con: Connection | None = None):
query = """ query = """
DELETE FROM playlists DELETE FROM playlists
WHERE id = :id WHERE id = :id
@@ -45,11 +43,11 @@ def delete_playlist_entry(entry_id: int, con: Optional[Connection] = None):
data = dict(id=entry_id) data = dict(id=entry_id)
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query, data) _ = cur.execute(query, data)
return return
def delete_playlist_set(session_id: int, set_id: int, con: Optional[Connection] = None): def delete_playlist_set(session_id: int, set_id: int, con: Connection | None = None):
query = """ query = """
DELETE FROM playlists DELETE FROM playlists
WHERE session_id = :session_id AND set_id = :set_id WHERE session_id = :session_id AND set_id = :set_id
@@ -57,5 +55,5 @@ def delete_playlist_set(session_id: int, set_id: int, con: Optional[Connection]
data = dict(session_id=session_id, set_id=set_id) data = dict(session_id=session_id, set_id=set_id)
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query, data) _ = cur.execute(query, data)
return return

View File

@@ -1,194 +0,0 @@
import datetime
from typing import Optional
from folkugat_web.dal.sql import Connection, get_connection
from folkugat_web.model import sessions as model
from folkugat_web.model.sql import OrderCol, Range
from folkugat_web.typing import OptionalListOrValue as OLV
def create_db(con: Optional[Connection] = None):
with get_connection(con) as con:
create_sessions_table(con)
def drop_sessions_table(con: Connection):
query = "DROP TABLE IF EXISTS sessions"
cur = con.cursor()
cur.execute(query)
def create_sessions_table(con: Connection):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY,
date TEXT NOT NULL,
start_time TEXT NOT NULL,
end_time TEXT NOT NULL,
venue_name TEXT,
venue_url TEXT,
is_live BOOLEAN DEFAULT false
)
"""
cur = con.cursor()
cur.execute(query)
def _session_to_row(sessio: model.Session) -> dict:
return {
'id': sessio.id,
'date': sessio.date,
'start_time': sessio.start_time.isoformat(),
'end_time': sessio.end_time.isoformat(),
'venue_name': sessio.venue.name,
'venue_url': sessio.venue.url,
'is_live': sessio.is_live,
}
def _row_to_session(row: tuple) -> model.Session:
return model.Session(
id=row[0],
date=datetime.date.fromisoformat(row[1]),
start_time=datetime.time.fromisoformat(row[2]),
end_time=datetime.time.fromisoformat(row[3]),
venue=model.SessionVenue(
name=row[4],
url=row[5],
),
is_live=row[6],
)
def insert_session(session: model.Session, con: Optional[Connection] = None):
query = """
INSERT INTO sessions
(id, date, start_time, end_time, venue_name, venue_url, is_live)
VALUES
(:id, :date, :start_time, :end_time, :venue_name, :venue_url, :is_live)
RETURNING *
"""
data = _session_to_row(session)
with get_connection(con) as con:
cur = con.cursor()
cur.execute(query, data)
row = cur.fetchone()
return _row_to_session(row)
def update_session(session: model.Session, con: Optional[Connection] = None):
query = """
UPDATE sessions SET
date = :date, start_time = :start_time, end_time = :end_time,
venue_name = :venue_name, venue_url = :venue_url, is_live = :is_live
WHERE id = :id
"""
data = _session_to_row(session)
with get_connection(con) as con:
cur = con.cursor()
cur.execute(query, data)
def _filter_clause(session_id: Optional[int] = None,
date_range: Optional[Range[datetime.date]] = None,
is_live: Optional[bool] = None) -> tuple[str, dict]:
filter_clauses = []
filter_data = {}
if session_id is not None:
filter_clauses.append(f"id = :session_id")
filter_data["session_id"] = session_id
if date_range:
if ub := date_range.upper_bound():
operator = "<=" if ub[1] else "<"
filter_clauses.append(f"date {operator} :date_ub")
filter_data["date_ub"] = ub[0]
if lb := date_range.lower_bound():
operator = ">=" if lb[1] else ">"
filter_clauses.append(f"date {operator} :date_lb")
filter_data["date_lb"] = lb[0]
if is_live is not None:
filter_clauses.append(f"is_live = :is_live")
filter_data["is_live"] = is_live
if filter_clauses:
filter_clause_str = " AND ".join(filter_clauses)
filter_clause = f"WHERE {filter_clause_str}"
return filter_clause, filter_data
else:
return "", {}
def _order_clause(order_by: OLV[OrderCol[model.SessionCols]]) -> str:
if not order_by:
return ""
if not isinstance(order_by, list):
order_by = [order_by]
order_clauses = [f"{ocol.column.value} {ocol.order.value}" for ocol in order_by]
order_clauses_str = " ".join(order_clauses)
return f"ORDER BY {order_clauses_str}"
def get_sessions(session_id: Optional[int] = None,
date_range: Optional[Range[datetime.date]] = None,
is_live: Optional[bool] = None,
order_by: OLV[OrderCol[model.SessionCols]] = None,
limit: Optional[int] = None, offset: Optional[int] = None,
con: Optional[Connection] = None) -> list[model.Session]:
clauses = []
filter_clause, data = _filter_clause(session_id=session_id, date_range=date_range, is_live=is_live)
if filter_clause:
clauses.append(filter_clause)
if order_clause := _order_clause(order_by=order_by):
clauses.append(order_clause)
if limit is not None:
clauses.append("LIMIT :limit")
data["limit"] = limit
if offset is not None:
clauses.append("OFFSET :offset")
data["offset"] = offset
clauses_str = " ".join(clauses)
query = f"""
SELECT id, date, start_time, end_time, venue_name, venue_url, is_live
FROM sessions
{clauses_str}
"""
with get_connection(con) as con:
cur = con.cursor()
cur.execute(query, data)
return list(map(_row_to_session, cur.fetchall()))
def delete_session_by_id(session_id: int, con: Optional[Connection] = None):
query = """
DELETE FROM sessions
WHERE id = :id
"""
data = dict(id=session_id)
with get_connection(con) as con:
cur = con.cursor()
cur.execute(query, data)
def stop_live_sessions(con: Optional[Connection] = None):
query = """
UPDATE sessions SET is_live = false WHERE is_live = true
"""
with get_connection(con) as con:
cur = con.cursor()
cur.execute(query)
def set_live_session(session_id: int, con: Optional[Connection] = None):
query = """
UPDATE sessions SET is_live = true WHERE id = :id
"""
data = dict(id=session_id)
with get_connection(con) as con:
stop_live_sessions(con=con)
cur = con.cursor()
cur.execute(query, data)

View File

@@ -0,0 +1,42 @@
import datetime
from typing import TypedDict
from folkugat_web.model import sessions as model
SessionRowTuple = tuple[int, str, str, str, str | None, str | None, bool]
class SessionRowDict(TypedDict):
id: int | None
date: str
start_time: str
end_time: str
venue_name: str | None
venue_url: str | None
is_live: bool
def session_to_row(sessio: model.Session) -> SessionRowDict:
return {
'id': sessio.id,
'date': sessio.date.isoformat(),
'start_time': sessio.start_time.isoformat(),
'end_time': sessio.end_time.isoformat(),
'venue_name': sessio.venue.name,
'venue_url': sessio.venue.url,
'is_live': sessio.is_live,
}
def row_to_session(row: SessionRowTuple) -> model.Session:
return model.Session(
id=row[0],
date=datetime.date.fromisoformat(row[1]),
start_time=datetime.time.fromisoformat(row[2]),
end_time=datetime.time.fromisoformat(row[3]),
venue=model.SessionVenue(
name=row[4],
url=row[5],
),
is_live=row[6],
)

View File

@@ -0,0 +1,28 @@
from folkugat_web.dal.sql import Connection, get_connection
def create_db(con: Connection | None = None):
with get_connection(con) as con:
create_sessions_table(con)
def drop_sessions_table(con: Connection):
query = "DROP TABLE IF EXISTS sessions"
cur = con.cursor()
_ = cur.execute(query)
def create_sessions_table(con: Connection):
query = """
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY,
date TEXT NOT NULL,
start_time TEXT NOT NULL,
end_time TEXT NOT NULL,
venue_name TEXT,
venue_url TEXT,
is_live BOOLEAN DEFAULT false
)
"""
cur = con.cursor()
_ = cur.execute(query)

View File

@@ -0,0 +1,92 @@
import datetime
from typing import TypedDict
from folkugat_web.dal.sql import Connection, get_connection
from folkugat_web.model import sessions as model
from folkugat_web.model.sql import OrderCol, Range
from folkugat_web.typing import OptionalListOrValue as OLV
from . import conversion
class QueryData(TypedDict, total=False):
session_id: int | None
date_ub: datetime.date
date_lb: datetime.date
is_live: bool
limit: int
offset: int
def _filter_clause(session_id: int | None = None,
date_range: Range[datetime.date] | None = None,
is_live: bool | None = None) -> tuple[str, QueryData]:
filter_clauses: list[str] = []
filter_data: QueryData = {}
if session_id is not None:
filter_clauses.append(f"id = :session_id")
filter_data["session_id"] = session_id
if date_range:
if ub := date_range.upper_bound():
operator = "<=" if ub[1] else "<"
filter_clauses.append(f"date {operator} :date_ub")
filter_data["date_ub"] = ub[0]
if lb := date_range.lower_bound():
operator = ">=" if lb[1] else ">"
filter_clauses.append(f"date {operator} :date_lb")
filter_data["date_lb"] = lb[0]
if is_live is not None:
filter_clauses.append(f"is_live = :is_live")
filter_data["is_live"] = is_live
if filter_clauses:
filter_clause_str = " AND ".join(filter_clauses)
filter_clause = f"WHERE {filter_clause_str}"
return filter_clause, filter_data
else:
return "", {}
def _order_clause(order_by: OLV[OrderCol[model.SessionCols]]) -> str:
if not order_by:
return ""
if not isinstance(order_by, list):
order_by = [order_by]
order_clauses = [f"{ocol.column.value} {ocol.order.value}" for ocol in order_by]
order_clauses_str = " ".join(order_clauses)
return f"ORDER BY {order_clauses_str}"
def get_sessions(session_id: int | None = None,
date_range: Range[datetime.date] | None = None,
is_live: bool | None = None,
order_by: OLV[OrderCol[model.SessionCols]] = None,
limit: int | None = None, offset: int | None = None,
con: Connection | None = None) -> list[model.Session]:
clauses: list[str] = []
filter_clause, data = _filter_clause(session_id=session_id, date_range=date_range, is_live=is_live)
if filter_clause:
clauses.append(filter_clause)
if order_clause := _order_clause(order_by=order_by):
clauses.append(order_clause)
if limit is not None:
clauses.append("LIMIT :limit")
data["limit"] = limit
if offset is not None:
clauses.append("OFFSET :offset")
data["offset"] = offset
clauses_str = " ".join(clauses)
query = f"""
SELECT id, date, start_time, end_time, venue_name, venue_url, is_live
FROM sessions
{clauses_str}
"""
with get_connection(con) as con:
cur = con.cursor()
_ = cur.execute(query, data)
return list(map(conversion.row_to_session, cur.fetchall()))

View File

@@ -0,0 +1,64 @@
from folkugat_web.dal.sql import Connection, get_connection
from folkugat_web.model import sessions as model
from . import conversion
def insert_session(session: model.Session, con: Connection | None = None):
query = """
INSERT INTO sessions
(id, date, start_time, end_time, venue_name, venue_url, is_live)
VALUES
(:id, :date, :start_time, :end_time, :venue_name, :venue_url, :is_live)
RETURNING *
"""
data = conversion.session_to_row(session)
with get_connection(con) as con:
cur = con.cursor()
_ = cur.execute(query, data)
row: conversion.SessionRowTuple = cur.fetchone()
return conversion.row_to_session(row)
def update_session(session: model.Session, con: Connection | None = None):
query = """
UPDATE sessions SET
date = :date, start_time = :start_time, end_time = :end_time,
venue_name = :venue_name, venue_url = :venue_url, is_live = :is_live
WHERE id = :id
"""
data = conversion.session_to_row(session)
with get_connection(con) as con:
cur = con.cursor()
_ = cur.execute(query, data)
def delete_session_by_id(session_id: int, con: Connection | None = None):
query = """
DELETE FROM sessions
WHERE id = :id
"""
data = dict(id=session_id)
with get_connection(con) as con:
cur = con.cursor()
_ = cur.execute(query, data)
def stop_live_sessions(con: Connection | None = None):
query = """
UPDATE sessions SET is_live = false WHERE is_live = true
"""
with get_connection(con) as con:
cur = con.cursor()
_ = cur.execute(query)
def set_live_session(session_id: int, con: Connection | None = None):
query = """
UPDATE sessions SET is_live = true WHERE id = :id
"""
data = dict(id=session_id)
with get_connection(con) as con:
stop_live_sessions(con=con)
cur = con.cursor()
_ = cur.execute(query, data)

View File

@@ -1,11 +1,28 @@
import datetime import datetime
import json import json
from typing import TypedDict
from folkugat_web.model import IndexedList from folkugat_web.model import IndexedList
from folkugat_web.model import search as search_model
from folkugat_web.model import temes as model from folkugat_web.model import temes as model
TemaRowTuple = tuple[int, str, str, str, str, str, str, str, str, int]
def tema_to_row(tema: model.Tema) -> dict:
class TemaRowDict(TypedDict):
id: int | None
title: str
properties: str
links: str
lyrics: str
alternatives: str
ngrams: str
modification_date: str
creation_date: str
hidden: int
def tema_to_row(tema: model.Tema) -> TemaRowDict:
return { return {
'id': tema.id, 'id': tema.id,
'title': tema.title, 'title': tema.title,
@@ -20,11 +37,11 @@ def tema_to_row(tema: model.Tema) -> dict:
} }
def cell_to_ngrams(cell: str) -> model.NGrams: def cell_to_ngrams(cell: str) -> search_model.NGrams:
return {int(n): ngrams_ for n, ngrams_ in json.loads(cell).items()} return {int(n): ngrams_ for n, ngrams_ in json.loads(cell).items()}
def row_to_tema(row: tuple) -> model.Tema: def row_to_tema(row: TemaRowTuple) -> model.Tema:
return model.Tema( return model.Tema(
id=row[0], id=row[0],
title=row[1], title=row[1],

View File

@@ -1,23 +1,15 @@
from typing import Optional
from folkugat_web import data
from folkugat_web.dal.sql import Connection, get_connection from folkugat_web.dal.sql import Connection, get_connection
from .write import insert_tema
def create_db(con: Connection | None = None):
def create_db(con: Optional[Connection] = None):
with get_connection(con) as con: with get_connection(con) as con:
create_temes_table(con) create_temes_table(con)
# for tema in data.TEMES:
# insert_tema(tema, con)
def drop_temes_table(con: Connection): def drop_temes_table(con: Connection):
query = "DROP TABLE IF EXISTS temes" query = "DROP TABLE IF EXISTS temes"
cur = con.cursor() cur = con.cursor()
cur.execute(query) _ = cur.execute(query)
def create_temes_table(con: Connection): def create_temes_table(con: Connection):
@@ -36,4 +28,4 @@ def create_temes_table(con: Connection):
) )
""" """
cur = con.cursor() cur = con.cursor()
cur.execute(query) _ = cur.execute(query)

View File

@@ -1,14 +1,13 @@
from typing import Optional
from folkugat_web.dal.sql import Connection, get_connection from folkugat_web.dal.sql import Connection, get_connection
from folkugat_web.model import search as search_model
from folkugat_web.model import temes as model from folkugat_web.model import temes as model
from ._conversion import cell_to_ngrams, row_to_tema from . import conversion
TEMA_ID_TO_NGRAMS_CACHE = None _tema_id_to_ngrams_cache: dict[int, search_model.NGrams] | None = None
def get_tema_by_id(tema_id: int, con: Optional[Connection] = None) -> Optional[model.Tema]: def get_tema_by_id(tema_id: int, con: Connection | None = None) -> model.Tema | None:
query = """ query = """
SELECT SELECT
id, title, properties, links, lyrics, alternatives, ngrams, id, title, properties, links, lyrics, alternatives, ngrams,
@@ -19,30 +18,30 @@ def get_tema_by_id(tema_id: int, con: Optional[Connection] = None) -> Optional[m
data = dict(id=tema_id) data = dict(id=tema_id)
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query, data) _ = cur.execute(query, data)
row = cur.fetchone() row: conversion.TemaRowTuple = cur.fetchone()
return row_to_tema(row) if row else None return conversion.row_to_tema(row) if row else None
def evict_tema_id_to_ngrams_cache(): def evict_tema_id_to_ngrams_cache():
global TEMA_ID_TO_NGRAMS_CACHE global _tema_id_to_ngrams_cache
TEMA_ID_TO_NGRAMS_CACHE = None _tema_id_to_ngrams_cache = None
def get_tema_id_to_ngrams(con: Optional[Connection] = None) -> dict[int, model.NGrams]: def get_tema_id_to_ngrams(con: Connection | None = None) -> dict[int, search_model.NGrams]:
global TEMA_ID_TO_NGRAMS_CACHE global _tema_id_to_ngrams_cache
if TEMA_ID_TO_NGRAMS_CACHE is None: if _tema_id_to_ngrams_cache is None:
TEMA_ID_TO_NGRAMS_CACHE = _get_tema_id_to_ngrams(con) _tema_id_to_ngrams_cache = _get_tema_id_to_ngrams(con)
return TEMA_ID_TO_NGRAMS_CACHE return _tema_id_to_ngrams_cache
def _get_tema_id_to_ngrams(con: Optional[Connection] = None) -> dict[int, model.NGrams]: def _get_tema_id_to_ngrams(con: Connection | None = None) -> dict[int, search_model.NGrams]:
query = """ query = """
SELECT id, ngrams SELECT id, ngrams
FROM temes FROM temes
""" """
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query) _ = cur.execute(query)
rows = cur.fetchall() rows: list[tuple[int, str]] = cur.fetchall()
return {id_: cell_to_ngrams(ng) for id_, ng in rows} return {id_: conversion.cell_to_ngrams(ng) for id_, ng in rows}

View File

@@ -1,13 +1,11 @@
from typing import Optional
from folkugat_web.dal.sql import Connection, get_connection from folkugat_web.dal.sql import Connection, get_connection
from folkugat_web.model import temes as model from folkugat_web.model import temes as model
from ._conversion import row_to_tema, tema_to_row from . import conversion
from .query import evict_tema_id_to_ngrams_cache from .query import evict_tema_id_to_ngrams_cache
def insert_tema(tema: model.Tema, con: Optional[Connection] = None) -> model.Tema: def insert_tema(tema: model.Tema, con: Connection | None = None) -> model.Tema:
query = """ query = """
INSERT INTO temes INSERT INTO temes
(id, title, properties, links, lyrics, alternatives, ngrams, (id, title, properties, links, lyrics, alternatives, ngrams,
@@ -17,16 +15,16 @@ def insert_tema(tema: model.Tema, con: Optional[Connection] = None) -> model.Tem
:creation_date, :modification_date, :hidden) :creation_date, :modification_date, :hidden)
RETURNING * RETURNING *
""" """
data = tema_to_row(tema) data = conversion.tema_to_row(tema)
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query, data) _ = cur.execute(query, data)
row = cur.fetchone() row: conversion.TemaRowTuple = cur.fetchone()
evict_tema_id_to_ngrams_cache() evict_tema_id_to_ngrams_cache()
return row_to_tema(row) return conversion.row_to_tema(row)
def update_tema(tema: model.Tema, con: Optional[Connection] = None): def update_tema(tema: model.Tema, con: Connection | None = None):
query = """ query = """
UPDATE temes UPDATE temes
SET SET
@@ -36,15 +34,15 @@ def update_tema(tema: model.Tema, con: Optional[Connection] = None):
WHERE WHERE
id = :id id = :id
""" """
data = tema_to_row(tema) data = conversion.tema_to_row(tema)
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query, data) _ = cur.execute(query, data)
evict_tema_id_to_ngrams_cache() evict_tema_id_to_ngrams_cache()
return return
def delete_tema(tema_id: int, con: Optional[Connection] = None): def delete_tema(tema_id: int, con: Connection | None = None):
query = """ query = """
DELETE FROM temes DELETE FROM temes
WHERE id = :id WHERE id = :id
@@ -52,6 +50,6 @@ def delete_tema(tema_id: int, con: Optional[Connection] = None):
data = dict(id=tema_id) data = dict(id=tema_id)
with get_connection(con) as con: with get_connection(con) as con:
cur = con.cursor() cur = con.cursor()
cur.execute(query, data) _ = cur.execute(query, data)
evict_tema_id_to_ngrams_cache() evict_tema_id_to_ngrams_cache()
return return

View File

@@ -1,8 +1,9 @@
from fastapi import Request
from folkugat_web.config import nota as config from folkugat_web.config import nota as config
from folkugat_web.templates import templates from folkugat_web.templates import templates
def input(request, value=None): def input(request: Request, value: str | None = None):
return templates.TemplateResponse( return templates.TemplateResponse(
"fragments/nota/input.html", "fragments/nota/input.html",
{ {
@@ -12,7 +13,7 @@ def input(request, value=None):
) )
def footer(request, value, logged_in): def footer(request: Request, value: str | None, logged_in: bool):
response = templates.TemplateResponse( response = templates.TemplateResponse(
"fragments/nota/footer.html", "fragments/nota/footer.html",
{ {
@@ -24,7 +25,7 @@ def footer(request, value, logged_in):
return response return response
def nota(request): def nota(request: Request):
return templates.TemplateResponse( return templates.TemplateResponse(
"fragments/nota/nota.html", "fragments/nota/nota.html",
{ {

View File

@@ -147,7 +147,7 @@ def busca_tema(
) )
def set_tema(request: Request, logged_in: bool, session_id: int, set_id: int, entry_id: int, tema_id: Optional[int]): def set_tema(request: Request, logged_in: bool, session_id: int, set_id: int, entry_id: int, tema_id: int | None):
playlists_service.set_tema(session_id=session_id, set_id=set_id, entry_id=entry_id, tema_id=tema_id) playlists_service.set_tema(session_id=session_id, set_id=set_id, entry_id=entry_id, tema_id=tema_id)
tema_entry = playlists_service.get_tema(entry_id=entry_id) tema_entry = playlists_service.get_tema(entry_id=entry_id)
playlists_service.add_tema_to_tema_in_set(tema_entry) playlists_service.add_tema_to_tema_in_set(tema_entry)

View File

@@ -59,7 +59,7 @@ def sessions_historial(request: Request, limit: int, logged_in: bool):
def _sessions_list(request: Request, sessions: list[model.Session], has_more_sessions: bool, def _sessions_list(request: Request, sessions: list[model.Session], has_more_sessions: bool,
paging_config: config.PagingConfig, list_id: str, get_sessions_url: str, paging_config: config.PagingConfig, list_id: str, get_sessions_url: str,
limit: int, logged_in: bool, next_session_id: Optional[int] = None): limit: int, logged_in: bool, next_session_id: int | None = None):
if has_more_sessions: if has_more_sessions:
more_sessions = limit + paging_config.step more_sessions = limit + paging_config.step
else: else:
@@ -87,7 +87,7 @@ def _sessions_list(request: Request, sessions: list[model.Session], has_more_ses
) )
def sessions_editor_row(request: Request, session_date: Optional[model.Session] = None, session_id: Optional[int] = None): def sessions_editor_row(request: Request, session_date: model.Session | None = None, session_id: int | None = None):
if session_date is None: if session_date is None:
if session_id is None: if session_id is None:
raise ValueError("Must either give session or session_id") raise ValueError("Must either give session or session_id")
@@ -115,7 +115,7 @@ def sessions_editor_row_editing(request, session_id: int):
) )
def sessions_editor_insert_row(request, session_date: Optional[model.Session] = None): def sessions_editor_insert_row(request, session_date: model.Session | None = None):
session_date = service.insert_session(session_date or service.new_session()) session_date = service.insert_session(session_date or service.new_session())
return sessions_editor_row(request, session_date=session_date) return sessions_editor_row(request, session_date=session_date)

View File

@@ -7,7 +7,7 @@ from folkugat_web.services.temes.links import guess_link_type
from folkugat_web.templates import templates from folkugat_web.templates import templates
def title(request: Request, logged_in: bool, tema: Optional[model.Tema] = None, tema_id: Optional[int] = None): def title(request: Request, logged_in: bool, tema: model.Tema | None = None, tema_id: int | None = None):
if tema is None: if tema is None:
if tema_id is None: if tema_id is None:
raise ValueError("Either 'tema' or 'tema_id' must be given!") raise ValueError("Either 'tema' or 'tema_id' must be given!")

View File

@@ -1 +1,3 @@
from ._base import IndexedList from ._base import IndexedList
__all__ = ["IndexedList"]

View File

@@ -1,16 +1,19 @@
import dataclasses import dataclasses
from typing import Optional, TypeVar from typing import TypeVar
from typing_extensions import override
@dataclasses.dataclass @dataclasses.dataclass
class WithId: class WithId:
id: Optional[int] id: int | None
T = TypeVar("T", bound=WithId) T = TypeVar("T", bound=WithId)
class IndexedList(list[T]): class IndexedList(list[T]):
@override
def append(self, _item: T) -> None: def append(self, _item: T) -> None:
if _item.id is None: if _item.id is None:
_item.id = max((i.id or 0 for i in self), default=0) + 1 _item.id = max((i.id or 0 for i in self), default=0) + 1

View File

@@ -1,6 +1,6 @@
import dataclasses import dataclasses
from collections.abc import Iterator from collections.abc import Iterator
from typing import Optional, Self from typing import Self
from folkugat_web.model.temes import Tema from folkugat_web.model.temes import Tema
from folkugat_web.utils import groupby from folkugat_web.utils import groupby
@@ -8,17 +8,17 @@ from folkugat_web.utils import groupby
@dataclasses.dataclass @dataclasses.dataclass
class PlaylistEntry: class PlaylistEntry:
id: Optional[int] id: int | None
session_id: int session_id: int
set_id: int set_id: int
tema_id: Optional[int] tema_id: int | None
@dataclasses.dataclass @dataclasses.dataclass
class TemaInSet: class TemaInSet:
id: Optional[int] id: int | None
tema_id: Optional[int] tema_id: int | None
tema: Optional[Tema] tema: Tema | None
def to_playlist_entry(self, session_id: int, set_id: int) -> PlaylistEntry: def to_playlist_entry(self, session_id: int, set_id: int) -> PlaylistEntry:
return PlaylistEntry( return PlaylistEntry(
@@ -38,7 +38,7 @@ class Set:
id: int id: int
temes: list[TemaInSet] temes: list[TemaInSet]
def to_playlist_entries(self, session_id) -> Iterator[PlaylistEntry]: def to_playlist_entries(self, session_id: int) -> Iterator[PlaylistEntry]:
for tema_in_set in self.temes: for tema_in_set in self.temes:
yield tema_in_set.to_playlist_entry( yield tema_in_set.to_playlist_entry(
session_id=session_id, session_id=session_id,

View File

@@ -1,4 +1,8 @@
import dataclasses import dataclasses
from collections.abc import Iterable
from typing import Self
NGrams = dict[int, list[str]]
@dataclasses.dataclass(order=True) @dataclasses.dataclass(order=True)
@@ -7,13 +11,14 @@ class SearchMatch:
ngram: str ngram: str
@classmethod @classmethod
def combine_matches(cls, matches): def combine_matches(cls, matches: Iterable[Self]) -> Self:
ngrams, distances = zip(*((match.ngram, match.distance) for match in matches)) ngrams, distances = zip(*((match.ngram, match.distance) for match in matches))
return cls( return cls(
ngram=', '.join(ngrams), ngram=', '.join(ngrams),
distance=sum(distances)/len(distances) distance=sum(distances)/len(distances)
) )
@dataclasses.dataclass @dataclasses.dataclass
class QueryResult: class QueryResult:
id: int id: int

View File

@@ -1,7 +1,6 @@
import dataclasses import dataclasses
import datetime import datetime
import enum import enum
from typing import Optional
DEFAULT_START_TIME = datetime.time(20, 30) DEFAULT_START_TIME = datetime.time(20, 30)
DEFAULT_END_TIME = datetime.time(22, 30) DEFAULT_END_TIME = datetime.time(22, 30)
@@ -9,13 +8,13 @@ DEFAULT_END_TIME = datetime.time(22, 30)
@dataclasses.dataclass @dataclasses.dataclass
class SessionVenue: class SessionVenue:
name: Optional[str] = None name: str | None = None
url: Optional[str] = None url: str | None = None
@dataclasses.dataclass @dataclasses.dataclass
class Session: class Session:
id: Optional[int] = None id: int | None = None
date: datetime.date = dataclasses.field(default_factory=datetime.date.today) date: datetime.date = dataclasses.field(default_factory=datetime.date.today)
start_time: datetime.time = DEFAULT_START_TIME start_time: datetime.time = DEFAULT_START_TIME
end_time: datetime.time = DEFAULT_END_TIME end_time: datetime.time = DEFAULT_END_TIME

View File

@@ -1,10 +1,9 @@
from __future__ import annotations from __future__ import annotations
import dataclasses import dataclasses
import datetime
import enum import enum
from abc import abstractmethod from abc import abstractmethod
from typing import Generic, Optional, Protocol, TypeVar from typing import Generic, Protocol, TypeVar
class Comparable(Protocol): class Comparable(Protocol):
@@ -30,12 +29,12 @@ T = TypeVar("T", bound=Comparable)
@dataclasses.dataclass @dataclasses.dataclass
class Range(Generic[T]): class Range(Generic[T]):
gt: Optional[T] = None gt: T | None = None
gte: Optional[T] = None gte: T | None = None
lt: Optional[T] = None lt: T | None = None
lte: Optional[T] = None lte: T | None = None
def lower_bound(self) -> Optional[tuple[T, bool]]: def lower_bound(self) -> tuple[T, bool] | None:
if self.gt is None and self.gte is None: if self.gt is None and self.gte is None:
return None return None
elif self.gt is not None and self.gte is not None: elif self.gt is not None and self.gte is not None:
@@ -47,7 +46,7 @@ class Range(Generic[T]):
elif self.gte is not None: elif self.gte is not None:
return self.gte, True return self.gte, True
def upper_bound(self) -> Optional[tuple[T, bool]]: def upper_bound(self) -> tuple[T, bool] | None:
if self.lt is None and self.lte is None: if self.lt is None and self.lte is None:
return None return None
elif self.lt is not None and self.lte is not None: elif self.lt is not None and self.lte is not None:

View File

@@ -1,14 +1,14 @@
import dataclasses import dataclasses
import datetime import datetime
import enum import enum
from typing import Optional from typing import Self
from folkugat_web.model.search import NGrams
from folkugat_web.model.sessions import Session
from folkugat_web.services import ngrams from folkugat_web.services import ngrams
from ._base import IndexedList, WithId from ._base import IndexedList, WithId
NGrams = dict[int, list[str]]
class ContentType(enum.Enum): class ContentType(enum.Enum):
PARTITURA = "partitura" PARTITURA = "partitura"
@@ -28,7 +28,7 @@ class LinkType(enum.Enum):
@dataclasses.dataclass @dataclasses.dataclass
class Link(WithId): class Link(WithId):
content_type: ContentType content_type: ContentType
link_type: Optional[LinkType] link_type: LinkType | None
url: str url: str
title: str = "" title: str = ""
@@ -93,7 +93,7 @@ class Lyrics(WithId):
) )
@classmethod @classmethod
def from_dict(cls, d): def from_dict(cls, d) -> Self:
return cls( return cls(
id=d["id"], id=d["id"],
title=d["title"], title=d["title"],
@@ -103,7 +103,7 @@ class Lyrics(WithId):
@dataclasses.dataclass @dataclasses.dataclass
class Tema: class Tema:
id: Optional[int] = None id: int | None = None
# Info # Info
title: str = "" title: str = ""
properties: IndexedList[Property] = dataclasses.field(default_factory=IndexedList) properties: IndexedList[Property] = dataclasses.field(default_factory=IndexedList)
@@ -116,6 +116,8 @@ class Tema:
# Other info # Other info
modification_date: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) modification_date: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
creation_date: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now) creation_date: datetime.datetime = dataclasses.field(default_factory=datetime.datetime.now)
# Stats
played: list[Session] = dataclasses.field(default_factory=list)
def compute_ngrams(self): def compute_ngrams(self):
self.ngrams = ngrams.get_text_ngrams(self.title, *self.alternatives) self.ngrams = ngrams.get_text_ngrams(self.title, *self.alternatives)
@@ -124,5 +126,5 @@ class Tema:
self.compute_ngrams() self.compute_ngrams()
return self return self
def score(self) -> Optional[Link]: def score(self) -> Link | None:
return next(filter(lambda l: l.content_type is ContentType.PARTITURA and l.url.startswith('/'), self.links), None) return next(filter(lambda l: l.content_type is ContentType.PARTITURA and l.url.startswith('/'), self.links), None)

View File

@@ -7,13 +7,13 @@ from folkugat_web.config import auth as config
from folkugat_web.log import logger from folkugat_web.log import logger
def login(value): def login(value: str | None) -> bool:
if value and value.lower() == config.ADMIN_PASSWORD: if value and value.lower() == config.ADMIN_PASSWORD:
return True return True
return False return False
def logged_in(nota_folkugat: Annotated[Optional[str], Cookie()] = None) -> bool: def logged_in(nota_folkugat: Annotated[str | None, Cookie()] = None) -> bool:
if not nota_folkugat: if not nota_folkugat:
return False return False
try: try:

View File

@@ -1,19 +1,21 @@
import operator
from folkugat_web.config import search as config from folkugat_web.config import search as config
from folkugat_web.model.search import NGrams
from folkugat_web.utils import groupby from folkugat_web.utils import groupby
def get_all_ngrams(text): def get_all_ngrams(text: str) -> list[tuple[int, str]]:
return [(m, text[i:i+m]) return [(m, text[i:i+m])
for m in range(config.MIN_NGRAM_LENGTH, len(text) + 1) for m in range(config.MIN_NGRAM_LENGTH, len(text) + 1)
for i in range(len(text) - m + 1) for i in range(len(text) - m + 1)
if m > 0] if m > 0]
def get_text_ngrams(*texts):
texts = [word.lower() for text in texts for word in text.split()] def get_text_ngrams(*texts: str) -> NGrams:
word_ngrams = [ngram for ngrams in map(get_all_ngrams, texts) for ngram in ngrams] lower_texts = [word.lower() for text in texts for word in text.split()]
result = dict(groupby(word_ngrams, word_ngrams = [ngram for ngrams in map(get_all_ngrams, lower_texts) for ngram in ngrams]
key_fn=operator.itemgetter(0), result = dict(groupby(
group_fn=lambda gr: list(set(map(operator.itemgetter(1), gr))))) word_ngrams,
key_fn=lambda x: x[0],
group_fn=lambda gr: list(set(map(lambda x: x[1], gr))),
))
return result return result

View File

@@ -25,14 +25,14 @@ def add_tema_to_tema_in_set(tema_in_set: playlists.TemaInSet) -> playlists.TemaI
return tema_in_set return tema_in_set
def get_playlist(session_id: int, con: Optional[Connection] = None) -> playlists.Playlist: def get_playlist(session_id: int, con: Connection | None = None) -> playlists.Playlist:
return playlists.Playlist.from_playlist_entries( return playlists.Playlist.from_playlist_entries(
session_id=session_id, session_id=session_id,
entries=list(query.get_playlist_entries(session_id=session_id, con=con)) entries=list(query.get_playlist_entries(session_id=session_id, con=con))
) )
def add_set(session_id: int, con: Optional[Connection] = None) -> playlists.Set: def add_set(session_id: int, con: Connection | None = None) -> playlists.Set:
with get_connection(con) as con: with get_connection(con) as con:
curr_playlist = get_playlist(session_id=session_id, con=con) curr_playlist = get_playlist(session_id=session_id, con=con)
new_set_id = max([set_entry.id for set_entry in curr_playlist.sets], default=0) + 1 new_set_id = max([set_entry.id for set_entry in curr_playlist.sets], default=0) + 1
@@ -41,7 +41,7 @@ def add_set(session_id: int, con: Optional[Connection] = None) -> playlists.Set:
return playlists.Set.from_playlist_entries(set_id=inserted_entry.set_id, entries=[inserted_entry]) return playlists.Set.from_playlist_entries(set_id=inserted_entry.set_id, entries=[inserted_entry])
def get_set(session_id: int, set_id: int, con: Optional[Connection] = None) -> Optional[playlists.Set]: def get_set(session_id: int, set_id: int, con: Connection | None = None) -> playlists.Set | None:
entries = list(query.get_playlist_entries(session_id=session_id, set_id=set_id, con=con)) entries = list(query.get_playlist_entries(session_id=session_id, set_id=set_id, con=con))
if entries: if entries:
return playlists.Set.from_playlist_entries(set_id=set_id, entries=entries) return playlists.Set.from_playlist_entries(set_id=set_id, entries=entries)
@@ -49,30 +49,30 @@ def get_set(session_id: int, set_id: int, con: Optional[Connection] = None) -> O
return None return None
def delete_set(session_id: int, set_id: int, con: Optional[Connection] = None): def delete_set(session_id: int, set_id: int, con: Connection | None = None):
write.delete_playlist_set(session_id=session_id, set_id=set_id, con=con) write.delete_playlist_set(session_id=session_id, set_id=set_id, con=con)
def add_tema(session_id: int, set_id: int, con: Optional[Connection] = None) -> playlists.TemaInSet: def add_tema(session_id: int, set_id: int, con: Connection | None = None) -> playlists.TemaInSet:
with get_connection(con) as con: with get_connection(con) as con:
new_entry = playlists.PlaylistEntry(id=None, session_id=session_id, set_id=set_id, tema_id=None) new_entry = playlists.PlaylistEntry(id=None, session_id=session_id, set_id=set_id, tema_id=None)
inserted_entry = write.insert_playlist_entry(new_entry) inserted_entry = write.insert_playlist_entry(new_entry)
return playlists.TemaInSet.from_playlist_entry(inserted_entry) return playlists.TemaInSet.from_playlist_entry(inserted_entry)
def get_tema(entry_id: int, con: Optional[Connection] = None) -> playlists.TemaInSet: def get_tema(entry_id: int, con: Connection | None = None) -> playlists.TemaInSet:
with get_connection(con) as con: with get_connection(con) as con:
entry = next(query.get_playlist_entries(entry_id=entry_id)) entry = next(query.get_playlist_entries(entry_id=entry_id))
return playlists.TemaInSet.from_playlist_entry(entry) return playlists.TemaInSet.from_playlist_entry(entry)
def delete_tema(entry_id: int, con: Optional[Connection] = None): def delete_tema(entry_id: int, con: Connection | None = None):
with get_connection(con) as con: with get_connection(con) as con:
write.delete_playlist_entry(entry_id=entry_id, con=con) write.delete_playlist_entry(entry_id=entry_id, con=con)
def set_tema(session_id: int, set_id: int, entry_id: int, tema_id: Optional[int], def set_tema(session_id: int, set_id: int, entry_id: int, tema_id: int | None,
con: Optional[Connection] = None): con: Connection | None = None):
with get_connection(con) as con: with get_connection(con) as con:
new_entry = playlists.PlaylistEntry(id=entry_id, session_id=session_id, set_id=set_id, tema_id=tema_id) new_entry = playlists.PlaylistEntry(id=entry_id, session_id=session_id, set_id=set_id, tema_id=tema_id)
write.update_playlist_entry(entry=new_entry, con=con) write.update_playlist_entry(entry=new_entry, con=con)

View File

@@ -1,9 +1,8 @@
import datetime import datetime
from datetime import date as Date from datetime import date as Date
from typing import Optional
from folkugat_web.config import date as config from folkugat_web.config import date as config
from folkugat_web.dal.sql import sessions as dal from folkugat_web.dal.sql.sessions import query, write
from folkugat_web.model import sessions as model from folkugat_web.model import sessions as model
from folkugat_web.model.sql import Order, OrderCol, Range from folkugat_web.model.sql import Order, OrderCol, Range
@@ -27,11 +26,11 @@ def new_session() -> model.Session:
def get_sessions() -> list[model.Session]: def get_sessions() -> list[model.Session]:
return dal.get_sessions(order_by=OrderCol(model.SessionCols.DATE, Order.ASCENDING)) return query.get_sessions(order_by=OrderCol(model.SessionCols.DATE, Order.ASCENDING))
def get_next_sessions(limit: Optional[int] = None, offset: Optional[int] = None) -> list[model.Session]: def get_next_sessions(limit: int | None = None, offset: int | None = None) -> list[model.Session]:
return dal.get_sessions( return query.get_sessions(
date_range=Range(gte=datetime.date.today()), date_range=Range(gte=datetime.date.today()),
order_by=OrderCol(model.SessionCols.DATE, Order.ASCENDING), order_by=OrderCol(model.SessionCols.DATE, Order.ASCENDING),
limit=limit, limit=limit,
@@ -39,8 +38,8 @@ def get_next_sessions(limit: Optional[int] = None, offset: Optional[int] = None)
) )
def get_sessions_history(limit: Optional[int] = None, offset: Optional[int] = None) -> list[model.Session]: def get_sessions_history(limit: int | None = None, offset: int | None = None) -> list[model.Session]:
return dal.get_sessions( return query.get_sessions(
date_range=Range(lt=datetime.date.today()), date_range=Range(lt=datetime.date.today()),
order_by=OrderCol(model.SessionCols.DATE, Order.DESCENDING), order_by=OrderCol(model.SessionCols.DATE, Order.DESCENDING),
limit=limit, limit=limit,
@@ -48,33 +47,33 @@ def get_sessions_history(limit: Optional[int] = None, offset: Optional[int] = No
) )
def get_next_session() -> Optional[model.Session]: def get_next_session() -> model.Session | None:
return next(iter(get_next_sessions(limit=1,)), None) return next(iter(get_next_sessions(limit=1,)), None)
def get_live_session() -> Optional[model.Session]: def get_live_session() -> model.Session | None:
return next(iter(dal.get_sessions(is_live=True)), None) return next(iter(query.get_sessions(is_live=True)), None)
def get_session(session_id: int) -> Optional[model.Session]: def get_session(session_id: int) -> model.Session | None:
return next(iter(dal.get_sessions(session_id=session_id)), None) return next(iter(query.get_sessions(session_id=session_id)), None)
def insert_session(session: model.Session): def insert_session(session: model.Session):
return dal.insert_session(session) return write.insert_session(session)
def set_session(session: model.Session): def set_session(session: model.Session):
dal.update_session(session) write.update_session(session)
def delete_session(session_id: int): def delete_session(session_id: int):
dal.delete_session_by_id(session_id) write.delete_session_by_id(session_id)
def stop_live_sessions(): def stop_live_sessions():
dal.stop_live_sessions() write.stop_live_sessions()
def set_live_session(session_id: int): def set_live_session(session_id: int):
dal.set_live_session(session_id=session_id) write.set_live_session(session_id=session_id)

View File

@@ -24,7 +24,7 @@ LINK_RES = {
} }
def guess_link_type(url: str) -> Optional[model.LinkType]: def guess_link_type(url: str) -> model.LinkType | None:
for link_type, regexes in LINK_RES.items(): for link_type, regexes in LINK_RES.items():
for regex in regexes: for regex in regexes:
if regex.match(url): if regex.match(url):

View File

@@ -4,5 +4,5 @@ from folkugat_web.dal.sql.temes import query as temes_q
from folkugat_web.model import temes as model from folkugat_web.model import temes as model
def get_tema_by_id(tema_id: int) -> Optional[model.Tema]: def get_tema_by_id(tema_id: int) -> model.Tema | None:
return temes_q.get_tema_by_id(tema_id) return temes_q.get_tema_by_id(tema_id)

View File

@@ -1,7 +1,7 @@
import functools import functools
import time import time
import typing import typing
from collections.abc import Callable, Iterable, Iterator from collections.abc import Callable, Iterable
import Levenshtein import Levenshtein
from folkugat_web.config import search as config from folkugat_web.config import search as config
@@ -12,7 +12,7 @@ from folkugat_web.model import search as search_model
from folkugat_web.model import temes as model from folkugat_web.model import temes as model
def get_query_word_similarity(query_word: str, text_ngrams: model.NGrams) -> search_model.SearchMatch: def get_query_word_similarity(query_word: str, text_ngrams: search_model.NGrams) -> search_model.SearchMatch:
n = len(query_word) n = len(query_word)
if n < config.MIN_NGRAM_LENGTH: if n < config.MIN_NGRAM_LENGTH:
return search_model.SearchMatch(distance=0.0, ngram='') return search_model.SearchMatch(distance=0.0, ngram='')
@@ -27,13 +27,13 @@ def get_query_word_similarity(query_word: str, text_ngrams: model.NGrams) -> sea
default=search_model.SearchMatch(distance=float("inf"), ngram="")) default=search_model.SearchMatch(distance=float("inf"), ngram=""))
def get_query_similarity(query: str, ngrams: model.NGrams) -> search_model.SearchMatch: def get_query_similarity(query: str, ngrams: search_model.NGrams) -> search_model.SearchMatch:
query_words = query.lower().split() query_words = query.lower().split()
word_matches = map(lambda query_word: get_query_word_similarity(query_word, ngrams), query_words) word_matches = map(lambda query_word: get_query_word_similarity(query_word, ngrams), query_words)
return search_model.SearchMatch.combine_matches(word_matches) return search_model.SearchMatch.combine_matches(word_matches)
def build_result(query: str, entry: tuple[int, model.NGrams]) -> search_model.QueryResult: def build_result(query: str, entry: tuple[int, search_model.NGrams]) -> search_model.QueryResult:
if len(query) == 0: if len(query) == 0:
return search_model.QueryResult( return search_model.QueryResult(
id=entry[0], id=entry[0],
@@ -51,6 +51,7 @@ def build_result(query: str, entry: tuple[int, model.NGrams]) -> search_model.Qu
T = typing.TypeVar("T") T = typing.TypeVar("T")
@typing.no_type_check
def _thread(it: Iterable[T], *funcs: Callable[[Iterable], Iterable]) -> Iterable: def _thread(it: Iterable[T], *funcs: Callable[[Iterable], Iterable]) -> Iterable:
return functools.reduce(lambda i, fn: fn(i), funcs, it) return functools.reduce(lambda i, fn: fn(i), funcs, it)

View File

@@ -1,6 +1,22 @@
import itertools import itertools
from collections.abc import Callable, Iterable, Iterator
from typing import Protocol, Self, TypeVar
def groupby(it, key_fn=lambda x: x, group_fn=lambda x: x): class SupportsLessThan(Protocol):
def __lt__(self, other: Self) -> bool:
raise NotImplementedError()
T = TypeVar("T")
KeyT = TypeVar("KeyT", bound=SupportsLessThan)
GroupT = TypeVar("GroupT")
def groupby(
it: Iterable[T],
key_fn: Callable[[T], KeyT],
group_fn: Callable[[Iterable[T]], GroupT] = lambda x: x,
) -> Iterator[tuple[KeyT, GroupT]]:
for k, g in itertools.groupby(sorted(it, key=key_fn), key=key_fn): for k, g in itertools.groupby(sorted(it, key=key_fn), key=key_fn):
yield k, group_fn(g) yield k, group_fn(g)