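"""Near-duplicate detection for job descriptions via MinHash.

Pipeline:
  1. Split each description into overlapping 5-token shingles.
  2. Hash every shingle with 10 different salts and keep the minimum per
     salt, giving a 10-component min-hash signature per job.
  3. Bucket jobs by min-hash value and count, for each candidate pair, how
     many components they share.
  4. For each sufficiently similar pair, mark one job as a duplicate.
"""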
import re
import hashlib
import time
from collections import Counter, defaultdict
from multiprocessing import Pool

# Despite its name, this splits on runs of non-word characters,
# i.e. punctuation as well as whitespace.
WHITESPACE = re.compile(r"\W+")
def iter_shingles(desc):
    """Yield overlapping 5-token shingles from a job description."""
    tokens = WHITESPACE.split(desc)
    # Lowercase and drop single-character tokens.
    tokens = [token.lower() for token in tokens if len(token) > 1]
    # n tokens yield n - 4 shingles of 5 consecutive tokens.
    for i in range(len(tokens) - 4):
        yield " ".join(tokens[i:i + 5])
def hash_shingle(shingle, salt):
    """Hash a salted shingle to a 128-bit integer."""
    md5 = hashlib.md5()
    md5.update((salt + shingle).encode("utf-8"))
    return int(md5.hexdigest(), 16)

# One salt per component of the signature: salting the same md5 function
# 10 ways emulates 10 independent hash functions.
SALTS = ["salt" + str(i) for i in range(10)]
def minhash(desc):
    """Return the 10-component min-hash signature of a description."""
    # 2 ** 128 is larger than any md5 digest, so it acts as +infinity.
    res = [2 ** 128] * len(SALTS)
    for shingle in iter_shingles(desc):
        for (d, salt) in enumerate(SALTS):
            res[d] = min(res[d], hash_shingle(shingle, salt))
    return res
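# Two near-duplicate descriptions share most of their shingles, so they are
# likely to agree on most of the 10 signature components; unrelated
# descriptions only agree on a component by 128-bit coincidence.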
def pairs(l):
    """Yield every pair (a, b) with a < b from the collection."""
    sorted_l = sorted(l)
    for i in range(len(sorted_l)):
        for j in range(i + 1, len(sorted_l)):
            yield (sorted_l[i], sorted_l[j])
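# Caveat: pair generation is quadratic in bucket size, so a min-hash value
# shared by very many jobs (e.g. boilerplate text common to thousands of
# descriptions) can dominate the running time.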
NUM_GROUP_JOBS = 500  # report progress every NUM_GROUP_JOBS jobs
NUM_PROCESS = 6       # worker processes for the min-hash phase
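# The function mapped by multiprocessing.Pool must be defined at module
# level: each task is pickled to be shipped to a worker process, and only
# importable top-level functions can be pickled.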
def job_minhash(row):
    (jobid, desc) = row
    return (jobid, minhash(desc))
def compute_duplicate_set_to_remove(jobstream, limit_similarity=2):
    print("=========================")
    print("minhash phase")
    minhash_dic = defaultdict(set)
    num_jobs = 0
    start = time.time()
    pool = Pool(NUM_PROCESS)
    for (jobid, min_hashes) in pool.imap(job_minhash, jobstream):
        num_jobs += 1
        if num_jobs % NUM_GROUP_JOBS == 0:
            stop = time.time()
            print(str(num_jobs).ljust(10), "--- %i jobs per second" % int(NUM_GROUP_JOBS / (stop - start)))
            start = time.time()
        # Bucket job ids by min-hash value: jobs sharing a value are
        # candidate duplicates.
        for min_hash in min_hashes:
            minhash_dic[min_hash].add(jobid)
    print("=========================")
    print("compute similarity phase")
    # similarity[(a, b)] counts how many signature components jobs a and b
    # have in common.
    similarity = Counter()
    for jobids in minhash_dic.values():
        for job_pair in pairs(jobids):
            similarity[job_pair] += 1
    print("=========================")
    print("identify duplicate phase")
    duplicates = set()
    # Scan from most to least similar; each flagged pair keeps its smaller
    # jobid and marks the larger one for removal.
    for sim_count in range(10, limit_similarity - 1, -1):
        for (left, right), count in similarity.items():
            if count == sim_count:
                duplicates.add(right)
        print(sim_count, len(duplicates))
    return duplicates
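# A pair agreeing on k of the 10 components has an estimated Jaccard
# similarity of about k / 10 over its shingle sets, so the default
# limit_similarity=2 flags pairs with roughly 20% shingle overlap or more.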
if __name__ == "__main__":
    import fileinput
    import csv

    def read_jobs():
        # Each CSV row is expected to hold (job description, job id).
        for (jobdesc, jobid) in csv.reader(fileinput.input()):
            yield (jobid, jobdesc)

    duplicates = compute_duplicate_set_to_remove(read_jobs())
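# Example invocation (file name hypothetical), feeding rows of
# "description,jobid" from a CSV file on the command line or stdin:
#
#     python minhash_dedup.py jobs.csv
#
# `duplicates` then holds the job ids to drop, keeping one representative
# of each duplicate pair.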