99 lines
4.1 KiB
Python
99 lines
4.1 KiB
Python
import time
from collections.abc import Iterable, Iterator
from itertools import islice
from sqlite3 import Connection
from typing import Callable

import Levenshtein

from folkugat_web.config import search as config
from folkugat_web.dal.sql import get_connection
from folkugat_web.dal.sql.temes import query as temes_q
from folkugat_web.log import logger
from folkugat_web.model import search as search_model
from folkugat_web.model import temes as model
from folkugat_web.utils import FnChain
|
|
|
|
|
|
def get_query_word_similarity(query_word: str, text_ngrams: search_model.NGrams) -> search_model.SearchMatch:
    """Find the text n-gram closest to *query_word* by normalized Levenshtein distance.

    Only n-grams whose length is within config.QUERY_NGRAM_RANGE of the
    word's length (and at least config.MIN_NGRAM_LENGTH) are candidates.
    A word shorter than the minimum n-gram length trivially matches with
    distance 0; when no candidate exists the match has infinite distance.
    """
    word_len = len(query_word)
    # Words too short to have been indexed as n-grams match everything.
    if word_len < config.MIN_NGRAM_LENGTH:
        return search_model.SearchMatch(distance=0.0, ngram='')

    sizes = (size
             for size in range(word_len - config.QUERY_NGRAM_RANGE,
                               word_len + config.QUERY_NGRAM_RANGE + 1)
             if size >= config.MIN_NGRAM_LENGTH)
    # Distance is normalized by the n-gram length so different sizes compare fairly.
    candidates = (
        search_model.SearchMatch(
            distance=Levenshtein.distance(query_word, ngram) / size,
            ngram=ngram,
        )
        for size in sizes
        for ngram in text_ngrams.get(size, [])
    )
    no_match = search_model.SearchMatch(distance=float("inf"), ngram="")
    return min(candidates, default=no_match)
|
|
|
|
|
|
def get_query_similarity(query: str, ngrams: search_model.NGrams) -> search_model.SearchMatch:
    """Score *query* against one text's n-gram index, word by word.

    The query is lower-cased and whitespace-split; each word is matched
    independently and the per-word matches are folded into one SearchMatch.
    """
    per_word_matches = (get_query_word_similarity(word, ngrams)
                        for word in query.lower().split())
    return search_model.SearchMatch.combine_matches(per_word_matches)
|
|
|
|
|
|
def _build_results_fn(query: str) -> Callable[[Iterable[tuple[int, search_model.NGrams]]],
                                              Iterator[search_model.QueryResult]]:
    """Return a mapper from (tema_id, ngrams) entries to QueryResults for *query*.

    An empty query matches every entry with distance 0 (plain listing).
    """
    def build_result(entry: tuple[int, search_model.NGrams]) -> search_model.QueryResult:
        tema_id, ngrams = entry
        if not query:
            # No query text: every tema is a perfect match.
            return search_model.QueryResult(
                id=tema_id,
                distance=0,
                ngram="",
            )
        match = get_query_similarity(query, ngrams)
        return search_model.QueryResult(
            id=tema_id,
            distance=match.distance,
            ngram=match.ngram,
        )

    def build_results(entries: Iterable[tuple[int, search_model.NGrams]]) -> Iterator[search_model.QueryResult]:
        return map(build_result, entries)

    return build_results
|
|
|
|
|
|
def _filter_distance(qrs: Iterable[search_model.QueryResult]) -> Iterator[search_model.QueryResult]:
    """Drop results whose distance exceeds config.SEARCH_DISTANCE_THRESHOLD."""
    def within_threshold(qr: search_model.QueryResult) -> bool:
        return qr.distance <= config.SEARCH_DISTANCE_THRESHOLD

    return filter(within_threshold, qrs)
|
|
|
|
|
|
def _sort_by_distance(qrs: Iterable[search_model.QueryResult]) -> list[search_model.QueryResult]:
|
|
return sorted(qrs, key=lambda qr: qr.distance)
|
|
|
|
|
|
def _query_results_to_temes(con: Connection) -> Callable[[Iterable[search_model.QueryResult]], Iterator[model.Tema]]:
    """Return a mapper resolving each QueryResult's id to a Tema via *con*.

    Ids with no corresponding tema row are silently dropped.
    """
    def fetch_temes(qrs: Iterable[search_model.QueryResult]) -> Iterator[model.Tema]:
        fetched = (temes_q.get_tema_by_id(tema_id=qr.id, con=con) for qr in qrs)
        # filter(None, ...) discards lookups that returned no tema.
        return filter(None, fetched)

    return fetch_temes
|
|
|
|
|
|
def _filter_hidden(hidden: bool) -> Callable[[Iterable[model.Tema]], Iterator[model.Tema]]:
|
|
def filter_hidden(temes: Iterable[model.Tema]) -> Iterator[model.Tema]:
|
|
return filter(lambda t: hidden or not t.hidden, temes)
|
|
return filter_hidden
|
|
|
|
|
|
def _apply_limit_offset(limit: int, offset: int) -> Callable[[Iterable[model.Tema]], list[model.Tema]]:
|
|
def apply_limit_offset(temes: Iterable[model.Tema]) -> list[model.Tema]:
|
|
return list(temes)[offset:offset + limit]
|
|
return apply_limit_offset
|
|
|
|
|
|
def busca_temes(query: str, hidden: bool = False, limit: int = 10, offset: int = 0) -> list[model.Tema]:
    """Fuzzy-search temes by *query* and return one page of results.

    Pipeline: load every tema's n-gram index from the DB, score each entry
    against the query, keep scores within the configured distance threshold,
    sort by ascending distance, resolve ids to Tema rows, drop hidden temes
    unless *hidden* is True, then apply *offset*/*limit* paging.

    An empty query matches every tema with distance 0 (plain listing).
    """
    t0 = time.time()  # wall-clock start, for the search-time log line below
    with get_connection() as con:
        result = (
            # FnChain pipes each stage's output into the next via `|`;
            # .result() yields the final value — presumably lazy until then
            # (see folkugat_web.utils.FnChain — TODO confirm semantics).
            FnChain.transform(temes_q.get_tema_id_to_ngrams(con).items()) |
            _build_results_fn(query) |
            _filter_distance |
            _sort_by_distance |
            _query_results_to_temes(con) |
            _filter_hidden(hidden) |
            _apply_limit_offset(limit=limit, offset=offset)
        ).result()
    logger.info(f"Search time: { int((time.time() - t0) * 1000) } ms")
    return result
|