Created
June 9, 2022 21:03
-
-
Save jelmervdl/4ff421572c8ac2f53946ad817b83e77b to your computer and use it in GitHub Desktop.
Near duplicate deduplication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import gzip | |
import os | |
from collections import defaultdict | |
from xxhash import xxh64 | |
from unicodedata import category as cat | |
from unidecode import unidecode | |
from functools import reduce | |
from tqdm.autonotebook import tqdm | |
from io import TextIOWrapper, BufferedReader | |
class ProgressWrapper:
    """Wrap a binary file object and show a progress bar of the bytes read.

    The bar's total is the size reported by ``os.fstat`` for the wrapped
    file, and it advances by the number of bytes returned by each read call
    made through this wrapper. Any attribute not defined here is forwarded
    to the wrapped file object.
    """

    def __init__(self, fh):
        """Create the progress bar for *fh*.

        *fh* must provide ``name``, ``fileno()``, ``seekable()`` and
        ``tell()`` in addition to the read methods.
        """
        self.fh = fh
        self.tqdm = tqdm(
            desc=fh.name,
            total=os.fstat(fh.fileno()).st_size,
            # Start at the current offset when the file can report it.
            initial=fh.tell() if fh.seekable() else 0,
            file=sys.stderr,
            unit='b',
            unit_scale=True)

    def __getattr__(self, attr: str):
        # Only called when normal lookup fails. Guard against infinite
        # recursion when ``fh`` itself has not been assigned yet.
        if attr == 'fh':
            raise AttributeError(attr)
        return getattr(self.fh, attr)

    def read(self, size: int = -1):
        """Read up to *size* bytes and advance the bar by the amount read."""
        data = self.fh.read(size)
        self.tqdm.update(len(data))
        return data

    def read1(self, size: int = -1):
        """Like ``read`` but delegating to the wrapped file's ``read1``."""
        data = self.fh.read1(size)
        self.tqdm.update(len(data))
        return data

    def readinto(self, buffer):
        """Fill *buffer* from the file and advance the bar.

        ``io.BufferedReader`` pulls its data through this method, so it has
        to be counted here; otherwise the call would fall through
        ``__getattr__`` to the wrapped file and the bar would never move.
        """
        count = self.fh.readinto(buffer)
        # ``readinto`` may return None when no data is available.
        self.tqdm.update(count or 0)
        return count

    def readinto1(self, buffer):
        """Like ``readinto`` but delegating to the wrapped ``readinto1``."""
        count = self.fh.readinto1(buffer)
        self.tqdm.update(count or 0)
        return count

    def close(self) -> None:
        """Close the progress bar and the wrapped file."""
        try:
            self.tqdm.close()
        finally:
            # Release the file even if closing the bar fails.
            self.fh.close()
class Entry:
    """Small mutable record holding a ``line`` value and a ``count``.

    Only these two attributes exist; ``__slots__`` removes the per-instance
    ``__dict__`` so that many instances stay cheap.
    """

    __slots__ = ('line', 'count')

    def __init__(self, line, count):
        self.line, self.count = line, count
class _ClosingGzipFile(gzip.GzipFile):
    """Read-only GzipFile that also closes the file object it was given.

    A plain ``GzipFile`` leaves a caller-supplied ``fileobj`` open when it is
    closed; here the stream owns that object, so it is closed as well.
    """

    def __init__(self, fileobj):
        super().__init__(fileobj=fileobj, mode='rb')
        self._owned_fileobj = fileobj

    def close(self):
        try:
            super().close()
        finally:
            self._owned_fileobj.close()


def magic_open(name, mode):
    """Open *name* for reading with a progress bar on the raw bytes.

    Files whose name ends in ``.gz`` are decompressed transparently; the
    progress bar tracks the compressed bytes read from disk.

    Args:
        name: Path of the file to open.
        mode: If it contains ``'t'`` a ``TextIOWrapper`` (default encoding)
            is returned, otherwise a ``BufferedReader`` yielding bytes.

    Returns:
        A readable, iterable stream. Closing it closes every layer below
        it, including the progress bar and the underlying file.
    """
    raw = open(name, 'rb')
    try:
        stream = ProgressWrapper(raw)
        if name.endswith('.gz'):
            stream = _ClosingGzipFile(stream)
        if 't' in mode:
            return TextIOWrapper(stream)
        return BufferedReader(stream)
    except BaseException:
        # Do not leak the file handle when building the stack fails.
        raw.close()
        raise
# The whole aggressive deduping bit is from https://github.com/bitextor/bifixer/blob/b4705e83a0f8aa584aa95dcf946e1c78dcadbb44/bifixer/bifixer.py#L36
# Translation table that deletes every code point whose Unicode category is
# not a letter ("L*"). The range has to include sys.maxunicode itself.
tbl = [chr(i) for i in range(sys.maxunicode + 1) if not cat(chr(i)).startswith('L')]
remove_non_alpha = str.maketrans('', '', ''.join(tbl))

# normalized-line hash -> {exact-line hash -> Entry(first line index, count)}
counts: dict[int, dict[int, Entry]] = defaultdict(dict)

# Number of lines seen in the first pass (stays 0 for an empty input)
total = 0

with magic_open(sys.argv[1], 'rt') as fh:
    # Split on '\n' only, exactly like the binary second pass below does.
    # With universal newlines a bare '\r' would start a new line here but
    # not there, and the line indices of both passes would drift apart.
    fh.reconfigure(newline='\n')
    for n, line in enumerate(fh):
        total += 1
        # Hash of the actual line, to count exact duplicates
        line_hash = hash(line)
        # Hash of a normalized version of the line, to detect near duplicates
        # normalized = unidecode(line.lower().translate(remove_non_alpha))
        normalized = line.lower().translate(remove_non_alpha)
        normalized_hash = xxh64(normalized).intdigest()
        # Count how often this exact duplicate occurs in the near-duplicate entry
        entry = counts[normalized_hash]
        if line_hash in entry:
            entry[line_hash].count += 1
        else:
            # Remember the index of the first occurrence of this variant
            entry[line_hash] = Entry(n, 1)

# Pick the most common variant for each near-duplicate
unique: list[int] = [
    max(count.values(), key=lambda entry: entry.count).line
    for count in tqdm(counts.values())
]
# Inverse unique: show all the variants of only lines that occur more than once
# unique = reduce(lambda acc, count: acc + [entry.line for entry in count.values()] if len(count) > 1 else acc, counts.values(), [])

# Free the hash tables before the second pass over the input
del counts

# Print ratio to stderr (guarding against an empty input file)
print("{} / {} = {:0.4f}%".format(
    len(unique),
    total,
    len(unique) / total * 100 if total else 0.0), file=sys.stderr)

# Iterate over input again, but only pass our selection to stdout
unique.sort()
m = 0
if unique:
    with magic_open(sys.argv[1], 'rb') as fh:
        for n, line in enumerate(fh):
            # `unique` is sorted, so one cursor into it is enough
            if unique[m] == n:
                sys.stdout.buffer.write(line)
                m += 1
                # Nothing left to emit: skip the rest of the file
                if m == len(unique):
                    break
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment