Created
November 19, 2024 01:42
-
-
Save Jason-Young-AI/990f26e5ce894eca5504fc7b808ff68d to your computer and use it in GitHub Desktop.
How to get sub-clusters or super-cluster of a cluster by using HDBSCAN?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is the initial version, provided by https://github.com/eamag.
import numpy as np | |
def find_all_subclusters(clusterer, cluster_labels):
    """Break every labelled cluster down into the sub-clusters beneath it.

    The condensed tree is read as a table of ``parent``/``child`` rows in
    which a child ID smaller than ``len(cluster_labels)`` is a data point
    and any other child ID is a cluster node.

    Args:
        clusterer: Object exposing ``condensed_tree_.to_pandas()``; the
            returned frame must provide ``parent`` and ``child`` columns.
        cluster_labels: One label per data point; ``-1`` marks noise.

    Returns:
        ``{label: {node_id: [point_id, ...]}}``. For each non-noise label the
        inner dict lists, for every tree node at or below the label's cluster
        node that directly owns data points, those point IDs. Node and point
        IDs are plain ``int``. A label whose points do not occur in the tree
        maps to an empty dict.

    Raises:
        ValueError: If the labelled points do not share a common ancestor,
            i.e. the table is not a single rooted tree.
    """
    tree_df = clusterer.condensed_tree_.to_pandas()
    cluster_labels = np.asarray(cluster_labels)
    n_points = len(cluster_labels)

    # Index the tree once instead of re-filtering the DataFrame on every
    # recursion step. Casting to int keeps every ID the same type.
    children_of = {}
    parent_of = {}
    for parent, child in zip(tree_df['parent'].tolist(), tree_df['child'].tolist()):
        parent, child = int(parent), int(child)
        children_of.setdefault(parent, []).append(child)
        parent_of[child] = parent

    def get_subclusters(node):
        children = children_of.get(node, [])
        point_children = [child for child in children if child < n_points]
        # No children, or only data points: the node is its own sub-cluster.
        if len(point_children) == len(children):
            return {node: point_children}
        # Otherwise descend into child clusters and keep the points that
        # hang directly off this node under the node's own key.
        subclusters = {}
        for child in children:
            if child >= n_points:
                subclusters.update(get_subclusters(child))
            else:
                subclusters.setdefault(node, []).append(child)
        return subclusters

    def find_cluster_node(point_ids):
        # The node for a label is the lowest common ancestor of the direct
        # parents of its points. Matching on child_size alone is ambiguous
        # when two clusters have the same size, and the root never appears
        # as a child so it has no child_size row at all.
        direct_parents = list(dict.fromkeys(
            parent_of[point] for point in point_ids if point in parent_of
        ))
        if not direct_parents:
            return None
        # Ancestor chain of the first parent, from itself up to the root.
        chain = [direct_parents[0]]
        while chain[-1] in parent_of:
            chain.append(parent_of[chain[-1]])
        position = {node: index for index, node in enumerate(chain)}
        deepest = 0
        for node in direct_parents[1:]:
            # Climb until this parent's ancestry meets the chain.
            while node not in position:
                if node not in parent_of:
                    raise ValueError(
                        "condensed tree is not a single rooted tree"
                    )
                node = parent_of[node]
            deepest = max(deepest, position[node])
        return chain[deepest]

    all_subclusters = {}
    for label in np.unique(cluster_labels):
        if label == -1:  # Exclude noise points
            continue
        cluster_points = np.where(cluster_labels == label)[0].tolist()
        cluster_node = find_cluster_node(cluster_points)
        all_subclusters[label] = (
            {} if cluster_node is None else get_subclusters(cluster_node)
        )
    return all_subclusters
# Example usage. `clusterer` and `cluster_labels` must already exist, e.g.:
#   clusterer = hdbscan.HDBSCAN(min_cluster_size=15, min_samples=15)
#   cluster_labels = clusterer.fit_predict(data)
all_subclusters = find_all_subclusters(clusterer, cluster_labels)

# Report each cluster's sub-clusters followed by the cluster's point total.
for cluster_label, subclusters in all_subclusters.items():
    print(f"\nCluster {cluster_label}:")
    for subcluster, points in subclusters.items():
        print(f" Subcluster {subcluster}: {len(points)} points")
    total_points = sum(map(len, subclusters.values()))
    print(f"Total points in all subclusters: {total_points}")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is a modified version of find_all_hdbscan_subcluster_eamags_version.py, from https://github.com/youngwow.
# Line 32 of that file reads: cluster_node = tree_df[tree_df['child_size'] == len(cluster_points)]['child'].iloc[0].
# "What if one has two children with the same child_size? It only takes the first element, which does not always seem to be correct."
def find_all_hdbscan_subclusters(clusterer: "hdbscan.HDBSCAN", cluster_labels):
    """Break every labelled cluster down into the sub-clusters beneath it.

    The condensed tree is read as a table of ``parent``/``child`` rows in
    which a child ID smaller than ``len(cluster_labels)`` is a data point
    and any other child ID is a cluster node.

    Args:
        clusterer: Object exposing ``condensed_tree_.to_pandas()``; the
            returned frame must provide ``parent``, ``child`` and
            ``child_size`` columns.
        cluster_labels: One label per data point; ``-1`` marks noise.

    Returns:
        ``{label: {node_id: [point_id, ...]}}``. For each non-noise label the
        inner dict lists, for every tree node found for that label that
        directly owns data points, those point IDs (as plain ``int``).
    """
    # Modified from here: https://github.com/scikit-learn-contrib/hdbscan/issues/401#issuecomment-2389803566
    tree_df = clusterer.condensed_tree_.to_pandas()
    cluster_labels = numpy.asarray(cluster_labels)
    n_points = len(cluster_labels)

    # Index the tree once instead of re-filtering the DataFrame at every
    # step. Casting to int keeps every ID the same type.
    edges = [
        (int(parent), int(child))
        for parent, child in zip(tree_df['parent'].tolist(), tree_df['child'].tolist())
    ]
    children_of = {}
    for parent, child in edges:
        children_of.setdefault(parent, []).append(child)
    # The root never occurs as a child, so it has no entry here.
    size_of = {
        int(child): int(size)
        for child, size in zip(tree_df['child'].tolist(), tree_df['child_size'].tolist())
    }

    def get_subclusters(node):
        children = children_of.get(node, [])
        point_children = [child for child in children if child < n_points]
        # No children, or only data points: the node is its own sub-cluster.
        if len(point_children) == len(children):
            return {node: point_children}
        # Otherwise descend into child clusters and keep the points that
        # hang directly off this node under the node's own key.
        subclusters = {}
        for child in children:
            if child >= n_points:
                subclusters.update(get_subclusters(child))
            else:
                subclusters.setdefault(node, []).append(child)
        return subclusters

    def check_parent_recursive(points, node):
        children = children_of.get(node, [])
        # Does this node directly own one of the labelled points?
        if any(child in points for child in children):
            return True
        # Otherwise search every child cluster; a miss in one of them must
        # not stop the search through its siblings.
        return any(
            check_parent_recursive(points, child)
            for child in children
            if child >= n_points
        )

    all_subclusters = {}
    for label in numpy.unique(cluster_labels):
        if label == -1:  # Exclude noise points
            continue
        cluster_points = set(numpy.where(cluster_labels == label)[0].tolist())
        # Distinct parents of the labelled points, in tree row order.
        possible_candidates = list(dict.fromkeys(
            parent for parent, child in edges if child in cluster_points
        ))

        # Usual case: a candidate whose recorded size equals the number of
        # labelled points. `.get` returns None for the root, which has no
        # recorded size, so the root simply does not match here.
        cluster_nodes = [
            candidate for candidate in possible_candidates
            if size_of.get(candidate) == len(cluster_points)
        ]
        # Fallback: keep every candidate that holds labelled points.
        if not cluster_nodes:
            cluster_nodes = [
                candidate for candidate in possible_candidates
                if check_parent_recursive(cluster_points, candidate)
            ]

        subclusters = {}
        for cluster_node in cluster_nodes:
            subclusters.update(get_subclusters(cluster_node))
        all_subclusters[label] = subclusters
    return all_subclusters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Reference: scikit-learn-contrib/hdbscan#401