195 lines
5.9 KiB
Python
195 lines
5.9 KiB
Python
import datetime
|
|
from typing import Optional
|
|
|
|
from folkugat_web.dal.sql import Connection, get_connection
|
|
from folkugat_web.model import sessions as model
|
|
from folkugat_web.model.sql import OrderCol, Range
|
|
from folkugat_web.typing import OptionalListOrValue as OLV
|
|
|
|
|
|
def create_db(con: Optional[Connection] = None):
    """Make sure the storage used by this module exists.

    Args:
        con: Optional connection, forwarded to ``get_connection``; the
            connection that context manager yields is the one used to
            create the ``sessions`` table.
    """
    with get_connection(con) as connection:
        create_sessions_table(connection)
|
|
|
|
|
|
def drop_sessions_table(con: Connection):
    """Remove the ``sessions`` table; a no-op when the table is absent.

    Args:
        con: Open connection on which the statement is executed. This
            function does not commit.
    """
    con.cursor().execute("DROP TABLE IF EXISTS sessions")
|
|
|
|
|
|
def create_sessions_table(con: Connection):
    """Create the ``sessions`` table unless it already exists.

    Columns, in order: ``id`` (integer primary key), ``date``,
    ``start_time`` and ``end_time`` (required text), ``venue_name`` and
    ``venue_url`` (optional text), and ``is_live`` (boolean, defaults to
    false).

    Args:
        con: Open connection on which the statement is executed. This
            function does not commit.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS sessions (
            id INTEGER PRIMARY KEY,
            date TEXT NOT NULL,
            start_time TEXT NOT NULL,
            end_time TEXT NOT NULL,
            venue_name TEXT,
            venue_url TEXT,
            is_live BOOLEAN DEFAULT false
        )
    """
    con.cursor().execute(ddl)
|
|
|
|
|
|
def _session_to_row(sessio: model.Session) -> dict:
    """Flatten a session into the named parameters used by the SQL statements.

    The date and both times are written as ISO 8601 text, which is the
    format ``_row_to_session`` parses back with ``fromisoformat``.

    Args:
        sessio: Session to serialize. Its ``venue`` must expose ``name``
            and ``url``.

    Returns:
        A dict keyed by column name: ``id``, ``date``, ``start_time``,
        ``end_time``, ``venue_name``, ``venue_url`` and ``is_live``.
    """
    venue = sessio.venue
    return {
        'id': sessio.id,
        # Serialize explicitly instead of handing the driver a date object:
        # sqlite3's implicit date adapter is deprecated since Python 3.12,
        # and the time fields below are already stored as ISO text.
        'date': sessio.date.isoformat(),
        'start_time': sessio.start_time.isoformat(),
        'end_time': sessio.end_time.isoformat(),
        'venue_name': venue.name,
        'venue_url': venue.url,
        'is_live': sessio.is_live,
    }
|
|
|
|
|
|
def _row_to_session(row: tuple) -> model.Session:
    """Build a session model from a positional ``sessions`` row.

    Args:
        row: Values in the order ``id, date, start_time, end_time,
            venue_name, venue_url, is_live``. The date and times must be
            ISO 8601 strings.

    Returns:
        The corresponding ``model.Session``.

    Raises:
        ValueError: If the date or one of the times is not valid ISO text.
    """
    session_id, date, start_time, end_time, venue_name, venue_url, is_live = row[:7]
    return model.Session(
        id=session_id,
        date=datetime.date.fromisoformat(date),
        start_time=datetime.time.fromisoformat(start_time),
        end_time=datetime.time.fromisoformat(end_time),
        venue=model.SessionVenue(
            name=venue_name,
            url=venue_url,
        ),
        # SQLite has no boolean storage class, so the flag comes back as an
        # integer 0/1 (or NULL); hand the model a real bool.
        is_live=bool(is_live),
    )
|
|
|
|
|
|
def insert_session(session: model.Session, con: Optional[Connection] = None) -> model.Session:
    """Insert a session and return it as stored.

    Args:
        session: Session to persist.
        con: Optional connection, forwarded to ``get_connection``.

    Returns:
        The session rebuilt from the row returned by the ``INSERT``.
    """
    # The RETURNING list is spelled out (rather than ``*``) so the row
    # always arrives in the positional order ``_row_to_session`` decodes,
    # whatever the table's physical column order is.
    query = """
        INSERT INTO sessions
            (id, date, start_time, end_time, venue_name, venue_url, is_live)
        VALUES
            (:id, :date, :start_time, :end_time, :venue_name, :venue_url, :is_live)
        RETURNING id, date, start_time, end_time, venue_name, venue_url, is_live
    """
    data = _session_to_row(session)
    with get_connection(con) as con:
        cur = con.cursor()
        cur.execute(query, data)
        # Fetch while the connection context is still open.
        row = cur.fetchone()
        return _row_to_session(row)
|
|
|
|
|
|
def update_session(session: model.Session, con: Optional[Connection] = None):
    """Overwrite every stored field of the row whose ``id`` matches.

    Args:
        session: Session carrying the new values; its ``id`` selects the
            row to update.
        con: Optional connection, forwarded to ``get_connection``.
    """
    params = _session_to_row(session)
    statement = """
        UPDATE sessions SET
            date = :date, start_time = :start_time, end_time = :end_time,
            venue_name = :venue_name, venue_url = :venue_url, is_live = :is_live
        WHERE id = :id
    """
    with get_connection(con) as connection:
        connection.cursor().execute(statement, params)
|
|
|
|
|
|
def _filter_clause(session_id: Optional[int] = None,
                   date_range: Optional[Range[datetime.date]] = None,
                   is_live: Optional[bool] = None) -> tuple[str, dict]:
    """Build the ``WHERE`` clause and its named parameters for a session query.

    Only the filters that are provided contribute a condition; conditions
    are combined with ``AND``.

    Args:
        session_id: Match this exact ``id`` (``0`` is a valid value).
        date_range: Range whose ``upper_bound()`` / ``lower_bound()`` each
            return either a falsy value (no bound) or a pair whose first
            item is the date and whose second item says whether the bound
            is inclusive.
        is_live: Match this exact ``is_live`` value (``False`` is a valid
            filter).

    Returns:
        ``(clause, params)``. ``clause`` is an empty string when no filter
        applies, otherwise it starts with ``WHERE``. ``params`` maps the
        placeholder names used in ``clause`` to their values.
    """
    filter_clauses = []
    filter_data = {}

    if session_id is not None:
        filter_clauses.append("id = :session_id")
        filter_data["session_id"] = session_id

    if date_range:
        if ub := date_range.upper_bound():
            operator = "<=" if ub[1] else "<"
            filter_clauses.append(f"date {operator} :date_ub")
            # Bind ISO text, the same representation the ``date`` column
            # stores, instead of relying on the driver's implicit date
            # adapter (deprecated in sqlite3 since Python 3.12).
            filter_data["date_ub"] = ub[0].isoformat()
        if lb := date_range.lower_bound():
            operator = ">=" if lb[1] else ">"
            filter_clauses.append(f"date {operator} :date_lb")
            filter_data["date_lb"] = lb[0].isoformat()

    if is_live is not None:
        filter_clauses.append("is_live = :is_live")
        filter_data["is_live"] = is_live

    if not filter_clauses:
        return "", {}
    return "WHERE " + " AND ".join(filter_clauses), filter_data
|
|
|
|
|
|
def _order_clause(order_by: OLV[OrderCol[model.SessionCols]]) -> str:
    """Render an ``ORDER BY`` clause from one or more ordering columns.

    Args:
        order_by: ``None``, a single ordering column, or a list of them.
            Each item must expose ``column.value`` (the column name) and
            ``order.value`` (the sort direction).

    Returns:
        An empty string when nothing is requested, otherwise
        ``"ORDER BY <col> <dir>[, <col> <dir> ...]"``.
    """
    if not order_by:
        return ""
    columns = order_by if isinstance(order_by, list) else [order_by]

    # NOTE(review): names and directions are interpolated into the SQL text,
    # so they must come from fixed values (presumably enums), never from
    # user input.
    terms = [f"{ocol.column.value} {ocol.order.value}" for ocol in columns]
    # Ordering terms are comma-separated in SQL; joining them with a plain
    # space yields a syntax error as soon as there is more than one.
    terms_str = ", ".join(terms)
    return f"ORDER BY {terms_str}"
|
|
|
|
|
|
def get_sessions(session_id: Optional[int] = None,
                 date_range: Optional[Range[datetime.date]] = None,
                 is_live: Optional[bool] = None,
                 order_by: OLV[OrderCol[model.SessionCols]] = None,
                 limit: Optional[int] = None, offset: Optional[int] = None,
                 con: Optional[Connection] = None) -> list[model.Session]:
    """Fetch the sessions matching the given filters.

    Args:
        session_id: Restrict to this ``id``.
        date_range: Restrict to sessions whose date falls inside the range.
        is_live: Restrict to sessions with this ``is_live`` value.
        order_by: One ordering column or a list of them; omitted means no
            ``ORDER BY`` clause.
        limit: Maximum number of rows to return.
        offset: Number of leading rows to skip; may be given without
            ``limit``.
        con: Optional connection, forwarded to ``get_connection``.

    Returns:
        The matching sessions, in the order the query returns them.
    """
    clauses = []
    filter_clause, data = _filter_clause(session_id=session_id, date_range=date_range, is_live=is_live)
    if filter_clause:
        clauses.append(filter_clause)
    if order_clause := _order_clause(order_by=order_by):
        clauses.append(order_clause)
    if limit is not None:
        clauses.append("LIMIT :limit")
        data["limit"] = limit
    elif offset is not None:
        # OFFSET is only valid after a LIMIT; in SQLite a negative limit
        # means "no upper bound", so paging by offset alone stays valid SQL.
        # NOTE(review): assumes the backend is SQLite, as the rest of the
        # SQL in this module suggests.
        clauses.append("LIMIT -1")
    if offset is not None:
        clauses.append("OFFSET :offset")
        data["offset"] = offset

    clauses_str = " ".join(clauses)
    query = f"""
        SELECT id, date, start_time, end_time, venue_name, venue_url, is_live
        FROM sessions
        {clauses_str}
    """
    with get_connection(con) as con:
        cur = con.cursor()
        cur.execute(query, data)
        return [_row_to_session(row) for row in cur.fetchall()]
|
|
|
|
|
|
def delete_session_by_id(session_id: int, con: Optional[Connection] = None):
    """Delete the session row whose ``id`` equals ``session_id``.

    Args:
        session_id: Identifier of the row to remove.
        con: Optional connection, forwarded to ``get_connection``.
    """
    statement = """
        DELETE FROM sessions
        WHERE id = :id
    """
    params = {"id": session_id}
    with get_connection(con) as connection:
        connection.cursor().execute(statement, params)
|
|
|
|
|
|
def stop_live_sessions(con: Optional[Connection] = None):
    """Clear the ``is_live`` flag on every session currently marked live.

    Args:
        con: Optional connection, forwarded to ``get_connection``.
    """
    statement = """
        UPDATE sessions SET is_live = false WHERE is_live = true
    """
    with get_connection(con) as connection:
        connection.cursor().execute(statement)
|
|
|
|
|
|
def set_live_session(session_id: int, con: Optional[Connection] = None):
    """Mark one session as live after un-marking all others.

    Both steps run on the same connection: every live session is first
    stopped via ``stop_live_sessions``, then ``is_live`` is set on the row
    whose ``id`` equals ``session_id``.

    Args:
        session_id: Identifier of the session to mark as live.
        con: Optional connection, forwarded to ``get_connection``.
    """
    statement = """
        UPDATE sessions SET is_live = true WHERE id = :id
    """
    params = {"id": session_id}
    with get_connection(con) as connection:
        # Stop the others first so the new session ends up as the only live one.
        stop_live_sessions(con=connection)
        connection.cursor().execute(statement, params)
|