import dataclasses

from folkugat_web.dal.sql import Connection
from folkugat_web.dal.sql._connection import get_connection
from folkugat_web.dal.sql.playlists import query, write
from folkugat_web.log import logger
from folkugat_web.model import playlists
from folkugat_web.model.lilypond import score as lilypond_model
from folkugat_web.services import files as files_service
from folkugat_web.services.lilypond import build as lilypond_build
from folkugat_web.services.lilypond import render as lilypond_render
from folkugat_web.services.lilypond import source as lilypond_source
from folkugat_web.services.temes import links as links_service
from folkugat_web.services.temes import lyrics as lyrics_service
from folkugat_web.services.temes import query as temes_query
from folkugat_web.services.temes import scores as scores_service


def add_temes_to_playlist(playlist: playlists.Playlist) -> playlists.Playlist:
    """Enrich every set of ``playlist`` in place with its tunes and return the playlist."""
    for set_ in playlist.sets:
        _ = add_temes_to_set(set_)
    return playlist


def add_temes_to_set(set_: playlists.Set) -> playlists.Set:
    """Enrich every entry of ``set_`` in place with its tune and return the set."""
    for tema_in_set in set_.temes:
        _ = add_tema_to_tema_in_set(tema_in_set)
    return set_


def add_tema_to_tema_in_set(tema_in_set: playlists.TemaInSet) -> playlists.TemaInSet:
    """Load the tune referenced by ``tema_in_set`` and attach links, lyrics and scores.

    Entries without a ``tema_id`` are returned untouched. If the tune cannot
    be loaded, an error is logged and the entry is returned without
    enrichment. The entry is mutated in place and also returned.
    """
    if tema_in_set.tema_id is not None:
        tema_in_set.tema = temes_query.get_tema_by_id(tema_in_set.tema_id)
        if not tema_in_set.tema:
            # The original literal had the ``f`` prefix inside the quotes, so
            # the entry was never interpolated into the message.
            logger.error(f"Could not load tune in set: {tema_in_set}")
        else:
            _ = links_service.add_links_to_tema(tema_in_set.tema)
            _ = lyrics_service.add_lyrics_to_tema(tema_in_set.tema)
            _ = scores_service.add_scores_to_tema(tema_in_set.tema)
    return tema_in_set


def get_playlist(playlist_id: int, con: Connection | None = None) -> playlists.Playlist:
    """Build the playlist ``playlist_id`` from its stored name and entries.

    Both queries run on the same connection obtained from ``get_connection(con)``.
    """
    with get_connection(con) as playlist_con:
        playlist_name = query.get_playlist_name(playlist_id=playlist_id, con=playlist_con)
        return playlists.Playlist.from_playlist_entries(
            playlist_id=playlist_id,
            name=playlist_name,
            entries=list(query.get_playlist_entries(playlist_id=playlist_id, con=playlist_con)),
        )


def update_name(playlist_id: int, name: str | None) -> playlists.Playlist:
    """Store a new name for the playlist and return the reloaded playlist."""
    write.update_playlist_name(playlist_id=playlist_id, name=name)
    return get_playlist(playlist_id=playlist_id)


def add_set(playlist_id: int, con: Connection | None = None) -> playlists.Set:
    """Append a new set to the playlist and return it.

    The new set id is one more than the highest existing set id (1 when the
    playlist has no sets). The set is created by inserting a single entry
    with no tune assigned.
    """
    with get_connection(con) as con:
        curr_playlist = get_playlist(playlist_id=playlist_id, con=con)
        new_set_id = max((set_entry.id for set_entry in curr_playlist.sets), default=0) + 1
        new_entry = playlists.PlaylistEntry(
            id=None,
            playlist_id=playlist_id,
            set_id=new_set_id,
            tema_id=None,
        )
        # NOTE(review): the insert does not receive ``con``; confirm whether
        # ``write.insert_playlist_entry`` accepts a connection and should share this one.
        inserted_entry = write.insert_playlist_entry(new_entry)
        return playlists.Set.from_playlist_entries(
            set_id=inserted_entry.set_id,
            entries=[inserted_entry],
        )


def get_set(playlist_id: int, set_id: int, con: Connection | None = None) -> playlists.Set | None:
    """Return the set ``set_id`` of the playlist, or ``None`` if it has no entries."""
    entries = list(query.get_playlist_entries(playlist_id=playlist_id, set_id=set_id, con=con))
    if not entries:
        return None
    return playlists.Set.from_playlist_entries(set_id=set_id, entries=entries)


def delete_set(playlist_id: int, set_id: int, con: Connection | None = None):
    """Delete the set ``set_id`` from the playlist."""
    write.delete_playlist_set(playlist_id=playlist_id, set_id=set_id, con=con)


def add_tema(playlist_id: int, set_id: int, con: Connection | None = None) -> playlists.TemaInSet:
    """Insert a new entry with no tune assigned into the given set and return it."""
    with get_connection(con) as con:
        new_entry = playlists.PlaylistEntry(
            id=None,
            playlist_id=playlist_id,
            set_id=set_id,
            tema_id=None,
        )
        # NOTE(review): the insert does not receive ``con``; confirm whether
        # ``write.insert_playlist_entry`` accepts a connection and should share this one.
        inserted_entry = write.insert_playlist_entry(new_entry)
        return playlists.TemaInSet.from_playlist_entry(inserted_entry)


def get_tema(entry_id: int, con: Connection | None = None) -> playlists.TemaInSet:
    """Return the playlist entry ``entry_id`` as a ``TemaInSet``.

    Raises:
        StopIteration: if the query yields no entry for ``entry_id``.
    """
    with get_connection(con) as con:
        # Forward the connection so the lookup runs on the one opened above;
        # the original call dropped it.
        entry = next(query.get_playlist_entries(entry_id=entry_id, con=con))
        return playlists.TemaInSet.from_playlist_entry(entry)


def delete_tema(entry_id: int, con: Connection | None = None):
    """Delete the playlist entry ``entry_id``."""
    with get_connection(con) as con:
        write.delete_playlist_entry(entry_id=entry_id, con=con)


def set_tema(
    playlist_id: int,
    set_id: int,
    entry_id: int,
    tema_id: int | None,
    con: Connection | None = None,
):
    """Overwrite the entry ``entry_id`` with the given playlist, set and tune ids."""
    with get_connection(con) as con:
        new_entry = playlists.PlaylistEntry(
            id=entry_id,
            playlist_id=playlist_id,
            set_id=set_id,
            tema_id=tema_id,
        )
        write.update_playlist_entry(entry=new_entry, con=con)


def _elegible_for_set_score(tune: lilypond_model.LilypondTune) -> bool:
    """Return whether ``tune`` can take part in a combined set score."""
    # A tune will be eligible for a set score if it has a score, lyrics or is unknown
    return tune.is_unknown or bool(tune.score_source) or bool(tune.lyrics)


async def get_or_create_set_score(tune_set: playlists.Set) -> playlists.SetScore | None:
    """Return the score for ``tune_set``, rendering it if it is not on disk yet.

    A set with a single tune reuses that tune's main score. Larger sets are
    rendered to a PDF and a cropped PNG whose file names derive from the hash
    of the LilyPond set, so existing files are reused.

    Returns:
        The set score, or ``None`` when the set is empty, the single tune has
        no main score, some tune is not eligible, or rendering reports an error.

    Raises:
        RuntimeError: if a render reports neither an error nor a result.
    """
    # The tune_set is assumed to be enriched with tunes
    if not tune_set.temes:
        return None
    if len(tune_set.temes) == 1:
        if (tema := tune_set.temes[0].tema) is None or (main_score := tema.main_score()) is None:
            return None
        return playlists.SetScore(
            pdf_url=main_score.pdf_url,
            img_url=main_score.img_url,
        )

    lilypond_set = lilypond_build.set_from_set(set_entry=tune_set)
    if not lilypond_set.tunes or not all(map(_elegible_for_set_score, lilypond_set.tunes)):
        return None

    set_score_hash = lilypond_set.hash().hex()
    pdf_filepath = files_service.get_set_filename(f"{set_score_hash}.pdf")
    png_filepath = files_service.get_set_filename(f"{set_score_hash}.cropped.png")

    if not pdf_filepath.exists() or not png_filepath.exists():
        # No score exists, so we need to create it
        set_source = lilypond_source.set_source(tune_set=lilypond_set)
        out_filepath = files_service.get_set_filename(set_score_hash)
        async with files_service.tmp_file(content=set_source) as source_filepath:
            # Only render the output(s) that are actually missing.
            if not pdf_filepath.exists():
                pdf_result = await lilypond_render.render_file(
                    input_file=source_filepath,
                    output=lilypond_render.RenderOutput.PDF,
                    output_file=out_filepath,
                )
                if pdf_result.error is not None:
                    return None
                if pdf_result.result is None:
                    raise RuntimeError("This shouldn't happen")
                pdf_filepath = pdf_result.result
            if not png_filepath.exists():
                png_result = await lilypond_render.render_file(
                    input_file=source_filepath,
                    output=lilypond_render.RenderOutput.PNG_CROPPED,
                    output_file=out_filepath,
                )
                if png_result.error is not None:
                    return None
                if png_result.result is None:
                    raise RuntimeError("This shouldn't happen")
                png_filepath = png_result.result

    return playlists.SetScore(
        img_url=files_service.get_db_file_path(png_filepath),
        pdf_url=files_service.get_db_file_path(pdf_filepath),
    )


async def add_set_score_to_set(tune_set: playlists.Set) -> playlists.Set:
    """Return a copy of ``tune_set`` carrying its score, or the set itself if none is available."""
    if score := await get_or_create_set_score(tune_set=tune_set):
        return dataclasses.replace(
            tune_set,
            score=score,
        )
    return tune_set


async def get_or_create_playlist_score(
    playlist: playlists.Playlist,
) -> playlists.PlaylistScore | None:
    """Return the PDF score for ``playlist``, rendering it if it is not on disk yet.

    The PDF file name derives from the hash of the LilyPond playlist, so an
    existing file is reused.

    Returns:
        The playlist score (PDF only), or ``None`` when the playlist has no
        sets, the LilyPond playlist has no sets, or rendering reports an error.

    Raises:
        RuntimeError: if the render reports neither an error nor a result.
    """
    # The playlist is assumed to be enriched with tunes
    if not playlist.sets:
        return None

    lilypond_playlist = lilypond_build.playlist_from_playlist(playlist)
    if not lilypond_playlist.sets:
        return None

    playlist_score_hash = lilypond_playlist.hash().hex()
    pdf_filepath = files_service.get_playlist_filename(f"{playlist_score_hash}.pdf")

    if not pdf_filepath.exists():
        # No score exists, so we need to create it
        playlist_source = lilypond_source.playlist_source(playlist=lilypond_playlist)
        out_filepath = files_service.get_playlist_filename(playlist_score_hash)
        async with files_service.tmp_file(content=playlist_source) as source_filepath:
            if not pdf_filepath.exists():
                pdf_result = await lilypond_render.render_file(
                    input_file=source_filepath,
                    output=lilypond_render.RenderOutput.PDF,
                    output_file=out_filepath,
                )
                if pdf_result.error is not None:
                    return None
                if pdf_result.result is None:
                    raise RuntimeError("This shouldn't happen")
                pdf_filepath = pdf_result.result

    return playlists.PlaylistScore(
        img_url=None,  # Only PDF generation for now
        pdf_url=files_service.get_db_file_path(pdf_filepath),
    )


async def add_playlist_score_to_playlist(playlist: playlists.Playlist) -> playlists.Playlist:
    """Return a copy of ``playlist`` carrying its score, or the playlist itself if none is available."""
    if score := await get_or_create_playlist_score(playlist=playlist):
        return dataclasses.replace(
            playlist,
            playlist_score=score,
        )
    return playlist


def get_all_playlists(logged_in: bool = False) -> list[playlists.Playlist]:
    """Return all playlists yielded by the query for the given ``logged_in`` flag."""
    return list(query.get_all_playlists(logged_in=logged_in))


def set_visibility(playlist_id: int, hidden: bool) -> playlists.Playlist:
    """Persist the ``hidden`` flag of a playlist and return the updated playlist.

    Raises:
        ValueError: if no playlist is found for ``playlist_id``.
    """
    with get_connection() as con:
        playlist = get_playlist(playlist_id=playlist_id, con=con)
        # NOTE(review): ``get_playlist`` is annotated as never returning None;
        # this guard is kept as a defensive check.
        if playlist is None:
            raise ValueError(f"No playlist found with playlist_id = {playlist_id}!")

        # Update hidden status in database
        write.update_playlist_visibility(playlist_id=playlist_id, hidden=hidden, con=con)

        # Return updated playlist
        return dataclasses.replace(playlist, hidden=hidden)


def create_playlist(name: str | None = None) -> playlists.Playlist:
    """Create a playlist with the optional ``name`` and return it as loaded from storage."""
    playlist_id = write.create_playlist(name=name)
    return get_playlist(playlist_id=playlist_id)


def delete_playlist(playlist_id: int):
    """Delete the playlist ``playlist_id``."""
    write.delete_playlist(playlist_id=playlist_id)