Files
2025-10-10 00:14:36 +02:00

193 lines
7.3 KiB
Python

import datetime
import itertools
import time
from collections.abc import Iterable, Iterator
from sqlite3 import Connection
from typing import Callable, TypeVar

import Levenshtein

from folkugat_web.config import search as config
from folkugat_web.dal.sql import get_connection
from folkugat_web.dal.sql.temes import properties as properties_dal
from folkugat_web.dal.sql.temes import query as temes_q
from folkugat_web.log import logger
from folkugat_web.model import search as search_model
from folkugat_web.model import temes as model
from folkugat_web.services.temes import properties as properties_service
from folkugat_web.services.temes import query as query_service
from folkugat_web.utils import FnChain, identity
# Generic item type carried through the search pipeline helpers below.
T = TypeVar("T")
def get_query_word_similarity(query_word: str, text_ngrams: search_model.NGrams) -> search_model.SearchMatch:
    """Score *query_word* against a text's n-grams; lower distance is better.

    Words shorter than ``MIN_NGRAM_LENGTH`` count as trivial matches
    (distance 0.0, empty ngram). Otherwise the word is compared against
    every n-gram whose size lies within ``QUERY_NGRAM_RANGE`` of the word
    length, and the best (minimum) match wins. When no n-gram size
    qualifies, an infinite-distance sentinel is returned.
    """
    word_len = len(query_word)
    if word_len < config.MIN_NGRAM_LENGTH:
        return search_model.SearchMatch(distance=0.0, ngram='')
    candidates = [
        # Normalize the edit distance by n-gram size so that matches of
        # different sizes are comparable.
        search_model.SearchMatch(
            distance=Levenshtein.distance(query_word, ngram) / size,
            ngram=ngram,
        )
        for size in range(word_len - config.QUERY_NGRAM_RANGE,
                          word_len + config.QUERY_NGRAM_RANGE + 1)
        if size >= config.MIN_NGRAM_LENGTH
        for ngram in text_ngrams.get(size, [])
    ]
    return min(candidates, default=search_model.SearchMatch(distance=float("inf"), ngram=""))
def get_query_similarity(query: str, ngrams: search_model.NGrams) -> search_model.SearchMatch:
    """Combine per-word matches of *query* against *ngrams* into one match."""
    words = query.lower().split()
    word_matches = (get_query_word_similarity(word, ngrams) for word in words)
    return search_model.SearchMatch.combine_matches(word_matches)
def _build_results_fn(query: str) -> Callable[[Iterable[tuple[T, search_model.NGrams]]],
                                              Iterator[search_model.QueryResult[T]]]:
    """Build a mapper turning (item, ngrams) entries into QueryResults for *query*."""
    # Invariant across all entries: hoisted out of the per-entry function.
    empty_query = len(query) == 0

    def build_result(entry: tuple[T, search_model.NGrams]) -> search_model.QueryResult[T]:
        item, ngrams = entry
        if empty_query:
            # An empty query matches everything perfectly.
            return search_model.QueryResult(
                result=item,
                distance=0,
                ngram="",
            )
        match = get_query_similarity(query, ngrams)
        return search_model.QueryResult(
            result=item,
            distance=match.distance,
            ngram=match.ngram,
        )

    def build_results(entries: Iterable[tuple[T, search_model.NGrams]]) -> Iterator[search_model.QueryResult[T]]:
        return map(build_result, entries)

    return build_results
def _filter_distance(qrs: Iterable[search_model.QueryResult[T]]) -> Iterator[search_model.QueryResult[T]]:
    """Keep only results whose distance is within the configured threshold."""
    def close_enough(match: search_model.QueryResult[T]) -> bool:
        return match.distance <= config.SEARCH_DISTANCE_THRESHOLD
    return filter(close_enough, qrs)
def _sort_by_distance(qrs: Iterable[search_model.QueryResult[T]]) -> list[search_model.QueryResult[T]]:
return sorted(qrs, key=lambda qr: qr.distance)
def _sort_by_times_played_fn(order: search_model.Order) -> Callable[[list[model.Tema]], list[model.Tema]]:
    """Build a sorter ordering temes by play count; missing stats count as 0."""
    descending = order is search_model.Order.DESC

    def times_played(tema: model.Tema) -> int:
        return tema.stats.times_played if tema.stats else 0

    def _sort_by_times_played(temes: list[model.Tema]) -> list[model.Tema]:
        return sorted(temes, key=times_played, reverse=descending)

    return _sort_by_times_played
def _sort_by_last_played_fn(order: search_model.Order) -> Callable[[list[model.Tema]], list[model.Tema]]:
    """Build a sorter ordering temes by the date of their first listed session.

    Temes without stats or without any session sort as ``date.min``,
    i.e. before everything actually played when ascending.
    """
    descending = order is search_model.Order.DESC

    def last_played(tema: model.Tema) -> datetime.date:
        if tema.stats and tema.stats.sessions_played:
            # NOTE(review): assumes sessions_played[0] is the most recent
            # session — confirm against the stats builder.
            return tema.stats.sessions_played[0].date
        return datetime.date.min

    def _sort_by_last_played(temes: list[model.Tema]) -> list[model.Tema]:
        return sorted(temes, key=last_played, reverse=descending)

    return _sort_by_last_played
def _build_sort_fn(order_params: search_model.OrderParams | None) -> Callable[[list[model.Tema]], list[model.Tema]]:
if order_params is None:
return identity
match order_params.order_by:
case search_model.OrderBy.TIMES_PLAYED:
return _sort_by_times_played_fn(order=order_params.order)
case search_model.OrderBy.LAST_PLAYED:
return _sort_by_last_played_fn(order=order_params.order)
def _query_results_to_temes(
    con: Connection
) -> Callable[[Iterable[search_model.QueryResult[int]]], Iterator[model.Tema]]:
    """Build a mapper resolving result ids to Tema rows over *con*."""

    def fetch_temes(qrs: Iterable[search_model.QueryResult[int]]) -> Iterator[model.Tema]:
        temes = (temes_q.get_tema_by_id(tema_id=qr.result, con=con) for qr in qrs)
        # Drop ids that no longer resolve to a row (lookup returned None).
        return filter(None, temes)

    return fetch_temes
def _filter_hidden(hidden: bool) -> Callable[[Iterable[model.Tema]], Iterator[model.Tema]]:
def filter_hidden(temes: Iterable[model.Tema]) -> Iterator[model.Tema]:
return filter(lambda t: hidden or not t.hidden, temes)
return filter_hidden
def _filter_properties(properties: list[str]) -> Callable[[Iterable[model.Tema]], Iterator[model.Tema]]:
properties_set = set(prop.lower() for prop in properties)
def has_properties(tema: model.Tema) -> bool:
tema_properties = {prop.value.lower() for prop in tema.properties}
return all(prop in tema_properties for prop in properties_set)
def filter_properties(temes: Iterable[model.Tema]) -> Iterator[model.Tema]:
return filter(has_properties, temes)
return filter_properties
def _apply_limit_offset(limit: int, offset: int) -> Callable[[Iterable[T]], list[T]]:
def apply_limit_offset(temes: Iterable[T]) -> list[T]:
return list(temes)[offset:offset + limit]
return apply_limit_offset
def busca_temes(
    query: str,
    properties: list[str],
    order_params: search_model.OrderParams | None = None,
    hidden: bool = False,
    limit: int = 10,
    offset: int = 0,
) -> list[model.Tema]:
    """Fuzzy-search temes by *query*, then filter, sort and paginate.

    Pipeline stages, in order: score every tema's n-grams against the
    query, keep matches under the distance threshold, sort by distance,
    resolve ids to Tema rows, drop hidden temes (unless *hidden* is True),
    attach properties, filter by required *properties*, compute stats,
    apply the requested ordering, and finally paginate.

    The returned Tema objects have their properties populated by
    properties_service (hence the original note "adds properties to Tema").
    An empty *query* matches every tema.
    """
    t0 = time.time()
    with get_connection() as con:
        result = (
            FnChain.transform(temes_q.get_tema_id_to_ngrams(con).items()) |
            _build_results_fn(query) |
            _filter_distance |
            _sort_by_distance |
            _query_results_to_temes(con) |
            _filter_hidden(hidden) |
            properties_service.add_properties_to_temes |
            _filter_properties(properties) |
            query_service.temes_compute_stats |
            _build_sort_fn(order_params=order_params) |
            _apply_limit_offset(limit=limit, offset=offset)
        ).result()
    logger.info(f"Temes search time: { int((time.time() - t0) * 1000) } ms")
    return result
def _extract_properties(query_results: list[search_model.QueryResult[str]]) -> list[str]:
return [qr.result for qr in query_results]
def busca_properties(
    query: str,
    limit: int = 10,
    offset: int = 0,
) -> list[str]:
    """Fuzzy-search known property values against *query*, paginated.

    Unlike busca_temes, an empty *query* returns an empty list rather
    than matching everything.
    """
    if not query:
        return []
    t0 = time.time()
    with get_connection() as con:
        result = (
            # Same scoring pipeline as busca_temes, but over property
            # values instead of tema ids, and with no extra filtering.
            FnChain.transform(properties_dal.get_property_value_to_ngrams(con).items()) |
            _build_results_fn(query) |
            _filter_distance |
            _sort_by_distance |
            _apply_limit_offset(limit=limit, offset=offset) |
            _extract_properties
        ).result()
    logger.info(f"Properties search time: { int((time.time() - t0) * 1000) } ms")
    return result