Skip to content

Instantly share code, notes, and snippets.

@polyrand
Forked from petehunt/sqlite-fts-userspace.py
Created October 25, 2021 07:22
Show Gist options
  • Save polyrand/5a371b64c4cf40d7dfd74f332c74a04a to your computer and use it in GitHub Desktop.
Save polyrand/5a371b64c4cf40d7dfd74f332c74a04a to your computer and use it in GitHub Desktop.
import binascii
import sqlite3
import collections
import math
import re
RE_SPLIT = re.compile(r'[^A-Za-z0-9_]+')
def tokenize(s):
# this is super basic and should be replaced with a real tokenizer
return [word[:-2] for word in RE_SPLIT.split(s.lower()) if len(word) > 2]
conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute('''
create table terms (
search_index_id bigint not null,
term_id bigint not null,
doc_id bigint not null,
tf double not null,
primary key (term_id, doc_id)
)
''')
c.execute('''
create index idx_terms on terms (search_index_id, term_id, tf)
''')
def add_document(cursor, search_index_id, doc_id, tokens):
counter = collections.Counter(tokens)
rows = ((search_index_id, binascii.crc32(bytes(key, 'utf8')), doc_id, math.sqrt(count)) for key, count in counter.items())
cursor.executemany('insert into terms (search_index_id, term_id, doc_id, tf) values (?, ?, ?, ?)', rows)
def search(cursor, search_index_id, tokens, n_candidates = 10000):
# implementation of https://www.elastic.co/guide/en/elasticsearch/guide/current/practical-scoring-function.html
token_ids = set([binascii.crc32(bytes(key, 'utf8')) for key in tokens])
# first get idf for each term
cursor.execute('select count(*) from terms where search_index_id=?', (search_index_id,))
num_docs, = cursor.fetchone()
cursor.execute('select term_id, count(*) from terms where term_id in (%s) group by term_id' % (','.join(str(x) for x in token_ids),))
idf = {}
for term_id, doc_freq in cursor.fetchall():
idf[term_id] = 1 + math.log(num_docs / (doc_freq + 1))
# now get the most likely candidate doc_ids
limit = n_candidates * len(token_ids)
cursor.execute('select term_id, tf, doc_id from terms where search_index_id=%d and term_id in (%s) order by tf desc limit %d' % (search_index_id, ','.join(str(x) for x in token_ids), limit))
# score each candidate doc_id using tfidf * coord
results = collections.defaultdict(lambda: 0)
coords = collections.defaultdict(lambda: 0)
for term_id, tf, doc_id in cursor.fetchall():
results[doc_id] += tf * idf[term_id]
coords[doc_id] += 1
# calc coord (percent of terms in the query that match the doc)
for doc_id, num_matched in coords.items():
results[doc_id] *= num_matched / len(token_ids)
return sorted(((score, doc_id) for (doc_id, score) in results.items()), reverse=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment