# -*- coding: utf-8 -*-
"""
Created on Fri Jan 24 18:01:21 2014
@author: lmk
@description: k-means clustering
"""
#import sys
import os
import traceback
import itertools
import argparse
import chardet
#from datetime import datetime
#from multiprocessing import Pool
from operator import itemgetter
from collections import Counter
#import zerorpc
#import numpy
from sklearn.decomposition import RandomizedPCA
from sklearn.feature_extraction.text import HashingVectorizer
try:
    # Prefer a customized CountVectorizer ("custext") if available,
    # otherwise fall back to the stock scikit-learn one.
    from sklearn.feature_extraction.custext import CountVectorizer
    print 'using custext.'
except ImportError:
    print 'can\'t use custext. using text.'
    from sklearn.feature_extraction.text import CountVectorizer

from sklearn.cluster import MiniBatchKMeans
#from sklearn.externals import joblib


def recursiveGetFileList(fpath):
    """ Return a list of files, collected recursively. """
    return sorted([os.path.join(root, f)
                   for root, dirs, files in os.walk(fpath) for f in files])


def yielder(iterator):
    """ Convert a list into a generator. """
    for each in iterator:
        yield each


def combinePooledLst(pLst):
    """ Combine a list of lists into a single list. """
    return list(itertools.chain(*pLst))


def chunks(l, n):
    """ Yield successive chunks of size 'n' from list 'l'. """
    for i in xrange(0, len(l), n):
        yield l[i:i + n]


def groupSafeLimit(lst, chunk_size=5000, iterator=False):
    """ Split a list into chunks of 'chunk_size'. """
    chunked_lists = chunks(lst, chunk_size)
    if not iterator:
        chunked_lists = list(chunked_lists)
    return chunked_lists
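
# Illustrative example of the chunking helpers above (doctest-style), using a
# small list and chunk_size=5:
#
#   >>> list(chunks(range(12), 5))
#   [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11]]
#   >>> groupSafeLimit(range(12), chunk_size=5)
#   [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11]]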


def sortByIndex(_array, _index, _asc):
    """ Sort a list of lists by the given index. """
    return sorted(_array, key=itemgetter(_index), reverse=not _asc)


def stringEncodingHandler(data, charset="utf_8", charset_error="strict",
                          path=None, verbose=True):
    """
    stringEncodingHandler

    parameter(s)
    ============
    charset        <str>  name of charset.
    charset_error  <str>  mode of charset error.
    path           <str>  the path where the data is read from.
    verbose        <bool> print the errors, messages, etc.

    return         <str>  decoded string.
    """
    if charset != "auto":
        try:
            data = data.decode(charset, charset_error)
        except:
            if verbose:
                print "charset", charset, "### ! path:", path
            data = data.decode(charset, "ignore")
    else:
        charset = "utf_8"  ### fall-back charset
        try:
            tmp_cherr = charset_error
            tmp_charset = chardet.detect(data).get('encoding') or charset
            cslower = tmp_charset.lower()
            cjkencoding = ["big5", "shift_jis"]
            engencoding = ["ascii"]
            if cslower in cjkencoding + engencoding:
                if cslower in cjkencoding:
                    tmp_cherr = "ignore"
                tmp_charset = charset
            data = data.decode(tmp_charset, tmp_cherr)
        except:
            if verbose:
                print "charset", charset, '###! Full Path:', path
            data = data.decode(charset, "ignore")
    return data


def readFileWithEncodingHandler(path,
                                charset="utf_8",
                                charset_error="strict"):
    """ Read a file and handle its encoding. """
    data = open(path, 'rb').read()
    data = stringEncodingHandler(data,
                                 charset=charset,
                                 charset_error=charset_error,
                                 path=path,
                                 verbose=False)
    return data


def reduce_dimension(X_train, n_comp=2):
    """ Reduce dimensions according to n_comp. """
    rpca = RandomizedPCA(n_components=n_comp)
    return rpca.fit_transform(X_train)


def getTopTerms(terms, sortIndex=1, asc=True, Most_common=50):
    """ get most common terms and sort by given index (of terms or count). """
    try:
        return [sortByIndex(_terms.most_common(Most_common),
                            sortIndex, asc)
                for _terms in terms]
    except:
        print traceback.format_exc()
        return []
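
# Illustrative example (getTopTerms is not called elsewhere in this script):
# it expects an iterable of collections.Counter objects, e.g. per-cluster term
# counts, and returns each Counter's most_common() entries sorted by the
# chosen index (0 = term, 1 = count).
#
#   >>> getTopTerms([Counter('aabbbc')], sortIndex=1, asc=False)
#   [[('b', 3), ('a', 2), ('c', 1)]]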


class clusterP(object):

    def __init__(self, directory=None, verbose=False, batch=False,
                 hashfeature=5000, kmncluster=15, voronoipath="voronoi.json",
                 size=5000):
        self.verbose = verbose
        self.directory = directory
        self.hashfeature = hashfeature
        self.kmncluster = kmncluster
        self.voronoipath = voronoipath
        self.batch = batch
        self.vectorizer = None
        self.counter = None
        self.kmeans = None
        self.size = size

    def collectDocs(self, paths):
        for path in paths:
            yield readFileWithEncodingHandler(path)

    def generateVectorizer(self, stop_words='english', charset_error='ignore'):
        self.vectorizer = HashingVectorizer(stop_words=stop_words,
                                            n_features=self.hashfeature,
                                            charset_error=charset_error)
        if self.verbose:
            print self.vectorizer
        return self.vectorizer

    def generateCountVectorizer(self, charset_error="ignore",
                                max_df=0.7, stop_words="english",
                                token_pattern=ur"\b([a-zA-Z]{3,})\b"):
        self.counter = CountVectorizer(stop_words=stop_words,
                                       max_df=max_df,
                                       charset_error=charset_error,
                                       token_pattern=token_pattern)
        if self.verbose:
            print self.counter
        return self.counter

    def generateMBKMeans(self, init='k-means++', n_init=1, init_size=1000,
                         batch_size=1000, verbose=1):
        self.kmeans = MiniBatchKMeans(n_clusters=self.kmncluster,
                                      init=init,
                                      n_init=n_init,
                                      init_size=init_size,
                                      batch_size=batch_size,
                                      verbose=verbose)
        if self.verbose:
            print self.kmeans
        return self.kmeans

    def vectorizeChunks(self, paths):
        for path_group in paths:
            yield reduce_dimension(
                self.vectorizer.fit_transform(self.collectDocs(path_group)))

    def partialFitChunks(self, chunks):
        """ MiniBatchKMeans partial_fit on vectorized chunks. """
        for e, each_chunk in enumerate(chunks):
            if self.verbose:
                print 'current chunkID:', e
                print each_chunk
                print each_chunk.tolist()[:10]
            self.kmeans.partial_fit(each_chunk)
            if self.verbose:
                print 'no. of label:', len(self.kmeans.labels_.tolist() or [])
                print 'clustered docs:', self.kmeans.counts_
                print 'total docs processed:', sum(self.kmeans.counts_.tolist())
            predicted = self.kmeans.predict(each_chunk)
            if self.verbose:
                predicted_tolist = predicted.tolist()
                print 'Total predicted docs:', len(predicted_tolist)
                counter = Counter(predicted_tolist)
                print 'By Cluster:', sortByIndex(counter.items(), 0, True)
                # print predicted.tolist()

    def selectDocsByCluster(self, chunks):
        centroids = self.kmeans.cluster_centers_
        labels = self.kmeans.labels_
        if self.verbose:
            print 'labels:', len(labels.tolist())
        checker_docs_count = 0
        collected_data = dict()
        for e, centroid in enumerate(centroids):
            members = labels == e
            for each_chunk in chunks:
                docs_by_cluster = each_chunk[members]
                if e in collected_data.keys():
                    collected_data[e].extend(docs_by_cluster.tolist())
                else:
                    collected_data.update({e: docs_by_cluster.tolist()})
            if self.verbose:
                print 'Members:', len(members.tolist()), type(members)
                total_selected = len(collected_data.get(e) or [])
                print 'clusterID:', e, "Total Docs:", total_selected
                checker_docs_count += total_selected
        print 'check docs count:', checker_docs_count

    def partialFitAndCluster(self, chunks):
        checker_docs_count = 0
        collected_data = dict()
        for e, each_chunk in enumerate(chunks):
            print e
            self.kmeans.partial_fit(each_chunk)
            if self.verbose:
                print len(self.kmeans.cluster_centers_.tolist())
                print self.kmeans.counts_
                print sum(self.kmeans.counts_.tolist())
            centroids = self.kmeans.cluster_centers_
            labels = self.kmeans.labels_
            if self.verbose:
                print 'labels:', len(labels.tolist())
            for e, centroid in enumerate(centroids):
                members = labels == e
                docs_by_cluster = each_chunk[members]
                if e in collected_data.keys():
                    collected_data[e].extend(docs_by_cluster.tolist())
                else:
                    collected_data.update({e: docs_by_cluster.tolist()})
                if self.verbose:
                    print 'Members:', len(members.tolist()), type(members)
                    total_selected = len(collected_data.get(e) or [])
                    print 'clusterID:', e, "Total Docs:", total_selected
        print 'Result:'
        for k, v in collected_data.iteritems():
            selected_docs_count = len(v)
            print 'clusterID', k, 'docs:', selected_docs_count
            checker_docs_count += selected_docs_count
        print 'check docs count:', checker_docs_count

    def clusterDocs(self):
        if self.verbose:
            print 'NOTE: VERBOSE IS ENABLED!'
        if self.directory:
            size = self.size
            paths = recursiveGetFileList(self.directory)
            self.generateVectorizer()
            self.generateMBKMeans(init_size=size, batch_size=size)
            if self.batch:
                chunks = groupSafeLimit(paths, size, iterator=True)
                print 'vectorizing... (batch)'
                vectorized = self.vectorizeChunks(chunks)
                print 'MBKmeans partial fitting...'
                self.partialFitChunks(vectorized)
                # print 'clustering...'
                # self.selectDocsByCluster(vectorized)
                #
                # print 'partial fitting and clustering...'
                # self.partialFitAndCluster(vectorized)
            else:
                print 'collecting data...'
                docs = self.collectDocs(yielder(paths))
                print 'vectorizing...'
                x_train = reduce_dimension(self.vectorizer.fit_transform(docs))
                print 'MBKmeans fitting...'
                self.kmeans.fit(x_train)
                centroids = self.kmeans.cluster_centers_
                labels = self.kmeans.labels_
                print 'clustering...'
                checker_docs_count = 0
                for e, centroid in enumerate(centroids):
                    members = labels == e
                    docs_by_cluster = x_train[members]
                    total_selected = len(docs_by_cluster.tolist())
                    if self.verbose:
                        print 'members:', len(members.tolist()), type(members)
                        print 'dbc:', type(docs_by_cluster)
                    print 'clusterID:', e, "Total Docs:", total_selected
                    checker_docs_count += total_selected
                print 'check docs count:', checker_docs_count
            print 'completed.'
        else:
            print 'No path, No processing.'


if __name__ == "__main__":
    mycluster = clusterP()
    parser = argparse.ArgumentParser(description="K-Means Clustering")
    parser.add_argument("directory", metavar="DIR", type=str,
                        help="directory of trainer docs.")
    parser.add_argument("-v", "--verbose", action="store_true", default=False,
                        help="show messages, prints.")
    parser.add_argument("-b", "--batch", action="store_true", default=False,
                        help="split the dataset into chunks.")
    parser.add_argument("-hvf", "--hashfeature", type=int, default=5000,
                        help="no. of features for the hashing vectorizer.")
    parser.add_argument("-s", "--size", type=int, default=5000,
                        help="no. of docs in each batch.")
    parser.add_argument("-kmn", "--kmncluster", type=int, default=15,
                        help="no. of clusters (MiniBatchKMeans).")
    parser.add_argument("-vpath", "--voronoipath", type=str, default="voronoi.json",
                        help="json path for voronoi.")
    args = parser.parse_args(namespace=mycluster)
    mycluster.clusterDocs()
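
# Example invocation (illustrative; the script filename below is assumed):
#
#   python kmeans_clustering.py ./docs --batch --size 1000 --kmncluster 20 --verbose
#
# Without --batch the whole corpus is vectorized and fit in a single pass;
# with --batch the file list is split into chunks of --size documents and fed
# to MiniBatchKMeans via partial_fit, chunk by chunk.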