@lemon24
Last active November 14, 2025 13:38
entry_dedupe logic with support for multiple heuristics https://github.com/lemon24/reader/issues/371
"""
entry_dedupe prototype that supports multiple heuristics for finding potential duplicates.
see https://github.com/lemon24/reader/issues/371 for details.
"""
import itertools, re
from math import *
from urllib.parse import urlparse
from reader import make_reader
from reader.plugins.entry_dedupe import _normalize as normalize_text, _jaccard_similarity
from reader.plugins.entry_dedupe import _is_duplicate_full as is_content_duplicate
from reader.plugins.entry_dedupe import _ngrams as ngrams
from collections import defaultdict, Counter
from functools import partial, cache
class DisjointSet:
    # naive version of scipy.cluster.hierarchy.DisjointSet

    def __init__(self):
        self._subsets = {}

    def add(self, *xs):
        prev = None
        for x in xs:
            if x not in self._subsets:
                self._subsets[x] = {x}
            if prev is not None:
                self.merge(prev, x)
            prev = x

    def merge(self, x, y):
        x_subset = self._subsets[x]
        y_subset = self._subsets[y]
        if x_subset is y_subset:
            return
        if len(x_subset) < len(y_subset):
            x_subset, y_subset = y_subset, x_subset
        x_subset.update(y_subset)
        for n in y_subset:
            self._subsets[n] = x_subset

    def subsets(self):
        unique_subsets = {id(s): s for s in self._subsets.values()}
        return [set(s) for s in unique_subsets.values()]
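
# Usage sketch (hypothetical ids): anything connected through pairwise
# duplicates ends up in the same subset.
#
#     ds = DisjointSet()
#     ds.add('a', 'b')
#     ds.add('b', 'c')
#     ds.add('d')
#     ds.subsets()  # -> [{'a', 'b', 'c'}, {'d'}]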
N = 4

def title_similarity(a, b):
    return _jaccard_similarity(normalize_text(a), normalize_text(b), N)

def normalize_title(s, min_length=3):
    return ' '.join(w for w in normalize_text(s).split() if len(w) >= min_length)
def normalize_url(url):
    # not the same as the parser one!
    # todo: handle errors
    url = urlparse(url)
    return url._replace(
        scheme='https' if url.scheme == 'http' else url.scheme,
        path=url.path.rstrip('/'),
    ).geturl()
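
# e.g. (hypothetical URL):
#
#     normalize_url('http://example.com/post/1/')  # -> 'https://example.com/post/1'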
reader = make_reader('db.sqlite')

feed = 'http://testandcode.com/rss'
# feed = 'https://qntm.org/rss.php'
# feed = 'https://xkcd.com/atom.xml'
# feed = 'https://www.bunniestudios.com/blog/?feed=rss2'
# feed = 'http://n-gate.com/index.atom'
entries = list(reader.get_entries(feed=feed))

dedupe_once = False
if dedupe_once:
    new_entries = entries
else:
    entries.sort(key=lambda e: e.last_updated)
    last_updated = entries[-1].last_updated
    new_entries = [e for e in entries if e.last_updated == e.added == last_updated]
def ngrams_f(d, n):
    if len(d) < n:
        return []
    return [tuple(d[:n])]

def ngrams_fl(d, n):
    if len(d) < n:
        return []
    if len(d) == n:
        return [tuple(d)]
    return [tuple(d[:n]), tuple(d[-n:])]
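
# These only yield the first, or the first and last, n-gram of a document;
# e.g. (hypothetical tokens):
#
#     ngrams_f(['a', 'b', 'c'], 2)   # -> [('a', 'b')]
#     ngrams_fl(['a', 'b', 'c'], 2)  # -> [('a', 'b'), ('b', 'c')]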
def common_ngrams(
    documents: list[tuple[str]],
    ngram_range: tuple[int, int] = (1, 6),
    ngrams: callable = ngrams,
    min_ngram_lengths: dict[int, int] = {1: 12, 2: 10},
    min_df: int = 7,
    max_features: int = 50,
) -> dict[tuple[str], int]:
    """Extract common n-grams from a collection of tokenized documents.

    Sub-n-grams of different lengths are skipped,
    usually preferring the longer ones (heuristic).

    """
    # vaguely like sklearn CountVectorizer, but less efficient
    min_n, max_n = ngram_range
    counts = Counter()
    for n in reversed(range(min_n, max_n + 1)):
        gs = (g for d in documents for g in ngrams(d, n))
        if min_length := min_ngram_lengths.get(n):
            gs = (g for g in gs if sum(map(len, g)) >= min_length)
        counts.update(gs)
    # drop n-grams that occur fewer than min_df times
    for k, v in counts.items():
        if v < min_df:
            counts[k] = 0
    counts += Counter()
    rv = dict(counts.most_common(max_features))
    return skip_sub_ngrams(rv, ngrams=ngrams)
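
# Usage sketch (hypothetical titles; with the defaults, an n-gram must occur at
# least min_df=7 times to be kept):
#
#     docs = [normalize_title(t).split() for t in titles]
#     common_ngrams(docs, ngrams=ngrams_f, min_ngram_lengths={1: 5})
#     # -> dict mapping common leading n-grams (tuples of tokens) to their counts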
def skip_sub_ngrams(
    counts: dict[tuple[str], int], ngrams: callable = ngrams
) -> dict[tuple[str], int]:
    """
    this kinda works, but I can't explain why, and it's not perfect

    skip_sub_ngrams({
        ('hello',): 2,
        ('hello', 'world'): 1,
        ('hello', 'ngram'): 1,
        ('world',): 1,
        ('ngram',): 1,
    })

    returns {('hello',): 2, ('hello', 'world'): 1, ('hello', 'ngram'): 1}
    should return {('hello',): 2, ('world',): 1, ('ngram',): 1}

    """
    pairs = list(counts.items())
    min_n = min(map(len, counts), default=0)
    pairs.sort(key=lambda p: len(p[0]) * p[1], reverse=True)
    # print('--- pairs', *pairs, '', sep='\n')
    seen = set()
    rv = {}
    for ngram, count in pairs:
        if ngram in seen:
            continue
        sub_ngrams = []
        for n in range(min_n, len(ngram)):
            sub_ngrams.extend(ngrams(ngram, n))
        seen.add(ngram)
        seen.update(sub_ngrams)
        for sub_ngram in sub_ngrams:
            if sub_ngram not in rv:
                continue
            # we accept the sub-ngram if it's at least 3x more frequent;
            # the 3x is based on observation
            if counts.get(sub_ngram, 0) >= count * 3:
                break
        else:
            rv[ngram] = count
    return rv
"""
for feed in reader.get_feeds():
# if not re.search("(?i)eli |test|bunnie", feed.resolved_title or ''):
# continue
documents = [normalize_title(e.title).split() for e in reader.get_entries(feed=feed)]
common = common_ngrams( documents,
ngrams=ngrams_f, min_ngram_lengths={1: 5},
)
print(feed.resolved_title or feed.url)
for ngram, count in common.items():
print(f" {count:>4} {' '.join(ngram)}")
print()
exit()
"""
def group_by(keyfn, items, only_items=None):
    if only_items:
        only_keys = list(map(keyfn, only_items))
    groups = defaultdict(list)
    for item in items:
        key = keyfn(item)
        if not key:
            continue
        if only_items and key not in only_keys:
            continue
        groups[key].append(item)
    return groups.values()
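
# group_by buckets items by keyfn (falsy keys are skipped); with only_items,
# only buckets whose key is also produced by some item in only_items survive.
# e.g. (hypothetical):
#
#     group_by(len, ['aa', 'bb', 'ccc'], only_items=['dd'])
#     # -> dict_values([['aa', 'bb']])  (no only_item has a key of 3)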
def title_grouper(entries, new_entries):
    def key(e):
        return normalize_title(e.title)
    return group_by(key, entries, new_entries)

def title_prefix_grouper(entries, new_entries):
    documents = [normalize_title(e.title).split() for e in entries]
    common = common_ngrams(
        documents,
        ngrams=ngrams_f,
        min_ngram_lengths={1: 5},
        min_df=ceil(len(new_entries) * .2),
    )
    pattern = f"^({'|'.join(map(re.escape, sorted(map(' '.join, common), key=len)))}) "
    def key(e):
        normalized = normalize_title(e.title)
        rv = re.sub(pattern, '', normalized)
        if rv:
            return rv
        return normalized
    return group_by(key, entries, new_entries)

def url_grouper(entries, new_entries):
    def key(e):
        return normalize_url(e.link)
    return group_by(key, entries, new_entries)

def datetime_grouper(entries, new_entries):
    # todo: should be by minute
    def key(e):
        return e.published or e.updated
    return group_by(key, entries, new_entries)

def date_grouper(entries, new_entries):
    # todo: perhaps check frequency first! (if too often, don't bother)
    def key(e):
        rv = e.published or e.updated
        if not rv:
            return None
        return rv.date()
    return group_by(key, entries, new_entries)

def title_similarity_grouper(entries, new_entries):
    # todo: replace common words with a single private use char
    # (may make it faster because we're checking for fewer chars)
    # (but, it might decrease similarity
    # e.g. 'python' is closer to 'pyhton' than to '<single char>')
    for one in new_entries:
        for two in entries:
            if one is two:
                continue
            if not one.title:
                continue
            if not two.title:
                continue
            if title_similarity(one.title, two.title) > .1:
                yield [one, two]
groupers = [
    title_grouper,
    url_grouper,
    title_prefix_grouper,
    datetime_grouper,
    title_similarity_grouper,
    date_grouper,
    # content prefix goes here? :)
]

HUGE_GROUP_SIZE = 8
MAX_GROUP_SIZE = 4
MASS_DUPLICATION_MIN_PAIRS = 4

all_by_id = {e.id: e for e in entries}
new_by_id = {e.id: e for e in new_entries}

def is_duplicate(one, two):
    return is_content_duplicate(all_by_id[one], all_by_id[two])

is_duplicate = cache(is_duplicate)
all_duplicates = DisjointSet()

for grouper in groupers:
    print(f"--- {grouper.__name__:<24} all={len(all_by_id):>3} new={len(new_by_id):>3}")

    duplicates = DisjointSet()
    for group in grouper(all_by_id.values(), new_by_id.values()):
        if len(group) == 1:
            continue
        if len(group) > HUGE_GROUP_SIZE:
            continue
        group.sort(key=lambda e: e.added, reverse=True)
        group = group[:MAX_GROUP_SIZE]
        for one, two in itertools.combinations(group, 2):
            if is_duplicate(one.id, two.id):
                duplicates.add(one.id, two.id)

    groups = duplicates.subsets()
    pairs = [g for g in groups if len(g) == 2]
    for group in groups:
        all_duplicates.add(*group)
        for eid in group:
            new_by_id.pop(eid, None)
            if len(pairs) >= MASS_DUPLICATION_MIN_PAIRS:
                all_by_id.pop(eid, None)

# all_duplicates.subsets() is what we need to dedupe
print(f"--- {'done':<24} all={len(all_by_id):>3} new={len(new_by_id):>3}")
"""
more prototypes for extracting common phrases
(preferably faster, using regexes)
see https://github.com/lemon24/reader/issues/371 for details.
"""
import itertools, re
from math import *
from urllib.parse import urlparse
from reader import make_reader
from reader.plugins.entry_dedupe import *
from collections import defaultdict, Counter
from itertools import chain
from functools import partial, cache
import unicodedata, time
from contextlib import contextmanager
@contextmanager
def timed(time=time.monotonic):
    start = time()
    yield
    end = time()
    print(f" {end-start:.3f}s")
reader = make_reader('db.sqlite')
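
# Unicode private use characters (plane 15), used by the first common_phrases
# prototype below as single-character stand-ins for common words.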
pua_re = r"[\U000F0000-\U000FFFFD]"
pua_sep_re = rf"(?<={pua_re}) (?={pua_re})"
pua_run_re = rf"(?:{pua_re} ?)+"
# First attempt: replace common words with private use characters,
# then extract and count runs of them.
def common_phrases(documents):
    repls = {}
    reverse_repls = {}
    next_repl = '\U000F0000'

    def resolve(seq):
        return tuple(reverse_repls[c] for c in seq)

    ndocs = len(documents)
    limits = [int(ndocs*i) for i in [.1, .05, 0.02, .01] if ndocs*i >= 7 * 1.5] + [7]
    print(limits)

    counts = Counter(w for d in documents for w in d)
    common = counts.most_common(max(100, ndocs))
    common.reverse()

    text = '\n'.join(' '.join(d) for d in documents)

    extracted = Counter()

    def extract(match):
        s = match[0]
        trimmed = match[0].replace(' ', '')
        seq = resolve(trimmed)
        # if sum(map(len, seq)) < 5:
        #     return ' '.join(seq)
        extracted[seq] += 1
        return ''  # drop the extracted run from the text

    for limit in limits:
        if not common:
            break
        words = []
        while common and common[-1][1] > limit:
            word, count = common.pop()
            if len(word) > 1:
                words.append(word)
        # print(words)
        for word in words:
            repls[word] = next_repl
            reverse_repls[next_repl] = word
            next_repl = chr(ord(next_repl) + 1)
        pattern = rf"(?m)(^|(?<= ))({'|'.join(map(re.escape, words))})((?= )|$)"
        text = re.sub(pattern, lambda m: repls.get(m.group(0)), text)
        text = re.sub(pua_run_re, extract, text)

    for k, c in extracted.items():
        if c < 7:
            extracted[k] = 0
        if sum(map(len, k)) < 5:
            extracted[k] = 0
    extracted += Counter()

    print(*extracted.most_common(), '', sep='\n')
    # print(text)
    # break
# Second attempt (redefines common_phrases): glue runs of common words together
# with '-' and count tokens (including the glued runs) afterwards.
def common_phrases(documents):
    ndocs = len(documents)
    limits = [int(ndocs*i) for i in [.1, .05, 0.02, .01] if ndocs*i >= 7 * 1.5] + [7]
    print(limits)

    counts = Counter(w for d in documents for w in d)
    common = counts.most_common(max(100, ndocs))
    common.reverse()

    text = '\n'.join(' '.join(d) for d in documents)

    for limit in limits:
        if not common:
            break
        words = []
        while common and common[-1][1] > limit:
            word, count = common.pop()
            if len(word) > 1:
                words.append(word)
        words_re = rf"(?:{'|'.join(map(re.escape, words))})"
        pattern = rf"(?m)(^|(?<= )){words_re}( {words_re})*((?= )|$)"
        # print(pattern)
        text = re.sub(pattern, lambda m: m[0].replace(' ', '-'), text)
        # print(text)
        # break

    extracted = Counter(text.split())
    print(*extracted.most_common(20), '', sep='\n')
def make_limits(n, min_n, rate=.75):
    prev = None
    while n >= min_n:
        yield (prev := int(n))
        n = n * rate
    if prev != min_n:
        yield min_n

# print(*make_limits(50, 4)); exit()
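# which prints: 50 37 28 21 15 11 8 6 5 4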
# Third attempt (redefines common_phrases; this is the version used below):
# extract runs of common words directly, via a re.sub replacement callback.
def common_phrases(documents, min_df=4, min_single_length=5, min_length=7):
    ndocs = len(documents)
    limits = list(make_limits(max(ndocs * .1, 20), min_df))
    # print(f"{limits=}\n")

    counts = Counter(w for d in documents for w in set(d))
    text = '\n'.join(' '.join(d) for d in documents)

    def repl(match):
        string = match[0]
        words = string.split()

        skip = False
        if len(words) == 1:
            # single numbers are "longer"
            length = len(string) * (1.5 if string.isnumeric() else 1)
            if length < min_single_length:
                skip = True
        elif len(string) < min_length:
            skip = True
        if skip:
            return string

        extracted[string] += 1
        for word in words:
            counts[word] -= 1
        return ''

    extracted = Counter()
    for limit in limits:
        # print(f"{limit=}\n")
        words = []
        for word, count in counts.most_common(200):
            if count < limit:
                break
            if len(word) <= 1:
                continue
            words.append(word)
        words.sort(key=len, reverse=True)
        # print(f"{words=}\n")
        words_re = rf"(?:{'|'.join(map(re.escape, words))})"
        pattern = rf"(?m)(^|(?<= )){words_re}( +{words_re})*((?= )|$)"
        text = re.sub(pattern, repl, text)

    for k, c in extracted.items():
        if c < min_df:
            extracted[k] = 0
    extracted += Counter()

    print(*extracted.most_common(40), '', sep='\n')
for feed in reader.get_feeds():
    # if not re.search(r"(?i)eli |test|bunnie|georg|99|allu|xkcd\.com", feed.resolved_title or ''):
    #     continue
    # if not re.search("(?i)eli", feed.resolved_title or ''):
    #     continue
    print('#', feed.title)
    print()
    documents = [tokenize_title(e.title) for e in reader.get_entries(feed=feed)]
    with timed():
        common_phrases(documents)
    print()