# coding=utf-8
import os
import csv
import re
import logging
import optparse
import dedupe
from unidecode import unidecode
# Do a little bit of data cleaning with the help of Unidecode and Regex.
# Things like casing, extra spaces, quotes and new lines can be ignored.
def preProcess(column):
    try:
        column = unidecode(column)
        column = re.sub(" +", " ", column)
        column = re.sub("\n", " ", column)
        column = column.strip().strip('"').strip("'").lower().strip()
    except AttributeError:
        pass
    # If data is missing, indicate that by setting the value to None
    if not column:
        column = None
    return column
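# A quick sanity check of what preProcess does; the inputs below are
# made-up examples, not values from the real data:
#   preProcess('  "Widget\nASSEMBLY"  ')  ->  'widget assembly'
#   preProcess("")                        ->  None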
# Read in our data from a CSV file and create a dictionary of records,
# where the key is a unique record ID and each value is dict
def readData(filename):
    data_d = {}
    with open(filename) as f:
        reader = csv.DictReader(f)
        for row in reader:
            row["goods_value"] = float(row["goods_value"])
            clean_row = [(k, preProcess(v)) for (k, v) in row.items()]
            row_id = int(row["id"])
            data_d[row_id] = dict(clean_row)
    return data_d
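# data_d maps each record ID to its cleaned row, e.g. (hypothetical values):
#   {1: {"id": "1", "description": "widget assembly", ...,
#        "goods_value": 1250.0}, ...}
# Note that goods_value stays a float: preProcess only transforms strings,
# and the AttributeError branch passes other types through (falsy values,
# including 0.0, become None).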
if __name__ == "__main__":
    # Dedupe uses Python logging to show or suppress verbose output.
    # This code block lets you change the level of logging on the command
    # line; you don't need it if you don't want that. To enable verbose
    # logging, run this script with -v.
    optp = optparse.OptionParser()
    optp.add_option(
        "-v",
        "--verbose",
        dest="verbose",
        action="count",
        help="Increase verbosity (specify multiple times for more)",
    )
    (opts, args) = optp.parse_args()

    log_level = logging.WARNING
    if opts.verbose:
        if opts.verbose == 1:
            log_level = logging.INFO
        elif opts.verbose >= 2:
            log_level = logging.DEBUG
    logging.getLogger().setLevel(log_level)
    input_file = "application_detail_goods_andor.csv"
    output_file = "application_detail_goods_andor_output.csv"
    settings_file = "application_detail_goods_andor_learned_settings"
    training_file = "application_detail_goods_andor_training.json"

    print("importing data ...")
    data_d = readData(input_file)

    # If a settings file already exists, we'll just load that and skip training
    if os.path.exists(settings_file):
        print("reading from", settings_file)
        with open(settings_file, "rb") as f:
            deduper = dedupe.StaticDedupe(f)
    else:
        # Training
        # Define the fields dedupe will pay attention to
        fields = [
            {"field": "description", "type": "String"},
            {"field": "part_no", "type": "String"},
            {"field": "export_control_entry", "type": "String"},
            {"field": "manufacturers_homepage", "type": "String"},
            {"field": "item_no", "type": "String"},
            {"field": "dti_comment", "type": "String"},
            {"field": "technical_description", "type": "String"},
            {"field": "goods_value", "type": "Price"},
        ]
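        # "Price" compares goods_value as a (positive) number rather than
        # as a string, which is why readData casts it to float above.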
        # Create a new deduper object and pass our data model to it.
        deduper = dedupe.Dedupe(fields)

        # If we have training data saved from a previous run of dedupe,
        # look for it and load it in. Note: if you want to train from
        # scratch, delete the training_file.
        if os.path.exists(training_file):
            print("reading labeled examples from ", training_file)
            with open(training_file, "rb") as f:
                deduper.prepare_training(data_d, f)
        else:
            deduper.prepare_training(data_d)
        # Active learning
        # Dedupe will find the next pair of records it is least certain
        # about and ask you to label them as duplicates or not. Use the
        # 'y', 'n' and 'u' keys to flag duplicates; press 'f' when you
        # are finished.
        print("starting active labeling...")
        dedupe.console_label(deduper)

        # Using the examples we just labeled, train the deduper and learn
        # blocking predicates.
        deduper.train()

        # When finished, save our training to disk.
        with open(training_file, "w") as tf:
            deduper.write_training(tf)

        # Save our weights and predicates to disk. If the settings file
        # exists, we will skip all the training and learning next time we
        # run this file.
        with open(settings_file, "wb") as sf:
            deduper.write_settings(sf)
    # Clustering
    # partition will return sets of records that dedupe believes are
    # all referring to the same entity.
    print("clustering...")
    clustered_dupes = deduper.partition(data_d, 0.5)
    print("# duplicate sets", len(clustered_dupes))
    # Writing Results
    # Write our original data back out to a CSV with a new column called
    # 'Cluster ID' which indicates which records refer to each other.
    cluster_membership = {}
    for cluster_id, (records, scores) in enumerate(clustered_dupes):
        for record_id, score in zip(records, scores):
            cluster_membership[record_id] = {
                "Cluster ID": cluster_id,
                "confidence_score": score,
            }

    # newline="" keeps the csv module from writing blank lines on Windows.
    with open(output_file, "w", newline="") as f_output, open(input_file) as f_input:
        reader = csv.DictReader(f_input)
        fieldnames = ["Cluster ID", "confidence_score"] + reader.fieldnames
        writer = csv.DictWriter(f_output, fieldnames=fieldnames)
        writer.writeheader()
        for row in reader:
            row_id = int(row["id"])
            row.update(cluster_membership[row_id])
            writer.writerow(row)