"""Fuzzy search over temes and property values based on precomputed n-grams."""

import time
from collections.abc import Iterable, Iterator
from itertools import islice
from sqlite3 import Connection
from typing import Callable, TypeVar

import Levenshtein

from folkugat_web.config import search as config
from folkugat_web.dal.sql import get_connection
from folkugat_web.dal.sql.temes import properties as properties_dal
from folkugat_web.dal.sql.temes import query as temes_q
from folkugat_web.log import logger
from folkugat_web.model import search as search_model
from folkugat_web.model import temes as model
from folkugat_web.services.temes import properties as properties_service
from folkugat_web.services.temes import query as query_service
from folkugat_web.utils import FnChain

T = TypeVar("T")


def get_query_word_similarity(query_word: str, text_ngrams: search_model.NGrams) -> search_model.SearchMatch:
    """Return the best match for a single query word among a text's n-grams.

    Candidate n-grams are looked up in ``text_ngrams`` by length, for every
    length within ``config.QUERY_NGRAM_RANGE`` of the word's length that is at
    least ``config.MIN_NGRAM_LENGTH``. Each candidate is scored as the
    Levenshtein distance to the word divided by that length.

    Args:
        query_word: One word of the search query.
        text_ngrams: N-grams of the text being searched, looked up by length.

    Returns:
        The minimum candidate match. Words shorter than
        ``config.MIN_NGRAM_LENGTH`` yield a zero-distance match with an empty
        n-gram; when there are no candidates the distance is infinite.
    """
    word_length = len(query_word)
    if word_length < config.MIN_NGRAM_LENGTH:
        # Too short to be compared against any n-gram: never penalise the query.
        return search_model.SearchMatch(distance=0.0, ngram='')

    shortest = word_length - config.QUERY_NGRAM_RANGE
    longest = word_length + config.QUERY_NGRAM_RANGE
    candidate_lengths = (
        length for length in range(shortest, longest + 1) if length >= config.MIN_NGRAM_LENGTH
    )
    candidate_matches = (
        search_model.SearchMatch(
            distance=Levenshtein.distance(query_word, ngram) / length,
            ngram=ngram,
        )
        for length in candidate_lengths
        for ngram in text_ngrams.get(length, [])
    )
    # NOTE(review): min() relies on the ordering defined by SearchMatch
    # (presumably distance first) -- confirm in search_model.
    return min(
        candidate_matches,
        default=search_model.SearchMatch(distance=float("inf"), ngram=""),
    )


def _match_query_words(query_words: Iterable[str], ngrams: search_model.NGrams) -> search_model.SearchMatch:
    """Score every (already normalised) query word and combine the matches."""
    return search_model.SearchMatch.combine_matches(
        get_query_word_similarity(word, ngrams) for word in query_words
    )


def get_query_similarity(query: str, ngrams: search_model.NGrams) -> search_model.SearchMatch:
    """Return the combined match of a whole query against a text's n-grams.

    The query is lower-cased and split on whitespace; each word is scored with
    ``get_query_word_similarity`` and the per-word matches are merged by
    ``SearchMatch.combine_matches``.
    """
    return _match_query_words(query.lower().split(), ngrams)


def _build_results_fn(
    query: str
) -> Callable[[Iterable[tuple[T, search_model.NGrams]]], Iterator[search_model.QueryResult[T]]]:
    """Build a pipeline step that scores ``(item, ngrams)`` entries against ``query``.

    An empty query matches every entry with distance 0 and an empty n-gram.
    """
    # Normalise the query once here instead of once per scored entry.
    # NOTE(review): a whitespace-only query is not "empty" and produces no
    # words; the outcome then depends on how combine_matches handles an empty
    # input -- confirm in search_model.
    query_words = query.lower().split()

    def build_result(entry: tuple[T, search_model.NGrams]) -> search_model.QueryResult[T]:
        item, ngrams = entry
        if not query:
            return search_model.QueryResult(
                result=item,
                distance=0,
                ngram="",
            )
        match = _match_query_words(query_words, ngrams)
        return search_model.QueryResult(
            result=item,
            distance=match.distance,
            ngram=match.ngram,
        )

    def build_results(entries: Iterable[tuple[T, search_model.NGrams]]) -> Iterator[search_model.QueryResult[T]]:
        return map(build_result, entries)

    return build_results


def _filter_distance(qrs: Iterable[search_model.QueryResult[T]]) -> Iterator[search_model.QueryResult[T]]:
    """Keep only results whose distance is within ``config.SEARCH_DISTANCE_THRESHOLD``."""
    return (qr for qr in qrs if qr.distance <= config.SEARCH_DISTANCE_THRESHOLD)


def _sort_by_distance(qrs: Iterable[search_model.QueryResult[T]]) -> list[search_model.QueryResult[T]]:
    """Return the results ordered by ascending distance (stable for ties)."""
    return sorted(qrs, key=lambda qr: qr.distance)


def _query_results_to_temes(
    con: Connection
) -> Callable[[Iterable[search_model.QueryResult[int]]], Iterator[model.Tema]]:
    """Build a pipeline step that loads the tema for each result id using ``con``.

    Lookups that return a falsy value (e.g. no tema for that id) are dropped.
    """
    def fetch_temes(qrs: Iterable[search_model.QueryResult[int]]) -> Iterator[model.Tema]:
        fetched = (temes_q.get_tema_by_id(tema_id=qr.result, con=con) for qr in qrs)
        return filter(None, fetched)

    return fetch_temes


def _filter_hidden(hidden: bool) -> Callable[[Iterable[model.Tema]], Iterator[model.Tema]]:
    """Build a pipeline step that drops hidden temes unless ``hidden`` is true."""
    def filter_hidden(temes: Iterable[model.Tema]) -> Iterator[model.Tema]:
        return (tema for tema in temes if hidden or not tema.hidden)

    return filter_hidden


def _filter_properties(properties: list[str]) -> Callable[[Iterable[model.Tema]], Iterator[model.Tema]]:
    """Build a pipeline step keeping temes that have all the given property values.

    Property values are compared case-insensitively. An empty ``properties``
    list keeps every tema.
    """
    required = {prop.lower() for prop in properties}

    def has_properties(tema: model.Tema) -> bool:
        tema_values = {prop.value.lower() for prop in tema.properties}
        return required <= tema_values

    def filter_properties(temes: Iterable[model.Tema]) -> Iterator[model.Tema]:
        return filter(has_properties, temes)

    return filter_properties


def _apply_limit_offset(limit: int, offset: int) -> Callable[[Iterable[T]], list[T]]:
    """Build a pipeline step returning the page ``[offset, offset + limit)`` as a list."""
    def apply_limit_offset(items: Iterable[T]) -> list[T]:
        if limit < 0 or offset < 0:
            # islice rejects negative bounds; keep plain list-slicing semantics.
            return list(items)[offset:offset + limit]
        # Stop pulling from the upstream steps once the page is complete, so
        # lazy stages do no work for items past the requested page.
        return list(islice(items, offset, offset + limit))

    return apply_limit_offset


def busca_temes(
    query: str,
    properties: list[str],
    hidden: bool = False,
    limit: int = 10,
    offset: int = 0,
) -> list[model.Tema]:
    """Search temes by fuzzy text query and required property values.

    Every tema's n-grams are scored against ``query``; results over the
    distance threshold are discarded and the rest are sorted by distance. The
    matching temes are then loaded, filtered by visibility, enriched with
    their properties, filtered by ``properties``, passed through
    ``temes_compute_stats`` and finally paginated.

    Args:
        query: Free-text query; an empty string matches every tema.
        properties: Property values a tema must all have (case-insensitive).
        hidden: Whether hidden temes are included in the results.
        limit: Maximum number of temes to return.
        offset: Number of matching temes to skip before the returned page.

    Returns:
        The requested page of temes, best matches first. The returned temes
        have gone through ``add_properties_to_temes``.
    """
    t0 = time.perf_counter()  # monotonic clock: safe for measuring elapsed time
    with get_connection() as con:
        result = (
            FnChain.transform(temes_q.get_tema_id_to_ngrams(con).items()) |
            _build_results_fn(query) |
            _filter_distance |
            _sort_by_distance |
            _query_results_to_temes(con) |
            _filter_hidden(hidden) |
            # Properties are attached before the step that filters on them.
            properties_service.add_properties_to_temes |
            _filter_properties(properties) |
            query_service.temes_compute_stats |
            # Pagination comes last so limit/offset count only matching temes.
            _apply_limit_offset(limit=limit, offset=offset)
        ).result()
    elapsed_ms = int((time.perf_counter() - t0) * 1000)
    logger.info(f"Temes search time: {elapsed_ms} ms")
    return result


def _extract_properties(query_results: list[search_model.QueryResult[str]]) -> list[str]:
    """Return the matched property value of each query result, in order."""
    return [qr.result for qr in query_results]


def busca_properties(
    query: str,
    limit: int = 10,
    offset: int = 0,
) -> list[str]:
    """Search property values by fuzzy text query.

    Args:
        query: Free-text query; an empty query returns no results.
        limit: Maximum number of property values to return.
        offset: Number of matching values to skip before the returned page.

    Returns:
        The requested page of property values within the distance threshold,
        best matches first.
    """
    if not query:
        return []
    t0 = time.perf_counter()  # monotonic clock: safe for measuring elapsed time
    with get_connection() as con:
        result = (
            FnChain.transform(properties_dal.get_property_value_to_ngrams(con).items()) |
            _build_results_fn(query) |
            _filter_distance |
            _sort_by_distance |
            _apply_limit_offset(limit=limit, offset=offset) |
            _extract_properties
        ).result()
    elapsed_ms = int((time.perf_counter() - t0) * 1000)
    logger.info(f"Properties search time: {elapsed_ms} ms")
    return result