import time
from collections.abc import Iterable, Iterator
from sqlite3 import Connection
from typing import Callable

import Levenshtein

from folkugat_web.config import search as config
from folkugat_web.dal.sql import get_connection
from folkugat_web.dal.sql.temes import query as temes_q
from folkugat_web.log import logger
from folkugat_web.model import search as search_model
from folkugat_web.model import temes as model
from folkugat_web.utils import FnChain


def get_query_word_similarity(query_word: str,
                              text_ngrams: search_model.NGrams) -> search_model.SearchMatch:
    """Find the text n-gram closest to a single query word.

    Only n-grams whose length is within QUERY_NGRAM_RANGE of the word's
    length are considered, and the Levenshtein distance is normalized by
    the n-gram length so matches of different lengths are comparable.
    """
    n = len(query_word)
    # Words shorter than the minimum n-gram length cannot be matched;
    # treat them as a perfect match so they do not penalize the query.
    if n < config.MIN_NGRAM_LENGTH:
        return search_model.SearchMatch(distance=0.0, ngram="")
    ns = filter(
        lambda i: i >= config.MIN_NGRAM_LENGTH,
        range(n - config.QUERY_NGRAM_RANGE, n + config.QUERY_NGRAM_RANGE + 1),
    )
    candidate_ngrams = (
        (m, ngram)
        for m, ngrams in map(lambda i: (i, text_ngrams.get(i, [])), ns)
        for ngram in ngrams
    )
    # If there are no candidate n-grams, an infinite distance marks "no match".
    return min(
        (
            search_model.SearchMatch(
                distance=Levenshtein.distance(query_word, ngram) / m,
                ngram=ngram,
            )
            for m, ngram in candidate_ngrams
        ),
        default=search_model.SearchMatch(distance=float("inf"), ngram=""),
    )


def get_query_similarity(query: str, ngrams: search_model.NGrams) -> search_model.SearchMatch:
    """Score a whole query by combining the per-word matches."""
    query_words = query.lower().split()
    word_matches = map(
        lambda query_word: get_query_word_similarity(query_word, ngrams),
        query_words,
    )
    return search_model.SearchMatch.combine_matches(word_matches)


def _build_results_fn(
    query: str,
) -> Callable[[Iterable[tuple[int, search_model.NGrams]]], Iterator[search_model.QueryResult]]:
    def build_result(entry: tuple[int, search_model.NGrams]) -> search_model.QueryResult:
        # An empty query matches every entry with a perfect score.
        if len(query) == 0:
            return search_model.QueryResult(id=entry[0], distance=0.0, ngram="")
        match = get_query_similarity(query, entry[1])
        return search_model.QueryResult(
            id=entry[0],
            distance=match.distance,
            ngram=match.ngram,
        )

    def build_results(
        entries: Iterable[tuple[int, search_model.NGrams]],
    ) -> Iterator[search_model.QueryResult]:
        return map(build_result, entries)

    return build_results


def _filter_distance(qrs: Iterable[search_model.QueryResult]) -> Iterator[search_model.QueryResult]:
    return filter(lambda qr: qr.distance <= config.SEARCH_DISTANCE_THRESHOLD, qrs)


def _sort_by_distance(qrs: Iterable[search_model.QueryResult]) -> list[search_model.QueryResult]:
    return sorted(qrs, key=lambda qr: qr.distance)


def _query_results_to_temes(
    con: Connection,
) -> Callable[[Iterable[search_model.QueryResult]], Iterator[model.Tema]]:
    def fetch_temes(qrs: Iterable[search_model.QueryResult]) -> Iterator[model.Tema]:
        # Drop ids that no longer resolve to a tema.
        return filter(None, map(lambda qr: temes_q.get_tema_by_id(tema_id=qr.id, con=con), qrs))

    return fetch_temes


def _filter_hidden(hidden: bool) -> Callable[[Iterable[model.Tema]], Iterator[model.Tema]]:
    def filter_hidden(temes: Iterable[model.Tema]) -> Iterator[model.Tema]:
        # When hidden=True, hidden temes are included in the results.
        return filter(lambda t: hidden or not t.hidden, temes)

    return filter_hidden


def _apply_limit_offset(limit: int, offset: int) -> Callable[[Iterable[model.Tema]], list[model.Tema]]:
    def apply_limit_offset(temes: Iterable[model.Tema]) -> list[model.Tema]:
        return list(temes)[offset:offset + limit]

    return apply_limit_offset


def busca_temes(query: str, hidden: bool = False, limit: int = 10, offset: int = 0) -> list[model.Tema]:
    """Fuzzy-search temes by n-gram similarity to the query.

    Results are filtered by the configured distance threshold, sorted by
    ascending distance, and paginated with limit/offset.
    """
    t0 = time.time()
    with get_connection() as con:
        result = (
            FnChain.transform(temes_q.get_tema_id_to_ngrams(con).items())
            | _build_results_fn(query)
            | _filter_distance
            | _sort_by_distance
            | _query_results_to_temes(con)
            | _filter_hidden(hidden)
            | _apply_limit_offset(limit=limit, offset=offset)
        ).result()
    logger.info(f"Search time: {int((time.time() - t0) * 1000)} ms")
    return result
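# --- Usage sketch ---
# A minimal example of calling busca_temes, assuming the folkugat_web
# package is importable and its SQLite database is reachable through
# get_connection(). The query string and pagination values below are
# illustrative only, not taken from the project.
if __name__ == "__main__":
    for tema in busca_temes(query="havanera", limit=5, offset=0):
        print(tema)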