from collections import defaultdict, Counter
from operator import add
from functools import reduce

import numpy as np
from sklearn.cluster import KMeans


def dict_of_list(keys, values):
    """Group values by key: parallel key/value sequences -> dict of key -> list of values."""
    assert len(keys) == len(values)
    key2values = defaultdict(list)
    for k, v in zip(keys, values):
        key2values[k].append(v)
    return key2values


def get_cluster2members(cluster_labels, documents):
    """Map each cluster id to the word lists of the documents assigned to it."""
    words_list = [s.split(' ') for s in documents]
    return dict_of_list(cluster_labels, words_list)


def kmeans_clustering(documents, encoder, n_clusters):
    """Encode each document into a vector and cluster with KMeans; return cluster labels."""
    X = [encoder(s) for s in documents]
    clusterer = KMeans(n_clusters=n_clusters, random_state=0)
    cluster_labels = clusterer.fit_predict(X)
    return cluster_labels


def get_cluster_df(cluster2words, vocab=None):
    """Characterize clusters via document frequency (DF).

    Internal criterion:
        high intra-cluster DF * high inter-cluster IDF
    cf. Silhouette Score
        (high intra-class similarity * high inter-class distance)
        https://scikit-learn.org/stable/modules/generated/sklearn.metrics.silhouette_score.html
        # from sklearn.metrics import silhouette_samples, silhouette_score
        # silhouette_avg = silhouette_score(X, cluster_labels)
        # sample_silhouette_values = silhouette_samples(X, cluster_labels)

    Args:
        cluster2words: dict of cluster id -> documents in the cluster (each a list of words)
    Returns:
        cluster2intra_cluster_df: dict of cluster id -> intra-cluster DF;
            the larger, the more a word holds its cluster together
        inter_cluster_idf: inter-cluster IDF;
            the larger, the more a word separates clusters from each other
    Note:
        intra-cluster similarity: the larger, the more cohesive a cluster is
        inter-cluster similarity: the smaller, the better separated the clusters are
    Issues:
        an empty word list (words == []) leads to nan scores downstream
    """
    # Intra-cluster DF: for each cluster, in how many of its documents each word appears.
    cluster2intra_cluster_df = {c: reduce(add, [Counter(set(ws)) for ws in ws_list])
                                for c, ws_list in cluster2words.items()}
    # Inter-cluster DF: in how many clusters each word appears at all.
    N = len(cluster2words)
    inter_cluster_df = reduce(add, [Counter({w for ws in ws_list for w in ws})
                                    for c, ws_list in cluster2words.items()])
    inter_cluster_idf = {w: N / df for w, df in inter_cluster_df.items()}
    return cluster2intra_cluster_df, inter_cluster_idf


def get_cluster_scores(documents, cluster_labels, topk=3):
    """Sort clusters by their intra/inter-DF representativeness score."""
    cluster2words = get_cluster2members(cluster_labels, documents)
    cluster2intra_cluster_df, inter_cluster_idf = get_cluster_df(cluster2words)
    cluster_scores = []
    for c, words_list in cluster2words.items():
        intra_cluster_df = cluster2intra_cluster_df[c]
        M = len(words_list)
        # Score each word shared by at least two documents in the cluster:
        # normalized intra-cluster DF weighted by inter-cluster IDF.
        repr_scores = sorted([df / M * inter_cluster_idf[w]
                              for w, df in intra_cluster_df.items() if df > 1],
                             reverse=True)
        # Cluster score: mean of the top-k word scores (nan if no word is shared).
        repr_score = np.mean(repr_scores[:topk])
        cluster_scores.append((c, repr_score))
    # Sort clusters by descending score, treating nan as 0.
    return sorted(cluster_scores, key=lambda x: -(x[1] if not np.isnan(x[1]) else 0.))
Usage with kmeans_clustering():
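A minimal sketch of how the pieces above fit together, assuming a TF-IDF based encoder; the vectorizer, the toy documents, and n_clusters=2 are illustrative assumptions, not part of the original gist. The silhouette check at the end follows the comparison suggested in the get_cluster_df docstring.

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import silhouette_score

documents = [
    "machine learning with python",
    "deep learning with python",
    "cooking pasta at home",
    "baking bread at home",
]

# Hypothetical encoder: map a whitespace-tokenized string to a dense TF-IDF vector.
vectorizer = TfidfVectorizer().fit(documents)

def encoder(s):
    return vectorizer.transform([s]).toarray()[0]

cluster_labels = kmeans_clustering(documents, encoder, n_clusters=2)

# Rank clusters by how well their top-3 shared words characterize them.
for cluster_id, score in get_cluster_scores(documents, cluster_labels, topk=3):
    print(cluster_id, score)

# Optional external check: the silhouette score over the same encodings and labels.
X = [encoder(s) for s in documents]
print(silhouette_score(X, cluster_labels))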