Skip to content

Instantly share code, notes, and snippets.

@kzinmr
Last active September 10, 2024 10:05
Show Gist options
  • Save kzinmr/727049d3fdaf7f4baa4303a3149b2382 to your computer and use it in GitHub Desktop.
Save kzinmr/727049d3fdaf7f4baa4303a3149b2382 to your computer and use it in GitHub Desktop.
from collections import defaultdict, Counter
from operator import add
from functools import reduce
import numpy as np
from sklearn.cluster import KMeans
def dict_of_list(keys, values):
    """Group values by their parallel keys.

    Args:
        keys: sequence of hashable keys.
        values: sequence of values, the same length as ``keys``.

    Returns:
        ``defaultdict(list)`` mapping each key to the list of its values,
        in input order.

    Raises:
        ValueError: if ``keys`` and ``values`` differ in length.
    """
    # Validate with an exception rather than `assert`:
    # asserts are stripped when Python runs with -O.
    if len(keys) != len(values):
        raise ValueError(
            f"keys and values must have the same length: {len(keys)} != {len(values)}"
        )
    key2values = defaultdict(list)
    for k, v in zip(keys, values):
        key2values[k].append(v)
    return key2values
def get_cluster2members(cluster_labels, documents):
    """Map each cluster id to the token lists of its member documents.

    Args:
        cluster_labels: sequence of cluster ids, one per document.
        documents: sequence of space-separated token strings.

    Returns:
        dict of cluster id -> list of token lists (one per document).
    """
    # `s.split(' ')` already returns a list; the original wrapped it in a
    # redundant identity comprehension.
    words_list = [s.split(' ') for s in documents]
    return dict_of_list(cluster_labels, words_list)
def kmeans_clustering(documents, encoder, n_clusters):
    """Cluster documents with KMeans over their encoded vectors.

    Args:
        documents: sequence of documents accepted by ``encoder``.
        encoder: callable mapping one document to a fixed-size vector.
        n_clusters: number of clusters (K).

    Returns:
        array of cluster labels, one per document.
    """
    vectors = [encoder(doc) for doc in documents]
    model = KMeans(n_clusters=n_clusters, random_state=0)
    return model.fit_predict(vectors)
def get_cluster_df(cluster2words, vocab=None):
    """Score cluster vocabulary via document frequency (DF).

    Internal criterion, analogous to the silhouette score: a word that
    characterizes a cluster has high intra-cluster DF (binds the cluster
    together) and high inter-cluster IDF (separates it from the other
    clusters).
    cf. https://scikit-learn.org/stable/modules/generated/sklearn.metrics.silhouette_score.html

    Args:
        cluster2words: dict of cluster id -> list of documents, each
            document given as a list of words.
        vocab: unused; kept for interface compatibility.

    Returns:
        cluster2intra_cluster_df: dict of cluster id -> Counter mapping
            each word to the number of documents in the cluster that
            contain it.
        inter_cluster_idf: dict of word -> N / (number of clusters whose
            documents contain the word), with N = number of clusters.
    """
    # Deduplicate words per document (Counter(set(ws))) so each document
    # contributes at most 1 to a word's DF.  Summing with start=Counter()
    # tolerates a cluster with no documents, where reduce(add, []) raised
    # TypeError in the original.
    cluster2intra_cluster_df = {
        c: sum((Counter(set(ws)) for ws in ws_list), Counter())
        for c, ws_list in cluster2words.items()
    }
    N = len(cluster2words)
    # Cluster-level DF: in how many clusters does each word appear at all?
    inter_cluster_df = sum(
        (Counter({w for ws in ws_list for w in ws})
         for ws_list in cluster2words.values()),
        Counter(),
    )
    inter_cluster_idf = {w: N / df for w, df in inter_cluster_df.items()}
    return cluster2intra_cluster_df, inter_cluster_idf
def get_cluster_scores(documents, cluster_labels, topk=3):
    """Rank clusters by an intra/inter document-frequency score.

    Each cluster's score is the mean of its top-k word scores, where a
    word's score is (intra-cluster DF ratio) * (inter-cluster IDF).
    Words appearing in only one document of the cluster (df <= 1) are
    skipped as noise.

    Args:
        documents: sequence of space-separated token strings.
        cluster_labels: sequence of cluster ids, one per document.
        topk: number of top word scores averaged per cluster.

    Returns:
        list of (cluster_id, score) tuples sorted by score, descending.
    """
    cluster2words = get_cluster2members(cluster_labels, documents)
    cluster2intra_cluster_df, inter_cluster_idf = get_cluster_df(cluster2words)
    cluster_scores = []
    for c, words_list in cluster2words.items():
        intra_cluster_df = cluster2intra_cluster_df[c]
        M = len(words_list)  # number of documents in this cluster
        repr_scores = sorted(
            (df / M * inter_cluster_idf[w]
             for w, df in intra_cluster_df.items() if df > 1),
            reverse=True,
        )
        # np.mean of an empty slice is nan (the "words==[] -> nan" issue
        # flagged in the original); score such clusters 0.0 instead, which
        # also lets the final sort drop its isnan workaround.
        repr_score = float(np.mean(repr_scores[:topk])) if repr_scores else 0.0
        cluster_scores.append((c, repr_score))
    return sorted(cluster_scores, key=lambda x: -x[1])
@kzinmr
Copy link
Author

kzinmr commented Feb 26, 2020

Usage with kmeans_clustering():

def document_clustering_scoring(documents, K, encoder):
    """
    Args: 
     - documents: (N,)
     - K: (int) n_clusters
     - encoder: document -> vector
    Returns:
     - cluster_labels: (N,)
     - cluster_scores: (K,)
    """

    # Clustering (doc=='' -> cluster_id==-1)
    cluster_labels = kmeans_clustering(documents, encoder, n_clusters=K)
    # Scoring
    cluster_scores = get_cluster_scores(documents, cluster_labels, topk=3)
    return cluster_labels, cluster_scores

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment