# coding=utf-8
import os
import csv
import re
import logging
import optparse

import dedupe
from unidecode import unidecode


# Do a little bit of data cleaning with the help of Unidecode and regex.
# Things like casing, extra spaces, quotes and new lines can be ignored.
def preProcess(column):
    try:
        column = unidecode(column)
        column = re.sub(" +", " ", column)
        column = re.sub("\n", " ", column)
        column = column.strip().strip('"').strip("'").lower().strip()
    except AttributeError:
        # Non-string values (e.g. the float goods_value) pass through as-is.
        pass
    # If data is missing, indicate that by setting the value to None.
    if not column:
        column = None
    return column
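
# A quick illustration of what preProcess does (the example value is made up):
#   preProcess('"Laser   Diode"\n')  ->  'laser diode'
#   preProcess("")                   ->  None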


# Read in our data from a CSV file and create a dictionary of records,
# where the key is a unique record ID and each value is a dict of the
# row's cleaned fields.
def readData(filename):
    data_d = {}
    with open(filename) as f:
        reader = csv.DictReader(f)
        for row in reader:
            # goods_value is compared as a Price, so cast it to a number
            # before the string-oriented preprocessing.
            row["goods_value"] = float(row["goods_value"])
            clean_row = [(k, preProcess(v)) for (k, v) in row.items()]
            row_id = int(row["id"])
            data_d[row_id] = dict(clean_row)
    return data_d
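
# Illustrative shape of the returned dict (field values are made up; the
# keys come from the CSV header):
#   {1: {"id": "1", "description": "laser diode", "goods_value": 1500.0, ...},
#    2: {...},
#    ...}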


if __name__ == "__main__":

    # Dedupe uses Python logging to show or suppress verbose output.
    # This code block lets you change the level of logging on the command
    # line; you don't need it if you don't want that. To enable verbose
    # logging, run this script with -v (or -vv for debug-level output).
    optp = optparse.OptionParser()
    optp.add_option(
        "-v",
        "--verbose",
        dest="verbose",
        action="count",
        help="Increase verbosity (specify multiple times for more)",
    )
    (opts, args) = optp.parse_args()

    log_level = logging.WARNING
    if opts.verbose:
        if opts.verbose == 1:
            log_level = logging.INFO
        elif opts.verbose >= 2:
            log_level = logging.DEBUG
    logging.getLogger().setLevel(log_level)
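
    # For example (illustrative invocations, assuming the file is saved
    # as csv_example.py):
    #   python csv_example.py        -> WARNING (the default)
    #   python csv_example.py -v     -> INFO
    #   python csv_example.py -vv    -> DEBUG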

    input_file = "application_detail_goods_andor.csv"
    output_file = "application_detail_goods_andor_output.csv"
    settings_file = "application_detail_goods_andor_learned_settings"
    training_file = "application_detail_goods_andor_training.json"

    print("importing data ...")
    data_d = readData(input_file)

    # If a settings file already exists, we'll just load that and skip training.
    if os.path.exists(settings_file):
        print("reading from", settings_file)
        with open(settings_file, "rb") as f:
            deduper = dedupe.StaticDedupe(f)
    else:
        # Training:
        # define the fields dedupe will pay attention to.
        fields = [
            {"field": "description", "type": "String"},
            {"field": "part_no", "type": "String"},
            {"field": "export_control_entry", "type": "String"},
            {"field": "manufacturers_homepage", "type": "String"},
            {"field": "item_no", "type": "String"},
            {"field": "dti_comment", "type": "String"},
            {"field": "technical_description", "type": "String"},
            {"field": "goods_value", "type": "Price"},
        ]
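        # Note: unlike the String fields, Price compares numeric values,
        # which is why readData() casts goods_value to float above.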

        # Create a new deduper object and pass our data model to it.
        deduper = dedupe.Dedupe(fields)

        # If we have training data saved from a previous run of dedupe,
        # look for it and load it in. Note: if you want to train from
        # scratch, delete the training_file.
        if os.path.exists(training_file):
            print("reading labeled examples from ", training_file)
            with open(training_file, "rb") as f:
                deduper.prepare_training(data_d, f)
        else:
            deduper.prepare_training(data_d)

        # Active learning:
        # dedupe will find the next pair of records it is least certain
        # about and ask you to label them as duplicates or not. Use the
        # 'y', 'n' and 'u' keys to flag duplicates; press 'f' when you
        # are finished.
        print("starting active labeling...")
        dedupe.console_label(deduper)

        # Using the examples we just labeled, train the deduper and learn
        # blocking predicates.
        deduper.train()

        # When finished, save our training to disk.
        with open(training_file, "w") as tf:
            deduper.write_training(tf)

        # Save our weights and predicates to disk. If the settings file
        # exists, we will skip all the training and learning next time we
        # run this file.
        with open(settings_file, "wb") as sf:
            deduper.write_settings(sf)

    # Clustering:
    # partition will return sets of records that dedupe believes are
    # all referring to the same entity.
    print("clustering...")
    clustered_dupes = deduper.partition(data_d, 0.5)
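    # Illustrative shape of partition()'s return value (record IDs and
    # scores are made up): a list of (record_ids, confidence_scores)
    # pairs, one per cluster, e.g.
    #   [((1, 7), (0.92, 0.92)), ((2,), (1.0,)), ...]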
print("# duplicate sets", len(clustered_dupes)) | |

    # Writing results:
    # write our original data back out to a CSV with a new column called
    # "Cluster ID" which indicates which records refer to each other.
    cluster_membership = {}
    for cluster_id, (records, scores) in enumerate(clustered_dupes):
        for record_id, score in zip(records, scores):
            cluster_membership[record_id] = {
                "Cluster ID": cluster_id,
                "confidence_score": score,
            }
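
    # Built from the clusters above, cluster_membership maps each record ID
    # to its cluster and score (values are made up):
    #   {1: {"Cluster ID": 0, "confidence_score": 0.92}, ...}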

    with open(output_file, "w") as f_output, open(input_file) as f_input:
        reader = csv.DictReader(f_input)
        fieldnames = ["Cluster ID", "confidence_score"] + reader.fieldnames
        writer = csv.DictWriter(f_output, fieldnames=fieldnames)
        writer.writeheader()

        for row in reader:
            row_id = int(row["id"])
            row.update(cluster_membership[row_id])
            writer.writerow(row)