# Files
# folkugat-web/folkugat_web/services/playlists.py
# 2025-12-21 17:09:59 +01:00
#
# 236 lines
# 9.7 KiB
# Python

import dataclasses
from folkugat_web.dal.sql import Connection
from folkugat_web.dal.sql._connection import get_connection
from folkugat_web.dal.sql.playlists import query, write
from folkugat_web.log import logger
from folkugat_web.model import playlists
from folkugat_web.model.lilypond import score as lilypond_model
from folkugat_web.services import files as files_service
from folkugat_web.services.lilypond import build as lilypond_build
from folkugat_web.services.lilypond import render as lilypond_render
from folkugat_web.services.lilypond import source as lilypond_source
from folkugat_web.services.temes import links as links_service
from folkugat_web.services.temes import lyrics as lyrics_service
from folkugat_web.services.temes import query as temes_query
from folkugat_web.services.temes import scores as scores_service
def add_temes_to_playlist(playlist: playlists.Playlist) -> playlists.Playlist:
    """Run ``add_temes_to_set`` on every set of ``playlist``.

    Returns:
        The same ``playlist`` object that was passed in.
    """
    for playlist_set in playlist.sets:
        _ = add_temes_to_set(playlist_set)
    return playlist
def add_temes_to_set(set_: playlists.Set) -> playlists.Set:
    """Run ``add_tema_to_tema_in_set`` on every entry of ``set_.temes``.

    Returns:
        The same ``set_`` object that was passed in.
    """
    for entry in set_.temes:
        _ = add_tema_to_tema_in_set(entry)
    return set_
def add_tema_to_tema_in_set(tema_in_set: playlists.TemaInSet) -> playlists.TemaInSet:
    """Load the tune referenced by ``tema_in_set`` and attach it in place.

    When ``tema_in_set.tema_id`` is set, the tune is fetched with
    ``temes_query.get_tema_by_id`` and stored on ``tema_in_set.tema``. A tune
    that was found is then passed to the links, lyrics and scores services;
    a tune that could not be found is reported through ``logger.error``.

    Entries whose ``tema_id`` is ``None`` are returned untouched.

    Returns:
        The same ``tema_in_set`` object that was passed in.
    """
    if tema_in_set.tema_id is None:
        return tema_in_set

    tema_in_set.tema = temes_query.get_tema_by_id(tema_in_set.tema_id)
    if not tema_in_set.tema:
        # The message must be a real f-string: with the "f" inside the quotes
        # the entry was never interpolated and a stray "f" was logged.
        logger.error(f"Could not load tune in set: {tema_in_set}")
    else:
        _ = links_service.add_links_to_tema(tema_in_set.tema)
        _ = lyrics_service.add_lyrics_to_tema(tema_in_set.tema)
        _ = scores_service.add_scores_to_tema(tema_in_set.tema)
    return tema_in_set
def get_playlist(playlist_id: int, con: Connection | None = None) -> playlists.Playlist:
    """Build the playlist ``playlist_id`` from its stored name and entries.

    Both queries run on the connection yielded by ``get_connection(con)``.
    """
    with get_connection(con) as playlist_con:
        name = query.get_playlist_name(playlist_id=playlist_id, con=playlist_con)
        entries = list(query.get_playlist_entries(playlist_id=playlist_id, con=playlist_con))
        return playlists.Playlist.from_playlist_entries(
            playlist_id=playlist_id,
            name=name,
            entries=entries,
        )
def update_name(playlist_id: int, name: str | None) -> playlists.Playlist:
    """Write ``name`` as the playlist's name, then return the reloaded playlist."""
    write.update_playlist_name(playlist_id=playlist_id, name=name)
    reloaded = get_playlist(playlist_id=playlist_id)
    return reloaded
def add_set(playlist_id: int, con: Connection | None = None) -> playlists.Set:
    """Append a new set, holding one entry without a tune, to a playlist.

    The new set id is one more than the highest set id currently in the
    playlist, or 1 when the playlist has no sets.

    Returns:
        A ``Set`` built from the single inserted entry.
    """
    with get_connection(con) as con:
        current = get_playlist(playlist_id=playlist_id, con=con)
        known_ids = [existing.id for existing in current.sets]
        next_set_id = max(known_ids, default=0) + 1
        placeholder = playlists.PlaylistEntry(
            id=None,
            playlist_id=playlist_id,
            set_id=next_set_id,
            tema_id=None,
        )
        # NOTE(review): the insert is not given ``con`` -- confirm whether
        # ``write.insert_playlist_entry`` should run on this connection.
        stored = write.insert_playlist_entry(placeholder)
        return playlists.Set.from_playlist_entries(set_id=stored.set_id, entries=[stored])
def get_set(playlist_id: int, set_id: int, con: Connection | None = None) -> playlists.Set | None:
    """Return the set ``set_id`` of playlist ``playlist_id``.

    Returns:
        The ``Set`` built from the matching entries, or ``None`` when the
        query yields no entries.
    """
    found = list(query.get_playlist_entries(playlist_id=playlist_id, set_id=set_id, con=con))
    if not found:
        return None
    return playlists.Set.from_playlist_entries(set_id=set_id, entries=found)
def delete_set(playlist_id: int, set_id: int, con: Connection | None = None):
    """Delete the set ``set_id`` of playlist ``playlist_id`` via the write layer."""
    write.delete_playlist_set(
        playlist_id=playlist_id,
        set_id=set_id,
        con=con,
    )
def add_tema(playlist_id: int, set_id: int, con: Connection | None = None) -> playlists.TemaInSet:
    """Insert a new entry without a tune into set ``set_id`` of a playlist.

    Returns:
        A ``TemaInSet`` built from the inserted entry.
    """
    with get_connection(con) as con:
        placeholder = playlists.PlaylistEntry(
            id=None,
            playlist_id=playlist_id,
            set_id=set_id,
            tema_id=None,
        )
        # NOTE(review): the insert is not given ``con`` -- confirm whether
        # ``write.insert_playlist_entry`` should run on this connection.
        stored = write.insert_playlist_entry(placeholder)
        return playlists.TemaInSet.from_playlist_entry(stored)
def get_tema(entry_id: int, con: Connection | None = None) -> playlists.TemaInSet:
    """Return the playlist entry ``entry_id`` as a ``TemaInSet``.

    Args:
        entry_id: Id of the playlist entry to load.
        con: Optional connection; it is handed to ``get_connection`` and the
            resulting connection is used for the query.

    Raises:
        StopIteration: If the query yields no entry for ``entry_id``.
    """
    with get_connection(con) as con:
        # Pass the managed connection on: previously the query was issued
        # without ``con``, so a caller-supplied connection was ignored.
        entry = next(query.get_playlist_entries(entry_id=entry_id, con=con))
        return playlists.TemaInSet.from_playlist_entry(entry)
def delete_tema(entry_id: int, con: Connection | None = None):
    """Delete the playlist entry ``entry_id`` on the managed connection."""
    with get_connection(con) as active_con:
        write.delete_playlist_entry(
            entry_id=entry_id,
            con=active_con,
        )
def set_tema(playlist_id: int, set_id: int, entry_id: int, tema_id: int | None,
             con: Connection | None = None):
    """Overwrite entry ``entry_id`` with the given playlist, set and tune ids.

    ``tema_id`` may be ``None``; it is stored on the entry as given.
    """
    with get_connection(con) as active_con:
        updated = playlists.PlaylistEntry(
            id=entry_id,
            playlist_id=playlist_id,
            set_id=set_id,
            tema_id=tema_id,
        )
        write.update_playlist_entry(entry=updated, con=active_con)
def _elegible_for_set_score(tune: lilypond_model.LilypondTune) -> bool:
    """Tell whether ``tune`` may take part in a combined set score.

    A tune qualifies when it is unknown, has a score source, or has lyrics.
    """
    if unknown := tune.is_unknown:
        return unknown
    if tune.score_source:
        return True
    return bool(tune.lyrics)
async def _render_set_score_file(source_filepath, output: lilypond_render.RenderOutput, out_filepath):
    """Render one output format of a set score; return its path, or ``None`` on a render error."""
    rendered = await lilypond_render.render_file(
        input_file=source_filepath,
        output=output,
        output_file=out_filepath,
    )
    if rendered.error is not None:
        return None
    if rendered.result is None:
        # Neither an error nor a result was reported by the renderer.
        raise RuntimeError("This shouldn't happen")
    return rendered.result


async def get_or_create_set_score(tune_set: playlists.Set) -> playlists.SetScore | None:
    """Return the score of ``tune_set``, rendering it first if needed.

    The set is assumed to be already enriched with its tunes.

    * A set without entries has no score.
    * A set with a single entry reuses that tune's main score.
    * Otherwise a LilyPond set is built. If it has no tunes, or any tune is
      not eligible (see ``_elegible_for_set_score``), there is no score.
      The PDF and cropped PNG are looked up under file names derived from
      the set's hash, and whichever is missing is rendered.

    Returns:
        A ``SetScore``, or ``None`` when no score is available or a render
        reported an error.

    Raises:
        RuntimeError: If a render reports neither an error nor a result.
    """
    if not tune_set.temes:
        return None

    if len(tune_set.temes) == 1:
        if (tema := tune_set.temes[0].tema) is None or (main_score := tema.main_score()) is None:
            return None
        return playlists.SetScore(
            pdf_url=main_score.pdf_url,
            img_url=main_score.img_url,
        )

    lilypond_set = lilypond_build.set_from_set(set_entry=tune_set)
    if not lilypond_set.tunes or not all(map(_elegible_for_set_score, lilypond_set.tunes)):
        return None

    set_score_hash = lilypond_set.hash().hex()
    pdf_filepath = files_service.get_set_filename(f"{set_score_hash}.pdf")
    png_filepath = files_service.get_set_filename(f"{set_score_hash}.cropped.png")

    if not pdf_filepath.exists() or not png_filepath.exists():
        # At least one output is missing, so the source has to be rendered.
        set_source = lilypond_source.set_source(tune_set=lilypond_set)
        out_filepath = files_service.get_set_filename(set_score_hash)
        async with files_service.tmp_file(content=set_source) as source_filepath:
            if not pdf_filepath.exists():
                rendered_pdf = await _render_set_score_file(
                    source_filepath, lilypond_render.RenderOutput.PDF, out_filepath,
                )
                if rendered_pdf is None:
                    return None
                pdf_filepath = rendered_pdf
            if not png_filepath.exists():
                rendered_png = await _render_set_score_file(
                    source_filepath, lilypond_render.RenderOutput.PNG_CROPPED, out_filepath,
                )
                if rendered_png is None:
                    return None
                png_filepath = rendered_png

    return playlists.SetScore(
        img_url=files_service.get_db_file_path(png_filepath),
        pdf_url=files_service.get_db_file_path(pdf_filepath),
    )
async def add_set_score_to_set(tune_set: playlists.Set) -> playlists.Set:
    """Return ``tune_set`` with its score attached when one is available.

    When ``get_or_create_set_score`` yields a score, a copy of the set with
    ``score`` replaced is returned; otherwise the original set is returned.
    """
    set_score = await get_or_create_set_score(tune_set=tune_set)
    if not set_score:
        return tune_set
    return dataclasses.replace(tune_set, score=set_score)
async def get_or_create_playlist_score(playlist: playlists.Playlist) -> playlists.PlaylistScore | None:
    """Return the PDF score of ``playlist``, rendering it first if needed.

    The playlist is assumed to be already enriched with its tunes. The PDF
    is looked up under a file name derived from the LilyPond playlist's hash
    and rendered only when that file does not exist.

    Returns:
        A ``PlaylistScore`` with only ``pdf_url`` set, or ``None`` when the
        playlist (or the LilyPond playlist built from it) has no sets, or
        when the render reported an error.

    Raises:
        RuntimeError: If the render reports neither an error nor a result.
    """
    if not playlist.sets:
        return None

    score_model = lilypond_build.playlist_from_playlist(playlist)
    if not score_model.sets:
        return None

    score_hash = score_model.hash().hex()
    pdf_path = files_service.get_playlist_filename(f"{score_hash}.pdf")

    if not pdf_path.exists():
        # No PDF on disk yet: generate the source and render it.
        source_text = lilypond_source.playlist_source(playlist=score_model)
        render_target = files_service.get_playlist_filename(score_hash)
        async with files_service.tmp_file(content=source_text) as tmp_source:
            if not pdf_path.exists():
                outcome = await lilypond_render.render_file(
                    input_file=tmp_source,
                    output=lilypond_render.RenderOutput.PDF,
                    output_file=render_target,
                )
                if outcome.error is not None:
                    return None
                if outcome.result is None:
                    raise RuntimeError("This shouldn't happen")
                pdf_path = outcome.result

    pdf_url = files_service.get_db_file_path(pdf_path)
    return playlists.PlaylistScore(
        img_url=None,  # Only PDF generation for now
        pdf_url=pdf_url,
    )
async def add_playlist_score_to_playlist(playlist: playlists.Playlist) -> playlists.Playlist:
    """Return ``playlist`` with its score attached when one is available.

    When ``get_or_create_playlist_score`` yields a score, a copy of the
    playlist with ``playlist_score`` replaced is returned; otherwise the
    original playlist is returned.
    """
    generated = await get_or_create_playlist_score(playlist=playlist)
    if not generated:
        return playlist
    return dataclasses.replace(playlist, playlist_score=generated)
def get_all_playlists(logged_in: bool = False) -> list[playlists.Playlist]:
    """Return every playlist yielded by the query layer for ``logged_in``, as a list."""
    found = query.get_all_playlists(logged_in=logged_in)
    return [*found]
def set_visibility(playlist_id: int, hidden: bool) -> playlists.Playlist:
    """Store the ``hidden`` flag of a playlist and return the updated playlist.

    Returns:
        A copy of the loaded playlist with ``hidden`` replaced.

    Raises:
        ValueError: If loading the playlist yields ``None``.
    """
    with get_connection() as con:
        current = get_playlist(playlist_id=playlist_id, con=con)
        if current is None:
            raise ValueError(f"No playlist found with playlist_id = {playlist_id}!")

        # Persist the flag first, then mirror it on the returned object.
        write.update_playlist_visibility(playlist_id=playlist_id, hidden=hidden, con=con)
        return dataclasses.replace(current, hidden=hidden)