import pandas as pd
from datetime import datetime
from rapidfuzz.process import extractOne
from string_grouper import match_strings, group_similar_strings
from sys import argv
from time import monotonic
from uuid import uuid4


class Constants:
    # Script-wide defaults. UUID and TIMESTAMP are frozen at import time;
    # the static methods below return fresh values per call.
    UUID: str = str(uuid4()).split("-")[1]
    TIMESTAMP: str = datetime.utcnow().strftime("%Y%m%dT%H%M%S")
    MINIMUM_TOKEN_LENGTH: int = 2
    DEFAULT_SEPR: str = "|"
    THRESHOLD: float = 0.75

    @staticmethod
    def return_uuid() -> str:
        # Second segment of a UUID4: a short, 4-hex-char disambiguator.
        return str(uuid4()).split("-")[1]

    @staticmethod
    def return_ts() -> str:
        return datetime.utcnow().strftime("%Y-%m-%dT%H.%M.%S")
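
# Illustration (values vary per run; defined but never called): `return_uuid`
# yields the 4-hex-char second segment of a UUID4, e.g. "9f3a", and
# `return_ts` a sortable timestamp such as "2022-09-12T19.51.00".
def _demo_constants() -> None:
    print(Constants.return_uuid(), Constants.return_ts())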


class Matchers:
    @staticmethod
    def levenshtein_matcher(
        names: list[str],
        df: pd.DataFrame,
        find_in: str,
        match_against: str,
        threshold: float = Constants.THRESHOLD,
    ) -> pd.DataFrame:
        """
        `find_in`: Column within which `names` may be found.
        `match_against`: Column against which `names` are compared.
        `threshold` is accepted for signature parity with the other matchers
        but intentionally not applied; see the comment below.
        """
        matched = []
        for name in names:
            against = df[df[find_in] == name][match_against].unique().tolist()
            # Don't want to "filter out" matches, so no threshold. However,
            # still need to pick one (and only one) match. `extractOne`
            # returns a (match, score, index) tuple, or None when `against`
            # is empty.
            matches = extractOne(name, against)
            if not matches:
                matches = (None, 0.0)
            matched.append([name, matches[0], matches[1]])
        return pd.DataFrame(matched, columns=[find_in, match_against, "similarity"])

    @staticmethod
    def cosine_matcher(
        df_1: pd.DataFrame,
        col_1: str,
        df_2: pd.DataFrame,
        col_2: str,
        threshold: float = Constants.THRESHOLD,
    ) -> pd.DataFrame:
        """
        Use the `string_grouper.match_strings` function (which uses cosine
        similarity over tf-idf n-grams) to match a "best fit" for each name.
        """
        df = match_strings(
            pd.Series(df_1[col_1].unique().tolist()).astype(str),
            pd.Series(df_2[col_2].unique().tolist()).astype(str),
            ignore_index=True,
            min_similarity=threshold,
        )
        # Keep only the highest-similarity match per left-hand value.
        df = df.sort_values(by=["left_side", "similarity"], ascending=[True, False])
        df = df.drop_duplicates(subset="left_side")
        df.columns = [col_1, "similarity", col_2]
        return df

    @staticmethod
    def deduplicate(
        df: pd.DataFrame, col: str, threshold: float = Constants.THRESHOLD
    ) -> pd.DataFrame:
        # Collapse near-duplicate spellings of `col` into a single
        # representative string per group.
        df[f"deduplicated_{col}"] = group_similar_strings(
            df[col], ignore_index=True, min_similarity=threshold
        )
        return df

    @staticmethod
    def get_best_match(
        df_1: pd.DataFrame,
        df_1_col: str,
        df_2: pd.DataFrame,
        df_2_cols: list[str],
        threshold: float = Constants.THRESHOLD,
    ) -> pd.DataFrame:
        start = monotonic()
        print(f"Starting process to get best matching {df_1_col} from {df_2_cols[0]}.")
        print(f"Original DataFrame has {df_1.shape[0]} records.")
        df_1 = df_1.merge(
            df_2[df_2_cols].drop_duplicates(),
            how="left",
            on=df_2_cols[0],
            suffixes=["", "_"],
        )
        print(
            f"DataFrame now has {df_1.shape[0]} records following merge to secondary DataFrame."
        )
        # Get duplicates; given the many-to-many nature of the different
        # levels, the best match must be picked per duplicated value.
        counts = df_1[df_1_col].value_counts().reset_index()
        counts.columns = [df_1_col, "count"]
        counts = counts[counts["count"] > 1]
        print(f"There were {counts.shape[0]} duplicates post-merge.")
        df_1_singl = df_1[~df_1[df_1_col].isin(counts[df_1_col])]
        df_1_singl = df_1_singl[
            [df_1_col, df_2_cols[0], df_2_cols[1]]
        ].drop_duplicates()
        print(
            f"There are {df_1_singl.shape[0]} unique values that do not require further matching."
        )
        df_1_multi = df_1[df_1[df_1_col].isin(counts[df_1_col])]
        print(
            f"There are now {df_1_multi.shape[0]} names following merge of {df_2_cols[0]} to {df_2_cols[1]}."
        )
        print(f"Getting Levenshtein distance between {df_1_col} and {df_2_cols[1]}.")
        best_match = Matchers.levenshtein_matcher(
            df_1_multi[df_1_col].unique().tolist(),
            df_1_multi,
            df_1_col,
            df_2_cols[1],
            threshold,
        )
        df_1_multi = df_1_multi.merge(
            best_match, how="left", on=df_1_col, suffixes=["", "_"]
        )
        # Two candidate columns now exist for df_2_cols[1]: the original and
        # the suffixed one from `best_match`. Sorting longest-first puts the
        # freshly matched (suffixed) column ahead of the original, which is
        # used only as a fallback.
        score_hierarchy = [
            col for col in df_1_multi.columns if col.startswith(df_2_cols[1])
        ]
        score_hierarchy = sorted(score_hierarchy, key=len, reverse=True)
        df_1_multi[df_2_cols[1]] = df_1_multi[score_hierarchy[0]].fillna(
            df_1_multi[score_hierarchy[1]]
        )
        df_1_multi = df_1_multi[
            [df_1_col, df_2_cols[0], df_2_cols[1]]
        ].drop_duplicates()
        df = pd.concat([df_1_singl, df_1_multi])
        print(
            f"Refining results complete; final DataFrame has {df.shape[0]} records. Total process took: {monotonic() - start}"
        )
        return df
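
# A minimal, self-contained sketch (hypothetical data, not part of the
# pipeline above) of what the two main matchers return. Exact similarity
# values depend on string_grouper's tf-idf n-gram model, so the threshold
# here is deliberately low. Defined but never called.
def _demo_matchers() -> None:
    left = pd.DataFrame({"name": ["ACME COMPANY", "GLOBEX CORP"]})
    right = pd.DataFrame(
        {"level_1": ["ACME CO INC", "GLOBEX CORPORATION", "INITECH"]}
    )
    # One best cosine match per left-hand name; columns: name, similarity, level_1.
    print(Matchers.cosine_matcher(left, "name", right, "level_1", threshold=0.3))
    # Group near-duplicate spellings under one representative string; adds a
    # `deduplicated_name` column alongside the original.
    dupes = pd.DataFrame({"name": ["ACME COMPANY", "ACME COMPANY INC", "INITECH"]})
    print(Matchers.deduplicate(dupes, "name", threshold=0.3))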


class DfReader:
    """
    ERDL file:
        registration_id|level_1|level_2|level_3
        7542|DENTSPLY SIRONA INC|DEGUDENT GMBH|DENTSPLY SIRONA INC
        69194||BIO PLAS INC|BIO PLAS INC
        6805|LUV N CARE LTD|LUV N CARE LTD|LUV N CARE LTD

    Source file:
        id|name
        1|VELA DIAGNOSTICS USA INC
        2|OHMEDA MEDICAL
        3|UROSURGE INC
    """

    @staticmethod
    def load_df(filepath: str) -> pd.DataFrame:
        start = monotonic()
        print(f"Starting load of `{filepath}`: {Constants.return_ts()}")
        df = pd.read_csv(filepath, dtype=str, sep=Constants.DEFAULT_SEPR)
        df = df.drop_duplicates()
        df = df.fillna("NULL")
        if "id" in df.columns:
            df["id"] = df["id"].astype(int)
            df = df.sort_values(by="id")
        print(f"`{filepath}` loading took: {monotonic() - start}")
        return df

    @staticmethod
    def write_df(df: pd.DataFrame, filepath: str) -> None:
        df = df.fillna("NULL")
        df.to_csv(filepath, sep=Constants.DEFAULT_SEPR, index=False)
        print(f"DataFrame saved to:\n{filepath}")


def main(source_filepath: str, erdl_filepath: str, col: str, threshold: float):
    overall_start = monotonic()
    threshold = float(threshold)
    print(
        f"Starting the creation of families against Establishment Registrations. Started: {Constants.return_ts()}"
    )
    df = DfReader.load_df(source_filepath)
    print(f"Initial DataFrame has {df.shape[0]} records.")
    # Normalize the name column into a temporary, randomly named working
    # column: strip punctuation and collapse whitespace.
    temp_col = Constants.return_uuid()
    df[temp_col] = df[col].str.replace(r"[^A-Za-z0-9\s]", " ", regex=True)
    df[temp_col] = df[temp_col].apply(lambda x: " ".join(x.split()).strip())
    erdl = DfReader.load_df(erdl_filepath)
    print(f"Establishment registrations file has {erdl.shape[0]} records.")
    start = monotonic()
    print("Starting initial match to business trade names and establishment names.")
    matches_trade = Matchers.cosine_matcher(df, temp_col, erdl, "level_1", threshold)
    print(f"Made {matches_trade.shape[0]} matches by trade name.")
    matches_estab = Matchers.cosine_matcher(df, temp_col, erdl, "level_2", threshold)
    matches_estab = matches_estab[[temp_col, "level_2"]].drop_duplicates()
    print(f"Made {matches_estab.shape[0]} matches by establishment name.")
    matches_trade = matches_trade[
        ~matches_trade[temp_col].isin(matches_estab[temp_col])
    ]
    print(
        f"Filtered duplicate matches from trade names using establishment names: {matches_trade.shape[0]}"
    )
    matches_trade = Matchers.get_best_match(
        matches_trade, temp_col, erdl, ["level_1", "level_2"], threshold
    )
    sub_family_names = pd.concat([matches_estab, matches_trade])
    print(
        f"Match to business trade names and establishment names done. Took: {monotonic() - start}"
    )
    start = monotonic()
    print("Attempting to match establishment names to operators.")
    sub_family_names = Matchers.get_best_match(
        sub_family_names, temp_col, erdl, ["level_2", "level_3"], threshold
    )
    print(f"Matched {sub_family_names.shape[0]} establishment records to an owner operator.")
    print(
        f"Matching establishments to owner operators complete. Took: {monotonic() - start}"
    )
    start = monotonic()
    print("Starting matching of companies without matched sub families.")
    # Essentially the same as merging sub_family_names into df, then taking
    # the rows where level_2 or level_3 is null.
    orphans = df[~df[temp_col].isin(sub_family_names[temp_col])]
    print(f"There are {orphans.shape[0]} companies without a matching establishment.")
    matches_operator = Matchers.cosine_matcher(
        orphans, temp_col, erdl, "level_3", threshold
    )
    matches_operator = matches_operator[[temp_col, "level_3"]].drop_duplicates()
    matches_operator["level_2"] = pd.NA
    # Make sure the order of columns is the same before concatenating.
    matches_operator = matches_operator[sub_family_names.columns]
    print(f"Matched {matches_operator.shape[0]} records to an owner operator.")
    matches = pd.concat([matches_operator, sub_family_names])
    print(
        f"Matching companies to owner operators complete. Took: {monotonic() - start}"
    )
    start = monotonic()
    print("Filling out families and removing duplicates.")
    df = df.merge(matches, how="left", on=temp_col)
    df["pre_family"] = df["level_3"].fillna(df[col])
    # Use a 10% higher threshold for de-duplicating.
    df = Matchers.deduplicate(df, "pre_family", 0.85)
    df = df.drop(columns=[temp_col])
    starting_size = len(df[col].unique())
    ending_size = len(df["deduplicated_pre_family"].unique())
    reduction_rate = (ending_size - starting_size) / starting_size
    print(
        f"Families condensed from {starting_size} to {ending_size} for a reduction rate of {abs(reduction_rate) * 100:.3f}%."
    )
    print(f"De-duplication complete. Took: {monotonic() - start}")
    DfReader.write_df(df, f"{source_filepath}.{Constants.return_ts()}.processed")
    print(
        f"Completed creation of families. Ended: {Constants.return_ts()}. Took: {monotonic() - overall_start}"
    )


if __name__ == "__main__":
    main(argv[1], argv[2], argv[3], argv[4])
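
# Example invocation (hypothetical file names; both inputs must be
# pipe-delimited with the layouts shown in the DfReader docstring):
#   python create_families.py source.psv erdl.psv name 0.75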