# Source: GitHub gist djszemiako/648d6089b5a73629e21a84234a879e98
# (last active September 12, 2022 19:50).
# NOTE: GitHub flagged this file as containing hidden or bidirectional Unicode
# text that may be interpreted or compiled differently than it appears; review
# it in an editor that reveals hidden Unicode characters.
from datetime import datetime, timezone
from itertools import combinations
from math import comb
from sys import argv
from time import monotonic
from typing import List

import numpy as np
import pandas as pd
# Token count used to size the group columns built for each family name.
MAX_FAMILY_TOKENS: int = 6

# How many tokens are joined together to form one group.
GROUP_SIZES: int = 2

# Number of distinct GROUP_SIZES-token groups that can be drawn from
# MAX_FAMILY_TOKENS tokens; this is the number of group columns per row.
GROUPS: int = comb(MAX_FAMILY_TOKENS, GROUP_SIZES)
class Constants:
    """Namespace for shared settings and UTC timestamp helpers."""

    # UTC time at which this class body was evaluated, as YYYYmmddTHHMMSS.
    # `datetime.utcnow()` is deprecated since Python 3.12, so an aware
    # "now" is used instead; the formatted text is the same.
    TIMESTAMP: str = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")

    # The three settings below are not referenced by the visible code.
    MINIMUM_TOKEN_LENGTH: int = 2
    DEFAULT_SEPR: str = "|"
    THRESHOLD: float = 0.75

    @staticmethod
    def return_ts() -> str:
        """Return the current UTC time formatted as ``YYYY-mm-ddTHH.MM.SS``."""
        return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H.%M.%S")
def get_permutations(string: str) -> List[str]:
    """
    Build one fixed-width row of token groups for a single name.

    The name is split on single spaces and every combination (order does not
    matter) of `GROUP_SIZES` tokens is emitted, with the tokens of each group
    sorted and joined by a space. A name with too few tokens to form a group
    contributes itself as its only group.

    Only the first `MAX_FAMILY_TOKENS` tokens are considered. Without that cap
    a longer name would yield more than `GROUPS` groups and the row would no
    longer fit the fixed set of columns it is loaded into.

    Returns a list of length ``GROUPS + 1``: the original value followed by
    its groups, right-padded with `None`.
    """
    tokens = str(string).split(" ")[:MAX_FAMILY_TOKENS]
    combos = [
        " ".join(sorted(combo)) for combo in combinations(tokens, GROUP_SIZES)
    ]
    if not combos:
        # Fewer than GROUP_SIZES tokens: fall back to the value itself.
        combos = [string]
    # Pad so every row has exactly GROUPS group slots.
    combos = combos + [None] * (GROUPS - len(combos))
    return [string] + combos
def make_permutations_matrix(family_names: List[str]) -> pd.DataFrame:
    """
    Build a DataFrame with one `get_permutations` row per family name.

    Columns are ``__family_name__`` followed by ``permutation_1`` ..
    ``permutation_<GROUPS>``. Padding left empty by `get_permutations` is
    filled from the column to its left, so a short name repeats its last
    group across the remaining columns.
    """
    data = [get_permutations(family_name) for family_name in family_names]
    cols = [f"permutation_{i}" for i in range(1, GROUPS + 1)]
    df = pd.DataFrame(data, columns=["__family_name__"] + cols)

    # Walk adjacent column pairs left to right. Starting at the second column
    # avoids the index wrap-around (`cols[-1]`) an index-based loop hits at 0.
    for previous, current in zip(cols, cols[1:]):
        df[current] = df[current].fillna(df[previous])
    return df
def get_counts(permutation_df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    """
    Tally how often each value occurs across the given columns.

    The selected columns are stacked into a single series and counted. The
    result has a ``tokens`` column (the distinct values) and a ``counts``
    column (their frequencies), most frequent first.
    """
    stacked = permutation_df[cols].stack()
    frequencies = stacked.reset_index(drop=True).value_counts()
    return frequencies.rename_axis("tokens").reset_index(name="counts")
def apply_counts(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    """
    Attach a frequency to every token group of every distinct name.

    Token groups are built for the unique values of ``df[col_name]``, counted
    across all group columns, and each group column is then paired with a
    ``<column>_count`` column holding that count (so the most popular group of
    a row can be picked later).

    The returned frame has the columns ``col_name``, ``permutation_1``,
    ``permutation_1_count``, ``permutation_2``, ``permutation_2_count``, ...
    Rows with a missing token group are dropped.
    """
    families = df[col_name].unique().tolist()
    permutation_df = make_permutations_matrix(families)
    p_cols = [col for col in permutation_df.columns if col.startswith("permutation")]
    counts = get_counts(permutation_df, p_cols)

    # A plain token -> count lookup replaces one inner merge per column, which
    # produced duplicated column labels and needed a positional rename.
    count_by_token = dict(zip(counts["tokens"], counts["counts"]))

    # Missing tokens are not reliably counted; drop those rows (as the inner
    # joins did) so every remaining token has a count.
    permutation_df = permutation_df.dropna(subset=p_cols).reset_index(drop=True)

    columns = {col_name: permutation_df["__family_name__"]}
    for col in p_cols:
        columns[col] = permutation_df[col]
        columns[f"{col}_count"] = permutation_df[col].map(count_by_token)
    return pd.DataFrame(columns)
def get_most_frequent_value(df: pd.DataFrame) -> pd.DataFrame:
    """
    Pick, for every row, the token group with the highest count.

    Looks at the columns whose name starts with ``permutation`` and ends with
    ``count``, finds the one holding the row maximum (the first one on ties)
    and adds two columns to ``df`` in place:

    * ``max_col`` - name of the winning group column (``_count`` removed);
    * ``max_val`` - the value stored in that column for the row.

    Returns the same, mutated, ``df``.
    """
    c_cols = [
        col
        for col in df.columns
        if col.startswith("permutation") and col.endswith("count")
    ]
    winners = df[c_cols].idxmax(axis=1)
    # Literal (non-regex) replacement: strip the "_count" marker.
    df["max_col"] = winners.str.replace("_count", "", regex=False)

    # `DataFrame.lookup` was removed in pandas 2.0. Factorize the per-row
    # column labels and index the matching block of values positionally.
    codes, picked_cols = pd.factorize(df["max_col"])
    candidates = df.reindex(columns=picked_cols).to_numpy()
    df["max_val"] = candidates[np.arange(len(df)), codes]
    return df
def main(filepath: str, data_type: str, col_name: str):
    """
    Normalise the values of one column of a pipe-separated file.

    Reads ``filepath`` (``|``-separated, all columns as strings), finds the
    most frequent token group for every distinct value of ``col_name``, and
    maps all values sharing a group to one representative. The representative
    is the value with the fewest spaces, then the shortest, then the
    alphabetically first. The input rows plus ``max_val`` and the upper-cased
    representative ``new`` (falling back to the original value) are written to
    ``<filepath>.<timestamp>.tokens``. Timings are printed for each stage.

    ``data_type`` is accepted but not used by this function.

    Example input::

        id|name|brand_name|device_name
        1|CARDINAL HEALTH SUTURE|CARDINAL|SUTURE
        2|DREAMSTATION|DREAMSTATION||
        3|ADHESIVE BANDAGES||ADHESIVE BANDAGES
    """
    start = monotonic()
    df = pd.read_csv(filepath, sep="|", dtype=str)
    print(f"Loading data took: {monotonic() - start}")

    start = monotonic()
    counts = get_most_frequent_value(apply_counts(df, col_name))
    print(f"Counts: {counts.shape}")
    print(f"Getting most frequent values took: {monotonic() - start}")

    # The timer now covers building the patterns, not just the final join.
    start = monotonic()
    # `counts` already has one row per distinct value, so joining it back to
    # the full input only to de-duplicate again added nothing.
    patterns = (
        counts[[col_name, "max_val"]]
        .drop_duplicates()
        .rename(columns={col_name: "new"})
    )
    patterns = patterns.assign(
        spaces=patterns["new"].str.count(" "),
        length=patterns["new"].str.len(),
    )
    # After sorting, the first row of each group is its representative.
    patterns = patterns.sort_values(by=["spaces", "length", "new"])
    common = patterns.groupby("max_val").first().reset_index()
    print(f"Common: {common.shape}")
    counts = counts.merge(
        common,
        left_on=["max_val"],
        right_on=["max_val"],
        suffixes=("", "_"),
        how="left",
    )
    print(f"Making patterns took: {monotonic() - start}")

    # This timer previously started and stopped before any work was done.
    start = monotonic()
    df = df.merge(
        counts[[col_name, "max_val", "new"]],
        left_on=[col_name],
        right_on=[col_name],
        how="left",
    )
    # Values with no representative keep their original text.
    df["new"] = df["new"].fillna(df[col_name]).str.upper()
    print(f"Matching data took: {monotonic() - start}")

    df.to_csv(f"{filepath}.{Constants.return_ts()}.tokens", sep="|", index=False)
if __name__ == "__main__":
    # Expected arguments: <filepath> <data_type> <col_name>. Extra arguments
    # are ignored; too few exits with a usage message, not an IndexError.
    if len(argv) < 4:
        raise SystemExit(f"usage: {argv[0]} <filepath> <data_type> <col_name>")
    main(argv[1], argv[2], argv[3])
# End of gist (GitHub page footer omitted).