Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save Jason-Young-AI/990f26e5ce894eca5504fc7b808ff68d to your computer and use it in GitHub Desktop.
Save Jason-Young-AI/990f26e5ce894eca5504fc7b808ff68d to your computer and use it in GitHub Desktop.
How to get sub-clusters or super-cluster of a cluster by using HDBSCAN?
# This is the initial version provided by https://github.com/eamag.
import numpy as np
def find_all_subclusters(clusterer, cluster_labels):
tree = clusterer.condensed_tree_
tree_df = tree.to_pandas()
def get_subclusters(node):
children = tree_df[tree_df['parent'] == node]
# If there are no children or only leaf children, return the node itself
if children.empty or all(children['child'] < len(cluster_labels)):
return {node: list(children[children['child'] < len(cluster_labels)]['child'])}
# Recursively get subclusters for non-leaf children
subclusters = {}
for _, child in children.iterrows():
if child['child'] >= len(cluster_labels):
subclusters.update(get_subclusters(child['child']))
else:
subclusters[node] = subclusters.get(node, []) + [child['child']]
return subclusters
all_subclusters = {}
unique_labels = np.unique(cluster_labels)
for label in unique_labels:
if label != -1: # Exclude noise points
cluster_points = np.where(cluster_labels == label)[0]
cluster_node = tree_df[tree_df['child_size'] == len(cluster_points)]['child'].iloc[0]
all_subclusters[label] = get_subclusters(cluster_node)
return all_subclusters
# Assuming you have already run HDBSCAN and have cluster_labels
# clusterer = hdbscan.HDBSCAN(min_cluster_size=15, min_samples=15)
# cluster_labels = clusterer.fit_predict(data)
all_subclusters = find_all_subclusters(clusterer, cluster_labels)
for cluster_label, subclusters in all_subclusters.items():
print(f"\nCluster {cluster_label}:")
total_points = 0
for subcluster, points in subclusters.items():
print(f" Subcluster {subcluster}: {len(points)} points")
total_points += len(points)
print(f"Total points in all subclusters: {total_points}")
# This is a Modified version of find_all_hdbscan_subcluster_eamags_version.py from https://github.com/youngwow.
# In line 32 of that file: cluster_node = tree_df[tree_df['child_size'] == len(cluster_points)]['child'].iloc[0].
# "What if one have two child of the same child_size? If it only takes the first element. It seems that this is not always correct.
def find_all_hdbscan_subclusters(clusterer: hdbscan.HDBSCAN, cluster_labels):
# Modified from here: https://github.com/scikit-learn-contrib/hdbscan/issues/401#issuecomment-2389803566
tree_df = clusterer.condensed_tree_.to_pandas()
def get_subclusters(node):
children = tree_df[tree_df['parent'] == node]
# If there are no children or only leaf children, return the node itself
if children.empty or all(children['child'] < len(cluster_labels)):
return {node: list(children[children['child'] < len(cluster_labels)]['child'])}
# Recursively get subclusters for non-leaf children
subclusters = {}
for _, child in children.iterrows():
child_id = int(child['child'])
if child_id >= len(cluster_labels):
subclusters.update(get_subclusters(child_id))
else:
subclusters[node] = subclusters.get(node, []) + [child_id]
return subclusters
def check_parent_recursive(points, node):
children = tree_df[tree_df['parent'] == node]
# Check if this node contains some points of label point
child_points = children[children['child'].isin(points)]['child'].values
if len(child_points) != 0:
return True
# Recursively search the parent for matching points
for _, child in children.iterrows():
child_id = int(child['child'])
if child_id >= len(cluster_labels):
result = check_parent_recursive(points, child_id)
if result is not None:
return result
return False
all_subclusters = {}
unique_labels = numpy.unique(cluster_labels)
for label in unique_labels:
if label != -1: # Exclude noise points
cluster_points = numpy.where(cluster_labels == label)[0]
possible_candidates = tree_df[tree_df['child'].isin(cluster_points)]['parent'].unique()
candidate_found = False
cluster_nodes = []
# The block of code that is most in use
for candidate in possible_candidates:
child_size = tree_df[tree_df['child'] == candidate]['child_size'].item()
if child_size == len(cluster_points):
cluster_nodes.append(candidate)
candidate_found = True
# In theory, the block of code isn't most in use
if not candidate_found:
for candidate in possible_candidates:
if check_parent_recursive(cluster_points, candidate):
cluster_nodes.append(candidate)
candidate_found = True
subclusters = {}
for cluster_node in cluster_nodes:
subclusters.update(get_subclusters(cluster_node))
all_subclusters[label] = subclusters
return all_subclusters
@Jason-Young-AI
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment