import time

import Levenshtein

from folkugat_web.config import search as config
from folkugat_web.dal.sql import get_connection
from folkugat_web.dal.sql.temes import query as temes_q
from folkugat_web.log import logger
from folkugat_web.model import search as search_model
from folkugat_web.model import temes as model


def get_query_word_similarity(query_word: str, text_ngrams: model.NGrams) -> search_model.SearchMatch:
    """Return the closest n-gram match for a single query word.

    Candidate n-grams are those stored in ``text_ngrams`` under lengths
    within ``config.QUERY_NGRAM_RANGE`` of ``len(query_word)`` (and at least
    ``config.MIN_NGRAM_LENGTH``). Each candidate is scored as its Levenshtein
    distance to ``query_word`` divided by the candidate length, and the
    smallest ``SearchMatch`` is returned.

    Args:
        query_word: One word of the user query.
        text_ngrams: Mapping from n-gram length to the n-grams of that length.

    Returns:
        The best ``SearchMatch``. Words shorter than
        ``config.MIN_NGRAM_LENGTH`` yield ``distance=0.0`` with an empty
        n-gram. When no candidate n-gram exists, a match with infinite
        distance and an empty n-gram is returned.
    """
    n = len(query_word)
    if n < config.MIN_NGRAM_LENGTH:
        return search_model.SearchMatch(distance=0.0, ngram='')

    lengths = (
        m
        for m in range(n - config.QUERY_NGRAM_RANGE, n + config.QUERY_NGRAM_RANGE + 1)
        if m >= config.MIN_NGRAM_LENGTH
    )
    matches = (
        search_model.SearchMatch(distance=Levenshtein.distance(query_word, ngram) / m, ngram=ngram)
        for m in lengths
        for ngram in text_ngrams.get(m, [])
    )
    # A text may have no n-grams of any candidate length; without `default`
    # min() would raise ValueError and abort the whole search. An infinite
    # distance marks "no match" so the entry is dropped by threshold filters.
    no_match = search_model.SearchMatch(distance=float("inf"), ngram="")
    return min(matches, default=no_match)


def get_query_similarity(query: str, ngrams: model.NGrams) -> search_model.SearchMatch:
    """Score a whole query against a text's n-grams.

    The query is lower-cased and split on whitespace; each word is matched
    with ``get_query_word_similarity`` and the per-word matches are merged by
    ``SearchMatch.combine_matches``.
    """
    word_matches = (
        get_query_word_similarity(query_word, ngrams)
        for query_word in query.lower().split()
    )
    return search_model.SearchMatch.combine_matches(word_matches)


def build_result(query: str, entry: tuple[int, model.NGrams]) -> search_model.QueryResult:
    """Build the ``QueryResult`` for one ``(tema id, n-grams)`` entry.

    An empty query matches every entry with distance 0 and an empty n-gram.
    """
    tema_id, ngrams = entry
    if not query:
        return search_model.QueryResult(
            id=tema_id,
            distance=0,
            ngram="",
        )
    match = get_query_similarity(query, ngrams)
    return search_model.QueryResult(
        id=tema_id,
        distance=match.distance,
        ngram=match.ngram,
    )


def busca_temes(query: str) -> list[model.Tema]:
    """Search temes by fuzzy n-gram similarity to ``query``.

    Every stored tema is scored, results above
    ``config.SEARCH_DISTANCE_THRESHOLD`` are dropped, and the remaining temes
    are returned ordered by ascending distance. Ids for which
    ``get_tema_by_id`` returns a falsy value are skipped. The elapsed time is
    logged in milliseconds.
    """
    t0 = time.time()
    with get_connection() as con:
        tema_id_to_ngrams = temes_q.get_tema_id_to_ngrams(con)
        search_results = (build_result(query, entry) for entry in tema_id_to_ngrams.items())
        filtered_results = (
            qr for qr in search_results
            if qr.distance <= config.SEARCH_DISTANCE_THRESHOLD
        )
        sorted_results = sorted(filtered_results, key=lambda qr: qr.distance)
        # Still inside the `with` block: the per-id lookups below use `con`.
        sorted_temes = [
            tema
            for tema in (temes_q.get_tema_by_id(qr.id, con) for qr in sorted_results)
            if tema
        ]
    logger.info(f"Search time: { int((time.time() - t0) * 1000) } ms")
    return sorted_temes