Skip to content

Instantly share code, notes, and snippets.

@CodeByAidan
Created July 30, 2024 20:44
Show Gist options
  • Select an option

  • Save CodeByAidan/bce81f17c4e74fe83c8bbfe272b72997 to your computer and use it in GitHub Desktop.

Select an option

Save CodeByAidan/bce81f17c4e74fe83c8bbfe272b72997 to your computer and use it in GitHub Desktop.
Cluster the values of a Polars DataFrame column by text similarity.
from collections import defaultdict
from typing import Any, Dict, List, Self, Tuple
import polars as pl
from fuzzywuzzy import fuzz, process
class NameClusterer:
    """Group similar strings of one DataFrame column into clusters.

    Names are visited in the order of ``unique_names``. The first name
    reached that is not yet part of a cluster becomes the head (label) of a
    new cluster and absorbs every still-unassigned name whose ``fuzz.ratio``
    score against it is at least ``threshold``.

    NOTE(review): ``unique_names`` comes from ``Series.unique()``; confirm
    against the polars documentation whether that order is stable, because
    the order decides which member of a group becomes its head.
    """

    def __init__(self, df: "pl.DataFrame", column: str, threshold: int) -> None:
        """Store the clustering inputs and collect the distinct names.

        Args:
            df: Frame that holds the column to cluster.
            column: Name of the column whose values are clustered.
            threshold: Minimum similarity score passed to the matcher as
                ``score_cutoff``.
        """
        self.df: pl.DataFrame = df
        self.column: str = column
        self.threshold: int = threshold
        # Nulls are not names: keep them out of the matching candidates.
        # Rows holding a null fall through create_cluster_column() unchanged.
        self.unique_names: list[Any] = [
            name for name in df[column].unique().to_list() if name is not None
        ]
        # Maps a cluster head to the members of its cluster (head included
        # once cluster_column() has run).
        self.clusters: Dict[str, List[str]] = defaultdict(list)
        # One cluster label per row of ``df``; filled by cluster_column().
        self.clustered_names: List[str] = []

    def __repr__(self) -> str:
        return f"NameClusterer(column={self.column}, threshold={self.threshold})"

    def __str__(self) -> str:
        return f"NameClusterer with {len(self.unique_names)} unique names"

    def __len__(self) -> int:
        """Return the number of names currently held in ``unique_names``."""
        return len(self.unique_names)

    def __contains__(self, name: str) -> bool:
        """Return True if ``name`` is a member of any cluster."""
        return any(name in names for names in self.clusters.values())

    @staticmethod
    def get_matches(
        name: str, unique_names: List[str], threshold: int
    ) -> List[Tuple[str, int]]:
        """Return every candidate scoring at least ``threshold`` against ``name``.

        Args:
            name: The query string.
            unique_names: Candidate strings to compare against.
            threshold: Minimum ``fuzz.ratio`` score for a candidate to be kept.

        Returns:
            ``(candidate, score)`` tuples as produced by
            ``process.extractBests``. The result is also printed.
        """
        # limit=None: without it extractBests keeps only its default number
        # of best matches (5), so larger clusters would be cut short and the
        # threshold alone would no longer decide membership.
        matches: list[tuple[str, int]] = process.extractBests(
            name,
            unique_names,
            scorer=fuzz.ratio,
            score_cutoff=threshold,
            limit=None,
        )
        print(f"Matches for {name}: {matches}")
        return matches

    @staticmethod
    def process_match(
        name: str,
        match: Tuple[str, int],
        clusters: Dict[str, List[str]],
        unique_names: List[str],
    ) -> None:
        """Absorb one match into the cluster headed by ``name``.

        A match equal to ``name`` itself is ignored. Otherwise the matched
        name is recorded under ``clusters[name]`` (at most once) and removed
        from ``unique_names`` so it is no longer offered as a candidate.

        Args:
            name: Head of the cluster being built.
            match: ``(matched_name, score)`` tuple; the score is not used.
            clusters: Mapping of head -> members; mutated in place.
            unique_names: Remaining candidates; mutated in place.
        """
        match_name, _score = match
        if match_name == name:
            return
        # setdefault works for both defaultdict and plain dict arguments.
        members = clusters.setdefault(name, [])
        if match_name not in members:
            members.append(match_name)
        try:
            unique_names.remove(match_name)
        except ValueError:
            # Already removed earlier; there is nothing left to drop.
            pass

    @staticmethod
    def create_cluster_column(
        df: "pl.DataFrame", column: str, clusters: Dict[str, List[str]]
    ) -> List[str]:
        """Translate every value of ``df[column]`` into its cluster head.

        Args:
            df: Frame (or any mapping of column name -> iterable of values).
            column: Column whose values are translated.
            clusters: Mapping of head -> members.

        Returns:
            One label per row: the head of the first cluster (in ``clusters``
            order) containing the value, or the value itself when no cluster
            contains it.
        """
        # Build the member -> head lookup once instead of scanning every
        # cluster for every row. setdefault keeps the first head seen, which
        # is the cluster a linear scan over ``clusters`` would hit first.
        head_of: Dict[str, str] = {}
        for cluster_name, names in clusters.items():
            for member in names:
                head_of.setdefault(member, cluster_name)
        return [head_of.get(name, name) for name in df[column]]

    @classmethod
    def cluster_column(
        cls, df: "pl.DataFrame", column: str, threshold: int
    ) -> "pl.DataFrame":
        """Return ``df`` with an extra ``"<column>_cluster"`` label column.

        Args:
            df: Frame that holds the column to cluster.
            column: Name of the column whose values are clustered.
            threshold: Minimum similarity score for two names to share a
                cluster.

        Returns:
            A frame produced by ``df.with_columns`` that carries the cluster
            head of every row in a column named ``f"{column}_cluster"``.
        """
        instance = cls(df, column, threshold)
        # Names that already belong to a cluster, as head or as member.
        assigned: set = set()
        # Iterate over a snapshot: process_match() removes absorbed names
        # from instance.unique_names, and mutating a list while looping over
        # it can skip elements.
        for name in tuple(instance.unique_names):
            if name in assigned:
                continue
            instance.clusters[name].append(name)
            assigned.add(name)
            for match in cls.get_matches(name, instance.unique_names, threshold):
                # Skips the head itself and any earlier head, so each name
                # ends up in exactly one cluster.
                if match[0] in assigned:
                    continue
                cls.process_match(
                    name, match, instance.clusters, instance.unique_names
                )
                assigned.add(match[0])
        instance.clustered_names = cls.create_cluster_column(
            df, column, instance.clusters
        )
        return df.with_columns(
            pl.Series(instance.clustered_names).alias(f"{column}_cluster")
        )
# Demo: cluster five sample names using a similarity threshold of 70.
df = pl.DataFrame(
    {
        "names": ["Alice", "Alicia", "Bob", "Bobby", "Charlie"],
    }
)
clustered_df: pl.DataFrame = NameClusterer.cluster_column(
    df=df, column="names", threshold=70
)
# Lines printed by the call above in a recorded run:
#   Matches for Alice: [('Alice', 100), ('Alicia', 73)]
#   Matches for Bobby: [('Bobby', 100), ('Bob', 75)]
#   Matches for Charlie: [('Charlie', 100)]

print(clustered_df)
# Frame printed in the same recorded run:
#   shape: (5, 2)
#   ┌─────────┬───────────────┐
#   │ names   ┆ names_cluster │
#   │ ---     ┆ ---           │
#   │ str     ┆ str           │
#   ╞═════════╪═══════════════╡
#   │ Alice   ┆ Alice         │
#   │ Alicia  ┆ Alice         │
#   │ Bob     ┆ Bobby         │
#   │ Bobby   ┆ Bobby         │
#   │ Charlie ┆ Charlie       │
#   └─────────┴───────────────┘
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment