Created
July 30, 2024 20:44
-
-
Save CodeByAidan/bce81f17c4e74fe83c8bbfe272b72997 to your computer and use it in GitHub Desktop.
Cluster the values of a Polars DataFrame column by fuzzy text similarity.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from collections import defaultdict | |
| from typing import Any, Dict, List, Self, Tuple | |
| import polars as pl | |
| from fuzzywuzzy import fuzz, process | |
class NameClusterer:
    """Group the distinct values of a text column into fuzzy-match clusters.

    Clustering is greedy: each name that has not yet been absorbed becomes the
    representative of a new cluster and absorbs every remaining name whose
    ``fuzz.ratio`` score against it reaches ``threshold``.

    NOTE(review): the representative of a cluster is simply the first of its
    members to be visited, so the labels depend on the order in which
    ``Series.unique()`` returns the values -- confirm whether that order is
    stable enough for your use case.
    """

    def __init__(self, df: pl.DataFrame, column: str, threshold: int) -> None:
        """Store the inputs and collect the distinct values of ``column``.

        Args:
            df: Frame holding the column to cluster.
            column: Name of the column whose values are clustered.
            threshold: Minimum similarity score for two names to be grouped.
        """
        self.df: pl.DataFrame = df
        self.column: str = column
        self.threshold: int = threshold
        # Candidate pool; names are removed from it as clusters absorb them.
        self.unique_names: list[Any] = df[column].unique().to_list()
        # Maps each cluster's representative to all names in that cluster.
        self.clusters: Dict[str, List[str]] = defaultdict(list)
        # One cluster label per row of ``df``; filled by ``cluster_column``.
        self.clustered_names: List[str] = []

    def __repr__(self) -> str:
        return f"NameClusterer(column={self.column}, threshold={self.threshold})"

    def __str__(self) -> str:
        return f"NameClusterer with {len(self.unique_names)} unique names"

    def __len__(self) -> int:
        """Return the number of names currently in the candidate pool."""
        return len(self.unique_names)

    def __contains__(self, name: str) -> bool:
        """Return True if ``name`` is a member of any cluster built so far."""
        return any(name in names for names in self.clusters.values())

    @staticmethod
    def get_matches(
        name: str, unique_names: List[str], threshold: int
    ) -> List[Tuple[str, int]]:
        """Return every candidate scoring at least ``threshold`` against ``name``.

        Args:
            name: The query name.
            unique_names: Candidates to compare against.
            threshold: Score cutoff handed to ``process.extractBests``.

        Returns:
            ``(candidate, score)`` pairs, best score first.
        """
        # limit=None: extractBests otherwise keeps only its default top 5, which
        # would leave part of a large group of similar names unclustered.
        matches: List[Tuple[str, int]] = process.extractBests(
            name,
            unique_names,
            scorer=fuzz.ratio,
            score_cutoff=threshold,
            limit=None,
        )
        print(f"Matches for {name}: {matches}")
        return matches

    @staticmethod
    def process_match(
        name: str,
        match: Tuple[str, int],
        clusters: Dict[str, List[str]],
        unique_names: List[str],
    ) -> None:
        """Absorb one match into the cluster represented by ``name``.

        A match on ``name`` itself is ignored. Any other matched name is
        appended to ``clusters[name]`` and removed from ``unique_names`` so it
        cannot start or join another cluster. Both containers are mutated in
        place.

        Raises:
            ValueError: If the matched name is not present in ``unique_names``.
        """
        match_name, _score = match
        if match_name == name:
            return
        clusters[name].append(match_name)
        unique_names.remove(match_name)

    @staticmethod
    def create_cluster_column(
        df: pl.DataFrame, column: str, clusters: Dict[str, List[str]]
    ) -> List[str]:
        """Translate every value of ``df[column]`` into its cluster label.

        Args:
            df: Frame (or any mapping of column name to iterable) to label.
            column: Name of the column to translate.
            clusters: Mapping of representative name to cluster members.

        Returns:
            One label per row, in row order. Values that belong to no cluster
            are returned unchanged.
        """
        # Build the member -> representative lookup once instead of rescanning
        # every cluster for every row. setdefault keeps the first cluster that
        # lists a member, matching a front-to-back scan of ``clusters``.
        representative: Dict[str, str] = {}
        for cluster_name, members in clusters.items():
            for member in members:
                representative.setdefault(member, cluster_name)
        return [representative.get(value, value) for value in df[column]]

    @classmethod
    def cluster_column(
        cls, df: pl.DataFrame, column: str, threshold: int
    ) -> pl.DataFrame:
        """Return ``df`` with an added ``"<column>_cluster"`` column.

        Args:
            df: Frame holding the column to cluster.
            column: Name of the column whose values are clustered.
            threshold: Minimum similarity score for two names to be grouped.

        Returns:
            A frame with the extra column holding each row's cluster label.
        """
        instance = cls(df, column, threshold)
        absorbed: set = set()
        # Walk a snapshot of the pool: process_match() removes entries from
        # instance.unique_names, and mutating a list while iterating over it
        # can silently skip elements.
        for name in list(instance.unique_names):
            if name in absorbed:
                continue  # Already a member of an earlier cluster.
            instance.clusters[name].append(name)
            for match in cls.get_matches(name, instance.unique_names, threshold):
                cls.process_match(
                    name, match, instance.clusters, instance.unique_names
                )
                if match[0] != name:
                    absorbed.add(match[0])
        instance.clustered_names = cls.create_cluster_column(
            df, column, instance.clusters
        )
        return df.with_columns(
            pl.Series(instance.clustered_names).alias(f"{column}_cluster")
        )
# Demo: cluster a small column of first names, grouping names whose
# similarity score is at least 70.
df = pl.DataFrame({"names": ["Alice", "Alicia", "Bob", "Bobby", "Charlie"]})
clustered_df: pl.DataFrame = NameClusterer.cluster_column(
    df=df, column="names", threshold=70
)
# Sample output recorded for the clustering call above:
#
#   Matches for Alice: [('Alice', 100), ('Alicia', 73)]
#   Matches for Bobby: [('Bobby', 100), ('Bob', 75)]
#   Matches for Charlie: [('Charlie', 100)]

print(clustered_df)
# Sample output recorded for the print above:
#
#   shape: (5, 2)
#   ┌─────────┬───────────────┐
#   │ names   ┆ names_cluster │
#   │ ---     ┆ ---           │
#   │ str     ┆ str           │
#   ╞═════════╪═══════════════╡
#   │ Alice   ┆ Alice         │
#   │ Alicia  ┆ Alice         │
#   │ Bob     ┆ Bobby         │
#   │ Bobby   ┆ Bobby         │
#   │ Charlie ┆ Charlie       │
#   └─────────┴───────────────┘
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment