heavily logged versions of LDA in sklearn and gensim to enable comparison
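A minimal sketch of how the two logged implementations below might be run side by side, assuming both patched files are importable (the module names `ldamodel_logged` and `sklearn_lda_logged` are hypothetical placeholders) and that DEBUG logging is captured so the two logs can be diffed:

    import logging
    logging.basicConfig(filename='lda_comparison.log', level=logging.DEBUG)

    # toy corpus shared by both implementations
    texts = [['human', 'interface', 'computer'],
             ['survey', 'user', 'computer', 'system'],
             ['eps', 'user', 'interface', 'system']]

    # gensim side: bag-of-words corpus via Dictionary/doc2bow
    from gensim import corpora
    dictionary = corpora.Dictionary(texts)
    corpus = [dictionary.doc2bow(text) for text in texts]
    from ldamodel_logged import LdaModel                      # hypothetical module name
    lda_gensim = LdaModel(corpus=corpus, id2word=dictionary, num_topics=2, passes=1)

    # sklearn side: the same counts as a dense document-term matrix
    import numpy
    X = numpy.zeros((len(corpus), len(dictionary)))
    for d, doc in enumerate(corpus):
        for term_id, cnt in doc:
            X[d, term_id] = cnt
    from sklearn_lda_logged import LatentDirichletAllocation  # hypothetical module name
    lda_sklearn = LatentDirichletAllocation(n_topics=2, max_iter=1, random_state=0)
    lda_sklearn.fit(X)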
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# | |
# Copyright (C) 2011 Radim Rehurek <[email protected]> | |
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html | |
# | |
# Parts of the LDA inference code come from Dr. Hoffman's `onlineldavb.py` script, | |
# (C) 2010 Matthew D. Hoffman, GNU GPL 3.0 | |
""" | |
**For a faster implementation of LDA (parallelized for multicore machines), see** :mod:`gensim.models.ldamulticore`. | |
Latent Dirichlet Allocation (LDA) in Python. | |
This module allows both LDA model estimation from a training corpus and inference of topic | |
distribution on new, unseen documents. The model can also be updated with new documents | |
for online training. | |
The core estimation code is based on the `onlineldavb.py` script by M. Hoffman [1]_, see | |
**Hoffman, Blei, Bach: Online Learning for Latent Dirichlet Allocation, NIPS 2010.** | |
The algorithm: | |
* is **streamed**: training documents may come in sequentially, no random access required, | |
* runs in **constant memory** w.r.t. the number of documents: size of the | |
training corpus does not affect memory footprint, can process corpora larger than RAM, and | |
* is **distributed**: makes use of a cluster of machines, if available, to | |
speed up model estimation. | |
.. [1] http://www.cs.princeton.edu/~mdhoffma | |
""" | |
import logging | |
import numpy # for arrays, array broadcasting etc. | |
from gensim import interfaces, utils, matutils | |
from itertools import chain | |
from scipy.special import gammaln, psi # gamma function utils | |
from scipy.special import polygamma | |
from six.moves import xrange | |
import six | |
# log(sum(exp(x))) that tries to avoid overflow | |
try: | |
# try importing from here if older scipy is installed | |
from scipy.maxentropy import logsumexp | |
except ImportError: | |
# maxentropy has been removed in recent releases, logsumexp now in misc | |
from scipy.misc import logsumexp | |
logger = logging.getLogger('gensim.models.ldamodel') | |
EPS = numpy.finfo(numpy.float).eps | |
def dirichlet_expectation(alpha): | |
""" | |
For a vector `theta~Dir(alpha)`, compute `E[log(theta)]`. | |
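For example (an illustrative note, not a doctest): a symmetric `alpha = numpy.ones(4)`
yields `psi(1) - psi(4)` in every component, roughly -1.83 per entry, whereas
`log(E[theta_i]) = log(0.25)` is about -1.39 -- the expectation of the log is smaller.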
""" | |
if (len(alpha.shape) == 1): | |
result = psi(alpha) - psi(numpy.sum(alpha)) | |
else: | |
result = psi(alpha) - psi(numpy.sum(alpha, 1))[:, numpy.newaxis] | |
return result.astype(alpha.dtype) # keep the same precision as input | |
class LdaState(utils.SaveLoad): | |
""" | |
Encapsulate information for distributed computation of LdaModel objects. | |
Objects of this class are sent over the network, so try to keep them lean to | |
reduce traffic. | |
""" | |
def __init__(self, eta, shape): | |
self.eta = eta | |
self.sstats = numpy.zeros(shape) | |
self.numdocs = 0 | |
def reset(self): | |
""" | |
Prepare the state for a new EM iteration (reset sufficient stats). | |
""" | |
self.sstats[:] = 0.0 | |
self.numdocs = 0 | |
def merge(self, other): | |
""" | |
Merge the result of an E step from one node with that of another node | |
(summing up sufficient statistics). | |
The merging is trivial and after merging all cluster nodes, we have the | |
exact same result as if the computation was run on a single node (no | |
approximation). | |
""" | |
assert other is not None | |
self.sstats += other.sstats | |
self.numdocs += other.numdocs | |
def blend(self, rhot, other, targetsize=None): | |
""" | |
Given LdaState `other`, merge it with the current state. Stretch both to | |
`targetsize` documents before merging, so that they are of comparable | |
magnitude. | |
Merging is done by average weighting: in the extremes, `rhot=0.0` means | |
`other` is completely ignored; `rhot=1.0` means `self` is completely ignored. | |
This procedure corresponds to the stochastic gradient update from Hoffman | |
et al., algorithm 2 (eq. 14). | |
""" | |
assert other is not None | |
if targetsize is None: | |
targetsize = self.numdocs | |
# stretch the current model's expected n*phi counts to target size | |
if self.numdocs == 0 or targetsize == self.numdocs: | |
scale = 1.0 | |
else: | |
scale = 1.0 * targetsize / self.numdocs | |
self.sstats *= (1.0 - rhot) * scale | |
# stretch the incoming n*phi counts to target size | |
if other.numdocs == 0 or targetsize == other.numdocs: | |
scale = 1.0 | |
else: | |
logger.info("merging changes from %i documents into a model of %i documents", | |
other.numdocs, targetsize) | |
scale = 1.0 * targetsize / other.numdocs | |
self.sstats += rhot * scale * other.sstats | |
self.numdocs = targetsize | |
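# Added note: when both states already refer to `targetsize` documents, the blend
# above reduces to an exponentially decaying average of sufficient statistics,
#     sstats_new = (1 - rhot) * sstats_self + rhot * sstats_other
# and the resulting lambda is `eta + sstats_new` (cf. Hoffman et al., eq. 14).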
def blend2(self, rhot, other, targetsize=None): | |
""" | |
Alternative, simpler blend.
""" | |
assert other is not None | |
if targetsize is None: | |
targetsize = self.numdocs | |
# merge the two matrices by summing | |
self.sstats += other.sstats | |
self.numdocs = targetsize | |
def get_lambda(self): | |
return self.eta + self.sstats | |
def get_Elogbeta(self): | |
return dirichlet_expectation(self.get_lambda()) | |
# endclass LdaState | |
class LdaModel(interfaces.TransformationABC): | |
""" | |
The constructor estimates Latent Dirichlet Allocation model parameters based | |
on a training corpus: | |
>>> lda = LdaModel(corpus, num_topics=10) | |
You can then infer topic distributions on new, unseen documents, with | |
>>> doc_lda = lda[doc_bow] | |
The model can be updated (trained) with new documents via | |
>>> lda.update(other_corpus) | |
Model persistency is achieved through its `load`/`save` methods. | |
""" | |
def __init__(self, corpus=None, num_topics=100, id2word=None, | |
distributed=False, chunksize=2000, passes=1, update_every=1, | |
alpha='symmetric', eta=None, decay=0.5, offset=1.0, | |
eval_every=10, iterations=50, gamma_threshold=0.001, | |
minimum_probability=0.01): | |
""" | |
If given, start training from the iterable `corpus` straight away. If not given, | |
the model is left untrained (presumably because you want to call `update()` manually). | |
`num_topics` is the number of requested latent topics to be extracted from | |
the training corpus. | |
`id2word` is a mapping from word ids (integers) to words (strings). It is | |
used to determine the vocabulary size, as well as for debugging and topic | |
printing. | |
`alpha` and `eta` are hyperparameters that affect sparsity of the document-topic | |
(theta) and topic-word (lambda) distributions. Both default to a symmetric | |
1.0/num_topics prior. | |
`alpha` can be set to an explicit array (= a prior of your choice). It also
supports the special values 'asymmetric' and 'auto': the former uses a fixed
normalized asymmetric 1.0/topicno prior, the latter learns an asymmetric
prior directly from your data.
`eta` can be a scalar for a symmetric prior over topic/word | |
distributions, or a matrix of shape num_topics x num_words, | |
which can be used to impose asymmetric priors over the word | |
distribution on a per-topic basis. This may be useful if you | |
want to seed certain topics with particular words by boosting | |
the priors for those words. | |
Turn on `distributed` to force distributed computing (see the `web tutorial <http://radimrehurek.com/gensim/distributed.html>`_ | |
on how to set up a cluster of machines for gensim). | |
Calculate and log perplexity estimate from the latest mini-batch every | |
`eval_every` model updates (setting this to 1 slows down training ~2x; | |
default is 10 for better performance). Set to None to disable perplexity estimation. | |
`decay` and `offset` parameters are the same as Kappa and Tau_0 in | |
Hoffman et al, respectively. | |
`minimum_probability` controls filtering the topics returned for a document (bow). | |
Example: | |
>>> lda = LdaModel(corpus, num_topics=100) # train model | |
>>> print(lda[doc_bow]) # get topic probability distribution for a document | |
>>> lda.update(corpus2) # update the LDA model with additional documents | |
>>> print(lda[doc_bow]) | |
>>> lda = LdaModel(corpus, num_topics=50, alpha='auto', eval_every=5) # train asymmetric alpha from data | |
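An illustrative sketch (not part of the original docstring) of seeding topics
through an explicit `eta` matrix; `seed_word_ids` is a placeholder list of word ids:
>>> eta = numpy.ones((100, len(id2word))) * 0.001  # low symmetric prior everywhere
>>> eta[0, seed_word_ids] = 1.0                    # boost chosen words in topic #0
>>> lda = LdaModel(corpus, id2word=id2word, num_topics=100, eta=eta)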
""" | |
# store user-supplied parameters | |
self.id2word = id2word | |
if corpus is None and self.id2word is None: | |
raise ValueError('at least one of corpus/id2word must be specified, to establish input space dimensionality') | |
if self.id2word is None: | |
logger.warning("no word id mapping provided; initializing from corpus, assuming identity") | |
self.id2word = utils.dict_from_corpus(corpus) | |
self.num_terms = len(self.id2word) | |
elif len(self.id2word) > 0: | |
self.num_terms = 1 + max(self.id2word.keys()) | |
else: | |
self.num_terms = 0 | |
if self.num_terms == 0: | |
raise ValueError("cannot compute LDA over an empty collection (no terms)") | |
self.distributed = bool(distributed) | |
self.num_topics = int(num_topics) | |
self.chunksize = chunksize | |
self.decay = decay | |
self.offset = offset | |
self.minimum_probability = minimum_probability | |
self.num_updates = 0 | |
self.passes = passes | |
self.update_every = update_every | |
self.eval_every = eval_every | |
self.optimize_alpha = alpha == 'auto' | |
if alpha == 'symmetric' or alpha is None: | |
logger.info("using symmetric alpha at %s", 1.0 / num_topics) | |
self.alpha = numpy.asarray([1.0 / num_topics for i in xrange(num_topics)]) | |
elif alpha == 'asymmetric': | |
self.alpha = numpy.asarray([1.0 / (i + numpy.sqrt(num_topics)) for i in xrange(num_topics)]) | |
self.alpha /= self.alpha.sum() | |
logger.info("using asymmetric alpha %s", list(self.alpha)) | |
elif alpha == 'auto': | |
self.alpha = numpy.asarray([1.0 / num_topics for i in xrange(num_topics)]) | |
logger.info("using autotuned alpha, starting with %s", list(self.alpha)) | |
else: | |
# must be either float or an array of floats, of size num_topics | |
self.alpha = alpha if isinstance(alpha, numpy.ndarray) else numpy.asarray([alpha] * num_topics) | |
if len(self.alpha) != num_topics: | |
raise RuntimeError("invalid alpha shape (must match num_topics)") | |
if eta is None: | |
self.eta = 1.0 / num_topics | |
else: | |
self.eta = eta | |
# VB constants | |
self.iterations = iterations | |
self.gamma_threshold = gamma_threshold | |
# set up distributed environment if necessary | |
if not distributed: | |
logger.info("using serial LDA version on this node") | |
self.dispatcher = None | |
self.numworkers = 1 | |
else: | |
if self.optimize_alpha: | |
raise NotImplementedError("auto-optimizing alpha not implemented in distributed LDA") | |
# set up distributed version | |
try: | |
import Pyro4 | |
dispatcher = Pyro4.Proxy('PYRONAME:gensim.lda_dispatcher') | |
logger.debug("looking for dispatcher at %s" % str(dispatcher._pyroUri)) | |
dispatcher.initialize(id2word=self.id2word, num_topics=num_topics, | |
chunksize=chunksize, alpha=alpha, eta=eta, distributed=False) | |
self.dispatcher = dispatcher | |
self.numworkers = len(dispatcher.getworkers()) | |
logger.info("using distributed version with %i workers" % self.numworkers) | |
except Exception as err: | |
logger.error("failed to initialize distributed LDA (%s)", err) | |
raise RuntimeError("failed to initialize distributed LDA (%s)" % err) | |
# Initialize the variational distribution q(beta|lambda) | |
self.state = LdaState(self.eta, (self.num_topics, self.num_terms)) | |
self.state.sstats = numpy.random.gamma(100., 1. / 100., (self.num_topics, self.num_terms)) | |
if __debug__: | |
logger.debug("Initial sstats %s", str(self.state.sstats)) | |
logger.debug("Initial lambda = eta + sstats is %s", str(self.state.get_lambda())) | |
## this adds eta which we don't want | |
# self.sync_state() | |
# !!! first change by Lev | |
self.expElogbeta = numpy.exp(dirichlet_expectation(self.state.sstats)) | |
if __debug__: | |
logger.debug("Initial expElogbeta %s", str(self.expElogbeta)) | |
# if a training corpus was provided, start estimating the model right away | |
if corpus is not None: | |
self.update(corpus) | |
def __str__(self): | |
return "LdaModel(num_terms=%s, num_topics=%s, decay=%s, chunksize=%s)" % \ | |
(self.num_terms, self.num_topics, self.decay, self.chunksize) | |
def sync_state(self): | |
if __debug__: | |
logger.debug("syncing state: old expElogbeta: %s , eta %s, .sstats %s", self.expElogbeta, self.state.eta,\ | |
self.state.sstats) | |
self.expElogbeta = numpy.exp(self.state.get_Elogbeta()) | |
if __debug__: | |
logger.debug("synced state: new expElogbeta: %s", self.expElogbeta) | |
def clear(self): | |
"""Clear model state (free up some memory). Used in the distributed algo.""" | |
self.state = None | |
self.Elogbeta = None | |
def inference(self, chunk, collect_sstats=False): | |
""" | |
Given a chunk of sparse document vectors, estimate gamma (parameters | |
controlling the topic weights) for each document in the chunk. | |
This function does not modify the model (=is read-only aka const). The | |
whole input chunk of documents is assumed to fit in RAM; chunking of a
large corpus must be done earlier in the pipeline. | |
If `collect_sstats` is True, also collect sufficient statistics needed | |
to update the model's topic-word distributions, and return a 2-tuple | |
`(gamma, sstats)`. Otherwise, return `(gamma, None)`. `gamma` is of shape | |
`len(chunk) x self.num_topics`. | |
Avoids computing the `phi` variational parameter directly using the | |
optimization presented in **Lee, Seung: Algorithms for non-negative matrix factorization, NIPS 2001**. | |
""" | |
try: | |
_ = len(chunk) | |
except TypeError:
# convert iterators/generators to plain list, so we have len() etc. | |
chunk = list(chunk) | |
if len(chunk) > 1: | |
logger.debug("performing inference on a chunk of %i documents", len(chunk)) | |
# Initialize the variational distribution q(theta|gamma) for the chunk | |
gamma = numpy.random.gamma(100., 1. / 100., (len(chunk), self.num_topics)) | |
if __debug__: | |
logger.debug("(n_samples = %s, n_topics =%s))", len(chunk), self.num_topics) | |
logger.debug("Initialised doc_topic_distr aka q(theta|gamma) %s", str(gamma)) | |
Elogtheta = dirichlet_expectation(gamma) | |
expElogtheta = numpy.exp(Elogtheta) | |
if collect_sstats: | |
sstats = numpy.zeros_like(self.expElogbeta) | |
else: | |
sstats = None | |
if __debug__: | |
logger.debug("Initialised sstats as %s", str(sstats)) | |
converged = 0 | |
# Now, for each document d update that document's gamma and phi | |
# Inference code copied from Hoffman's `onlineldavb.py` (esp. the | |
# Lee&Seung trick which speeds things up by an order of magnitude, compared | |
# to Blei's original LDA-C code, cool!). | |
for d, doc in enumerate(chunk): | |
ids = [id for id, _ in doc] | |
cts = numpy.array([cnt for _, cnt in doc]) | |
gammad = gamma[d, :] | |
Elogthetad = Elogtheta[d, :] | |
expElogthetad = expElogtheta[d, :] | |
expElogbetad = self.expElogbeta[:, ids] | |
# The optimal phi_{dwk} is proportional to expElogthetad_k * expElogbetad_w. | |
# phinorm is the normalizer. | |
# TODO treat zeros explicitly, instead of adding 1e-100? | |
# !!! second change by Lev: adding explicit EPS to match sklearn
phinorm = numpy.dot(expElogthetad, expElogbetad) + EPS | |
if __debug__: | |
logger.debug("document %i, iteration initial, exp_doc_topic_d aka expElogthetad %s" , d, expElogthetad) | |
logger.debug("document %i, iteration initial, exp_topic_word_d aka expElogbetad %s" , d, expElogbetad) | |
logger.debug("document %i, norm_phi aka phinorm %s", d, phinorm) | |
# Iterate between gamma and phi until convergence | |
for _ in xrange(self.iterations): | |
lastgamma = gammad | |
# We represent phi implicitly to save memory and time. | |
# Substituting the value of the optimal phi back into | |
# the update for gamma gives this update. Cf. Lee&Seung 2001. | |
gammad = self.alpha + expElogthetad * numpy.dot(cts / phinorm, expElogbetad.T) | |
if __debug__: | |
logger.debug("document %i, iteration %i, new doc_topic_d aka gammad %s", d, _, str(gammad)) | |
Elogthetad = dirichlet_expectation(gammad) | |
expElogthetad = numpy.exp(Elogthetad) | |
phinorm = numpy.dot(expElogthetad, expElogbetad) + EPS | |
if __debug__: | |
logger.debug("document %i, iteration %i, exp_doc_topic_d aka expElogthetad %s" , d, _, expElogthetad) | |
logger.debug("document %i, iteration %i, exp_topic_word_d aka expElogbetad %s" , d, _, expElogbetad) | |
logger.debug("document %i, iteration %i, norm_phi aka phinorm %s", d, _, phinorm) | |
# If gamma hasn't changed much, we're done. | |
meanchange = numpy.mean(abs(gammad - lastgamma)) | |
if (meanchange < self.gamma_threshold): | |
converged += 1 | |
break | |
gamma[d, :] = gammad | |
if collect_sstats: | |
# Contribution of document d to the expected sufficient | |
# statistics for the M step. | |
sstats[:, ids] += numpy.outer(expElogthetad.T, cts / phinorm) | |
if __debug__: | |
logger.debug("document %i, final, suff_stats aka sstats %s" , d, str(sstats)) | |
logger.debug("document %i, final, norm_phi aka phinorm %s", d, phinorm) | |
logger.debug("document %i, final, norm_phi shape %s", d, phinorm.shape) | |
if len(chunk) > 1: | |
logger.debug("%i/%i documents converged within %i iterations", | |
converged, len(chunk), self.iterations) | |
if collect_sstats: | |
# This step finishes computing the sufficient statistics for the | |
# M step, so that | |
# sstats[k, w] = \sum_d n_{dw} * phi_{dwk} | |
# = \sum_d n_{dw} * exp{Elogtheta_{dk} + Elogbeta_{kw}} / phinorm_{dw}. | |
sstats *= self.expElogbeta | |
if __debug__: | |
logger.debug("all documents, (suff_stats aka sstats) multiplied by expElogbeta %s" , str(sstats)) | |
return gamma, sstats | |
def do_estep(self, chunk, state=None): | |
""" | |
Perform inference on a chunk of documents, and accumulate the collected | |
sufficient statistics in `state` (or `self.state` if None). | |
""" | |
if state is None: | |
state = self.state | |
gamma, sstats = self.inference(chunk, collect_sstats=True) | |
state.sstats += sstats | |
state.numdocs += gamma.shape[0] # avoids calling len(chunk) on a generator | |
return gamma | |
def update_alpha(self, gammat, rho): | |
""" | |
Update parameters for the Dirichlet prior on the per-document | |
topic weights `alpha` given the last `gammat`. | |
Uses Newton's method, described in **Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters.** | |
http://jonathan-huang.org/research/dirichlet/dirichlet.pdf | |
""" | |
N = float(len(gammat)) | |
logphat = sum(dirichlet_expectation(gamma) for gamma in gammat) / N | |
dalpha = numpy.copy(self.alpha) | |
gradf = N * (psi(numpy.sum(self.alpha)) - psi(self.alpha) + logphat) | |
c = N * polygamma(1, numpy.sum(self.alpha)) | |
q = -N * polygamma(1, self.alpha) | |
b = numpy.sum(gradf / q) / (1 / c + numpy.sum(1 / q)) | |
dalpha = -(gradf - b) / q | |
if all(rho * dalpha + self.alpha > 0): | |
self.alpha += rho * dalpha | |
else: | |
logger.warning("updated alpha not positive") | |
logger.info("optimized alpha %s", list(self.alpha)) | |
return self.alpha | |
def log_perplexity(self, chunk, total_docs=None): | |
""" | |
Calculate and return per-word likelihood bound, using the `chunk` of | |
documents as evaluation corpus. Also output the calculated statistics, incl.
perplexity=2^(-bound), to the log at INFO level.
""" | |
if __debug__: | |
logger.debug("calculating perplexity") | |
if total_docs is None: | |
total_docs = len(chunk) | |
corpus_words = sum(cnt for document in chunk for _, cnt in document) | |
subsample_ratio = 1.0 * total_docs / len(chunk) | |
bound = self.bound(chunk, subsample_ratio=subsample_ratio) | |
word_cnt = (subsample_ratio * corpus_words) | |
perwordbound = bound / word_cnt | |
logger.info("%.3f per-word bound, %.1f perplexity estimate based on a held-out corpus of %i documents with %i words" % | |
(perwordbound, numpy.exp2(-perwordbound), len(chunk), corpus_words)) | |
if __debug__: | |
logger.debug("bound %s, word_cnt %s, perword_bound %s", bound, word_cnt, perwordbound) | |
return perwordbound | |
def update(self, corpus, chunksize=None, decay=None, offset=None, | |
passes=None, update_every=None, eval_every=None, iterations=None, | |
gamma_threshold=None): | |
""" | |
Train the model with new documents, by EM-iterating over `corpus` until | |
the topics converge (or until the maximum number of allowed iterations | |
is reached). `corpus` must be an iterable (repeatable stream of documents).
In distributed mode, the E step is distributed over a cluster of machines. | |
This update also supports updating an already trained model (`self`) | |
with new documents from `corpus`; the two models are then merged in | |
proportion to the number of old vs. new documents. This feature is still | |
experimental for non-stationary input streams. | |
For stationary input (no topic drift in new documents), on the other hand, | |
this equals the online update of Hoffman et al. and is guaranteed to | |
converge for any `decay` in (0.5, 1.0]. Additionally, for smaller
`corpus` sizes, an increasing `offset` may be beneficial (see | |
Table 1 in Hoffman et al.) | |
""" | |
# use parameters given in constructor, unless user explicitly overrode them | |
if decay is None: | |
decay = self.decay | |
if offset is None: | |
offset = self.offset | |
if passes is None: | |
passes = self.passes | |
if update_every is None: | |
update_every = self.update_every | |
if eval_every is None: | |
eval_every = self.eval_every | |
if iterations is None: | |
iterations = self.iterations | |
if gamma_threshold is None: | |
gamma_threshold = self.gamma_threshold | |
try: | |
lencorpus = len(corpus) | |
except TypeError:
logger.warning("input corpus stream has no len(); counting documents") | |
lencorpus = sum(1 for _ in corpus) | |
if lencorpus == 0: | |
logger.warning("LdaModel.update() called with an empty corpus") | |
return | |
if chunksize is None: | |
chunksize = min(lencorpus, self.chunksize) | |
self.state.numdocs += lencorpus | |
if update_every: | |
updatetype = "online" | |
updateafter = min(lencorpus, update_every * self.numworkers * chunksize) | |
else: | |
updatetype = "batch" | |
updateafter = lencorpus | |
evalafter = min(lencorpus, (eval_every or 0) * self.numworkers * chunksize) | |
updates_per_pass = max(1, lencorpus / updateafter) | |
logger.info("running %s LDA training, %s topics, %i passes over " | |
"the supplied corpus of %i documents, updating model once " | |
"every %i documents, evaluating perplexity every %i documents, " | |
"iterating %ix with a convergence threshold of %f", | |
updatetype, self.num_topics, passes, lencorpus, | |
updateafter, evalafter, iterations, | |
gamma_threshold) | |
if updates_per_pass * passes < 10: | |
logger.warning("too few updates, training might not converge; consider " | |
"increasing the number of passes or iterations to improve accuracy") | |
# rho is the "speed" of updating; TODO try other fncs | |
# pass_ + num_updates handles increasing the starting t for each pass, | |
# while allowing it to "reset" on the first pass of each update | |
def rho(): | |
if __debug__: | |
logger.debug("Rho: decay %s, offset %s, pass_%s, self.num_updates %s, chunksize %s", decay, offset, pass_, self.num_updates, chunksize ) | |
return pow(offset + pass_ + (self.num_updates / chunksize), -decay) | |
for pass_ in xrange(passes): | |
if self.dispatcher: | |
logger.info('initializing %s workers' % self.numworkers) | |
self.dispatcher.reset(self.state) | |
else: | |
other = LdaState(self.eta, self.state.sstats.shape) | |
dirty = False | |
reallen = 0 | |
for chunk_no, chunk in enumerate(utils.grouper(corpus, chunksize, as_numpy=True)): | |
reallen += len(chunk) # keep track of how many documents we've processed so far | |
if eval_every and ((reallen == lencorpus) or ((chunk_no + 1) % (eval_every * self.numworkers) == 0)): | |
self.log_perplexity(chunk, total_docs=lencorpus) | |
if self.dispatcher: | |
# add the chunk to dispatcher's job queue, so workers can munch on it | |
logger.info('PROGRESS: pass %i, dispatching documents up to #%i/%i', | |
pass_, chunk_no * chunksize + len(chunk), lencorpus) | |
# this will eventually block until some jobs finish, because the queue has a small finite length | |
self.dispatcher.putjob(chunk) | |
else: | |
logger.info('PROGRESS: pass %i, at document #%i/%i', | |
pass_, chunk_no * chunksize + len(chunk), lencorpus) | |
gammat = self.do_estep(chunk, other) | |
if self.optimize_alpha: | |
self.update_alpha(gammat, rho()) | |
dirty = True | |
del chunk | |
# perform an M step. determine when based on update_every, don't do this after every chunk | |
if update_every and (chunk_no + 1) % (update_every * self.numworkers) == 0: | |
if self.dispatcher: | |
# distributed mode: wait for all workers to finish | |
logger.info("reached the end of input; now waiting for all remaining jobs to finish") | |
other = self.dispatcher.getstate() | |
self.do_mstep(rho(), other, pass_ > 0) | |
del other # frees up memory | |
if self.dispatcher: | |
logger.info('initializing workers') | |
self.dispatcher.reset(self.state) | |
else: | |
other = LdaState(self.eta, self.state.sstats.shape) | |
dirty = False | |
# endfor single corpus iteration | |
if reallen != lencorpus: | |
raise RuntimeError("input corpus size changed during training (don't use generators as input)") | |
if dirty: | |
# finish any remaining updates | |
if self.dispatcher: | |
# distributed mode: wait for all workers to finish | |
logger.info("reached the end of input; now waiting for all remaining jobs to finish") | |
other = self.dispatcher.getstate() | |
self.do_mstep(rho(), other, pass_ > 0) | |
del other | |
dirty = False | |
# endfor entire corpus update | |
def do_mstep(self, rho, other, extra_pass=False): | |
""" | |
M step: use linear interpolation between the existing topics and | |
collected sufficient statistics in `other` to update the topics. | |
""" | |
if __debug__: | |
logger.debug("Before M-step: components aka lambda %s", str(self.state.get_lambda())) | |
logger.debug("Before m step: expElogbeta %s", str(self.expElogbeta)) | |
logger.debug("Before m step: topic_word_prior aka eta %s", str(self.eta)) | |
logger.debug("Before m step: new sstats %s", other.sstats) | |
logger.debug("Before m step: rho %s", rho) | |
logger.debug("updating topics") | |
# update self with the new blend; also keep track of how much did | |
# the topics change through this update, to assess convergence | |
diff = numpy.log(self.expElogbeta) | |
self.state.blend(rho, other) | |
if __debug__: | |
logger.debug("After m step: sstats %s", self.state.sstats) | |
logger.debug("After M-step: components aka lambda %s", str(self.state.get_lambda())) | |
diff -= self.state.get_Elogbeta() | |
self.sync_state() | |
if __debug__: | |
logger.debug("After m step: expElogbeta %s", str(self.expElogbeta)) | |
# print out some debug info at the end of each EM iteration | |
self.print_topics(5) | |
logger.info("topic diff=%f, rho=%f", numpy.mean(numpy.abs(diff)), rho) | |
if not extra_pass: | |
# only update if this isn't an additional pass | |
self.num_updates += other.numdocs | |
def bound(self, corpus, gamma=None, subsample_ratio=1.0): | |
""" | |
Estimate the variational bound of documents from `corpus`: | |
E_q[log p(corpus)] - E_q[log q(corpus)] | |
`gamma` are the variational parameters on topic weights for each `corpus` | |
document (=2d matrix=what comes out of `inference()`). | |
If not supplied, will be inferred from the model. | |
""" | |
if __debug__: | |
logger.debug("Estimating approx bound") | |
score = 0.0 | |
_lambda = self.state.get_lambda() | |
if __debug__: | |
logger.debug("component aka lambda %s", _lambda) | |
Elogbeta = dirichlet_expectation(_lambda) | |
if __debug__: | |
logger.debug("dirichlet_component aka Elogbeta %s", Elogbeta) | |
for d, doc in enumerate(corpus): # stream the input doc-by-doc, in case it's too large to fit in RAM | |
if d % self.chunksize == 0: | |
logger.debug("bound: at document #%i", d) | |
if gamma is None: | |
gammad, _ = self.inference([doc]) | |
else: | |
gammad = gamma[d] | |
Elogthetad = dirichlet_expectation(gammad) | |
# E[log p(doc | theta, beta)] | |
score += numpy.sum(cnt * logsumexp(Elogthetad + Elogbeta[:, id]) for id, cnt in doc) | |
# E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector | |
score += numpy.sum((self.alpha - gammad) * Elogthetad) | |
score += numpy.sum(gammaln(gammad) - gammaln(self.alpha)) | |
score += gammaln(numpy.sum(self.alpha)) - gammaln(numpy.sum(gammad)) | |
if __debug__: | |
logger.debug("Doc_id %s, score %s", d,score) | |
# compensate likelihood for when `corpus` above is only a sample of the whole corpus | |
score *= subsample_ratio | |
if __debug__: | |
logger.debug("Multiplied by %s subsample ratio: score is %s", subsample_ratio, score) | |
# E[log p(beta | eta) - log q (beta | lambda)]; assumes eta is a scalar | |
score += numpy.sum((self.eta - _lambda) * Elogbeta) | |
score += numpy.sum(gammaln(_lambda) - gammaln(self.eta)) | |
if numpy.ndim(self.eta) == 0: | |
sum_eta = self.eta * self.num_terms | |
else: | |
sum_eta = numpy.sum(self.eta, 1) | |
score += numpy.sum(gammaln(sum_eta) - gammaln(numpy.sum(_lambda, 1))) | |
return score | |
def print_topics(self, num_topics=10, num_words=10): | |
return self.show_topics(num_topics, num_words, log=True) | |
def show_topics(self, num_topics=10, num_words=10, log=False, formatted=True): | |
""" | |
For `num_topics` number of topics, return `num_words` most significant words | |
(10 words per topic, by default). | |
The topics are returned as a list -- a list of strings if `formatted` is | |
True, or a list of (probability, word) 2-tuples if False. | |
If `log` is True, also output this result to log. | |
Unlike LSA, there is no natural ordering between the topics in LDA. | |
The returned `num_topics <= self.num_topics` subset of all topics is therefore | |
arbitrary and may change between two LDA training runs. | |
""" | |
if num_topics < 0 or num_topics >= self.num_topics: | |
num_topics = self.num_topics | |
chosen_topics = range(num_topics) | |
else: | |
num_topics = min(num_topics, self.num_topics) | |
# add a little random jitter, to randomize results around the same alpha | |
sort_alpha = self.alpha + 0.0001 * numpy.random.rand(len(self.alpha)) | |
sorted_topics = list(matutils.argsort(sort_alpha)) | |
chosen_topics = sorted_topics[:num_topics // 2] + sorted_topics[-num_topics // 2:] | |
shown = [] | |
for i in chosen_topics: | |
if formatted: | |
topic = self.print_topic(i, topn=num_words) | |
else: | |
topic = self.show_topic(i, topn=num_words) | |
shown.append(topic) | |
if log: | |
logger.info("topic #%i (%.3f): %s", i, self.alpha[i], topic) | |
return shown | |
def show_topic(self, topicid, topn=10): | |
""" | |
Return a list of `(word_probability, word)` 2-tuples for the most probable
words in topic `topicid`. | |
Only return 2-tuples for the topn most probable words (ignore the rest). | |
""" | |
topic = self.state.get_lambda()[topicid] | |
topic = topic / topic.sum() # normalize to probability distribution | |
bestn = matutils.argsort(topic, topn, reverse=True) | |
beststr = [(topic[id], self.id2word[id]) for id in bestn] | |
return beststr | |
def print_topic(self, topicid, topn=10): | |
"""Return the result of `show_topic`, but formatted as a single string.""" | |
return ' + '.join(['%.3f*%s' % v for v in self.show_topic(topicid, topn)]) | |
def top_topics(self, corpus, num_words=20): | |
""" | |
Calculate the Umass topic coherence for each topic. Algorithm from | |
**Mimno, Wallach, Talley, Leenders, McCallum: Optimizing Semantic Coherence in Topic Models, EMNLP 2011.**
""" | |
is_corpus, corpus = utils.is_corpus(corpus) | |
if not is_corpus: | |
logger.warning("LdaModel.top_topics() called with an empty corpus") | |
return | |
topics = [] | |
str_topics = [] | |
for topic in self.state.get_lambda(): | |
topic = topic / topic.sum() # normalize to probability distribution | |
bestn = matutils.argsort(topic, topn=num_words, reverse=True) | |
topics.append(bestn) | |
beststr = [(topic[id], self.id2word[id]) for id in bestn] | |
str_topics.append(beststr) | |
# top_ids are limited to every topic's top words, and should not exceed the
# vocabulary size.
top_ids = set(chain.from_iterable(topics))
# create a document occurrence sparse matrix for each word
doc_word_list = {} | |
for id in top_ids: | |
id_list = set() | |
for n, document in enumerate(corpus): | |
if id in frozenset(x[0] for x in document): | |
id_list.add(n) | |
doc_word_list[id] = id_list | |
coherence_scores = [] | |
for t, top_words in enumerate(topics): | |
# Calculate each coherence score C(t, top_words) | |
coherence = 0.0 | |
# Sum of top words m=2..M | |
for m in top_words[1:]: | |
# m_docs is v_m^(t) | |
m_docs = doc_word_list[m] | |
# Sum of top words l=1..m-1 | |
# i.e., all words ranked higher than the current word m | |
for l in top_words[:m - 1]: | |
# l_docs is v_l^(t) | |
l_docs = doc_word_list[l] | |
# co_doc_frequency is D(v_m^(t), v_l^(t)) | |
co_doc_frequency = len(m_docs.intersection(l_docs)) | |
# add to the coherence sum for these two words m, l | |
coherence += numpy.log((co_doc_frequency + 1.0) / len(l_docs)) | |
coherence_scores.append((str_topics[t], coherence)) | |
top_topics = sorted(coherence_scores, key=lambda t: t[1], reverse=True) | |
return top_topics | |
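# Added usage note: `top_topics` takes the same bag-of-words corpus format as
# training, e.g.
#     best = lda.top_topics(corpus, num_words=10)
#     best[0]  # -> ([(prob, word), ...], coherence) of the most coherent topic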
def get_document_topics(self, bow, minimum_probability=None): | |
""" | |
Return topic distribution for the given document `bow`, as a list of | |
(topic_id, topic_probability) 2-tuples. | |
Ignore topics with very low probability (below `minimum_probability`). | |
""" | |
if minimum_probability is None: | |
minimum_probability = self.minimum_probability | |
# if the input vector is a corpus, return a transformed corpus | |
is_corpus, corpus = utils.is_corpus(bow) | |
if is_corpus: | |
return self._apply(corpus) | |
gamma, _ = self.inference([bow]) | |
topic_dist = gamma[0] / sum(gamma[0]) # normalize distribution | |
return [(topicid, topicvalue) for topicid, topicvalue in enumerate(topic_dist) | |
if topicvalue >= minimum_probability] | |
def __getitem__(self, bow, eps=None): | |
""" | |
Return topic distribution for the given document `bow`, as a list of | |
(topic_id, topic_probability) 2-tuples. | |
Ignore topics with very low probability (below `eps`). | |
""" | |
return self.get_document_topics(bow, eps) | |
def save(self, fname, ignore=['state', 'dispatcher'], *args, **kwargs): | |
""" | |
Save the model to file. | |
Large internal arrays may be stored into separate files, with `fname` as prefix. | |
`separately` can be used to define which arrays should be stored in separate files. | |
`ignore` parameter can be used to define which variables should be ignored, i.e. left | |
out from the pickled lda model. By default the internal `state` is ignored as it uses | |
its own serialisation, not the one provided by `LdaModel`. The `state` and `dispatcher`
will be added to any `ignore` parameter defined.
Note: do not save as a compressed file if you intend to load the file back with `mmap`. | |
Note: If you intend to use models across Python 2/3 versions there are a few things to | |
keep in mind: | |
1. The pickled Python dictionaries will not work across Python versions | |
2. The `save` method does not automatically save all NumPy arrays using NumPy, only | |
those ones that exceed `sep_limit` set in `gensim.utils.SaveLoad.save`. The main | |
concern here is the `alpha` array if for instance using `alpha='auto'`. | |
Please refer to the wiki recipes section (https://github.com/piskvorky/gensim/wiki/Recipes-&-FAQ#q9-how-do-i-load-a-model-in-python-3-that-was-trained-and-saved-using-python-2) | |
for an example on how to work around these issues. | |
""" | |
if self.state is not None: | |
self.state.save(utils.smart_extension(fname, '.state'), *args, **kwargs) | |
# make sure 'state' and 'dispatcher' are ignored from the pickled object, even if | |
# someone sets the ignore list themselves | |
if ignore is not None and ignore: | |
if isinstance(ignore, six.string_types): | |
ignore = [ignore] | |
ignore = [e for e in ignore if e] # make sure None and '' are not in the list | |
ignore = list(set(['state', 'dispatcher']) | set(ignore)) | |
else: | |
ignore = ['state', 'dispatcher'] | |
super(LdaModel, self).save(fname, *args, ignore=ignore, **kwargs) | |
@classmethod | |
def load(cls, fname, *args, **kwargs): | |
""" | |
Load a previously saved object from file (also see `save`). | |
Large arrays can be memmap'ed back as read-only (shared memory) by setting `mmap='r'`: | |
>>> LdaModel.load(fname, mmap='r') | |
""" | |
kwargs['mmap'] = kwargs.get('mmap', None) | |
result = super(LdaModel, cls).load(fname, *args, **kwargs) | |
state_fname = utils.smart_extension(fname, '.state') | |
try: | |
result.state = super(LdaModel, cls).load(state_fname, *args, **kwargs) | |
except Exception as e: | |
logging.warning("failed to load state from %s: %s", state_fname, e) | |
return result | |
# endclass LdaModel |
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# | |
# Author: Jan Zikes, Radim Rehurek | |
# Copyright (C) 2014 Radim Rehurek <[email protected]> | |
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html | |
""" | |
Latent Dirichlet Allocation (LDA) in Python, using all CPU cores to parallelize and | |
speed up model training. | |
The parallelization uses multiprocessing; in case this doesn't work for you for | |
some reason, try the :class:`gensim.models.ldamodel.LdaModel` class which is an | |
equivalent, but more straightforward and single-core implementation. | |
The training algorithm: | |
* is **streamed**: training documents may come in sequentially, no random access required, | |
* runs in **constant memory** w.r.t. the number of documents: size of the | |
training corpus does not affect memory footprint, can process corpora larger than RAM | |
Wall-clock `performance on the English Wikipedia <http://radimrehurek.com/gensim/wiki.html>`_ | |
(2G corpus positions, 3.5M documents, 100K features, 0.54G non-zero entries in the final | |
bag-of-words matrix), requesting 100 topics: | |
====================================================== ============== | |
algorithm training time | |
====================================================== ============== | |
LdaMulticore(workers=1) 2h30m | |
LdaMulticore(workers=2) 1h24m | |
LdaMulticore(workers=3) 1h6m | |
old LdaModel() 3h44m | |
simply iterating over input corpus = I/O overhead 20m | |
====================================================== ============== | |
(Measured on `this i7 server <http://www.hetzner.de/en/hosting/produkte_rootserver/ex40ssd>`_ | |
with 4 physical cores, so that optimal `workers=3`, one less than the number of cores.) | |
This module allows both LDA model estimation from a training corpus and inference of topic | |
distribution on new, unseen documents. The model can also be updated with new documents | |
for online training. | |
The core estimation code is based on the `onlineldavb.py` script by M. Hoffman [1]_, see | |
**Hoffman, Blei, Bach: Online Learning for Latent Dirichlet Allocation, NIPS 2010.** | |
.. [1] http://www.cs.princeton.edu/~mdhoffma | |
""" | |
import logging | |
from gensim import utils | |
from gensim.models.ldamodel import LdaModel, LdaState | |
from six.moves import queue, xrange | |
from multiprocessing import Pool, Queue, cpu_count | |
from numpy import exp | |
logger = logging.getLogger(__name__) | |
class LdaMulticore(LdaModel): | |
""" | |
The constructor estimates Latent Dirichlet Allocation model parameters based | |
on a training corpus: | |
>>> lda = LdaMulticore(corpus, num_topics=10) | |
You can then infer topic distributions on new, unseen documents, with | |
>>> doc_lda = lda[doc_bow] | |
The model can be updated (trained) with new documents via | |
>>> lda.update(other_corpus) | |
Model persistency is achieved through its `load`/`save` methods. | |
""" | |
def __init__(self, corpus=None, num_topics=100, id2word=None, workers=None, | |
chunksize=2000, passes=1, batch=False, alpha='symmetric', | |
eta=None, decay=0.5, offset=1.0, eval_every=10, iterations=50, | |
gamma_threshold=0.001, updateafter=None): | |
""" | |
If given, start training from the iterable `corpus` straight away. If not given, | |
the model is left untrained (presumably because you want to call `update()` manually). | |
`num_topics` is the number of requested latent topics to be extracted from | |
the training corpus. | |
`id2word` is a mapping from word ids (integers) to words (strings). It is | |
used to determine the vocabulary size, as well as for debugging and topic | |
printing. | |
`workers` is the number of extra processes to use for parallelization. Uses | |
all available cores by default: `workers=cpu_count()-1`. **Note**: for | |
hyper-threaded CPUs, `cpu_count()` returns a useless number -- set `workers` | |
directly to the number of your **real** cores (not hyperthreads) minus one, | |
for optimal performance. | |
If `batch` is not set, perform online training by updating the model once | |
every `workers * chunksize` documents (online training). Otherwise, | |
run batch LDA, updating model only once at the end of each full corpus pass. | |
`alpha` and `eta` are hyperparameters that affect sparsity of the document-topic | |
(theta) and topic-word (lambda) distributions. Both default to a symmetric | |
1.0/num_topics prior. | |
`alpha` can be set to an explicit array (= a prior of your choice). It also
supports the special values 'asymmetric' and 'auto': the former uses a fixed
normalized asymmetric 1.0/topicno prior, the latter learns an asymmetric
prior directly from your data. | |
`eta` can be a scalar for a symmetric prior over topic/word | |
distributions, or a matrix of shape num_topics x num_words, | |
which can be used to impose asymmetric priors over the word | |
distribution on a per-topic basis. This may be useful if you | |
want to seed certain topics with particular words by boosting | |
the priors for those words. | |
Calculate and log perplexity estimate from the latest mini-batch once every | |
`eval_every` documents. Set to `None` to disable perplexity estimation (faster), | |
or to `0` to only evaluate perplexity once, at the end of each corpus pass. | |
`decay` and `offset` parameters are the same as Kappa and Tau_0 in | |
Hoffman et al, respectively. | |
Example: | |
>>> lda = LdaMulticore(corpus, id2word=id2word, num_topics=100) # train model | |
>>> print(lda[doc_bow]) # get topic probability distribution for a document | |
>>> lda.update(corpus2) # update the LDA model with additional documents | |
>>> print(lda[doc_bow]) | |
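An illustrative sketch (not part of the original docstring) of batch training with
an explicit worker count:
>>> lda = LdaMulticore(corpus, id2word=id2word, num_topics=100, workers=3, batch=True)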
""" | |
self.workers = max(1, cpu_count() - 1) if workers is None else workers | |
self.batch = batch | |
self.updateafter = updateafter | |
if alpha == 'auto': | |
raise NotImplementedError("auto-tuning alpha not implemented in multicore LDA; use plain LdaModel.") | |
super(LdaMulticore, self).__init__(corpus=corpus, num_topics=num_topics, | |
id2word=id2word, chunksize=chunksize, passes=passes, alpha=alpha, eta=eta, | |
decay=decay, offset=offset, eval_every=eval_every, iterations=iterations, | |
gamma_threshold=gamma_threshold) | |
def update(self, corpus): | |
""" | |
Train the model with new documents, by EM-iterating over `corpus` until | |
the topics converge (or until the maximum number of allowed iterations | |
is reached). `corpus` must be an iterable (repeatable stream of documents).
The E-step is distributed across several worker processes.
This update also supports updating an already trained model (`self`) | |
with new documents from `corpus`; the two models are then merged in | |
proportion to the number of old vs. new documents. This feature is still | |
experimental for non-stationary input streams. | |
For stationary input (no topic drift in new documents), on the other hand, | |
this equals the online update of Hoffman et al. and is guaranteed to | |
converge for any `decay` in (0.5, 1.0].
""" | |
try: | |
lencorpus = len(corpus) | |
except TypeError:
logger.warning("input corpus stream has no len(); counting documents") | |
lencorpus = sum(1 for _ in corpus) | |
if lencorpus == 0: | |
logger.warning("LdaMulticore.update() called with an empty corpus") | |
return | |
self.state.numdocs += lencorpus | |
if not self.batch: | |
updatetype = "online" | |
# !!! test by Lev, would be good to merge in to compare with sklearn | |
if self.updateafter is None: | |
self.updateafter = self.chunksize * self.workers | |
else: | |
updatetype = "batch" | |
self.updateafter = lencorpus | |
evalafter = min(lencorpus, (self.eval_every or 0) * self.updateafter) | |
updates_per_pass = max(1, lencorpus / self.updateafter) | |
logger.info("running %s LDA training, %s topics, %i passes over the" | |
" supplied corpus of %i documents, updating every %i documents," | |
" evaluating every ~%i documents, iterating %ix with a convergence threshold of %f", | |
updatetype, self.num_topics, self.passes, lencorpus, self.updateafter, evalafter, | |
self.iterations, self.gamma_threshold) | |
if updates_per_pass * self.passes < 10: | |
logger.warning("too few updates, training might not converge; consider " | |
"increasing the number of passes or iterations to improve accuracy") | |
job_queue = Queue(maxsize=2 * self.workers) | |
result_queue = Queue() | |
# rho is the "speed" of updating; TODO try other fncs | |
# pass_ + num_updates handles increasing the starting t for each pass, | |
# while allowing it to "reset" on the first pass of each update | |
def rho(): | |
if __debug__: | |
logger.debug("Rho: decay %s, offset %s, pass_%s, self.num_updates %s, chunksize %s", self.decay, self.offset, pass_, self.num_updates, self.chunksize ) | |
return pow(self.offset + pass_ + (self.num_updates / self.chunksize), -self.decay) | |
logger.info("training LDA model using %i processes", self.workers) | |
pool = Pool(self.workers, worker_e_step, (job_queue, result_queue,)) | |
for pass_ in xrange(self.passes): | |
queue_size, reallen = [0], 0 | |
other = LdaState(self.eta, self.state.sstats.shape) | |
def process_result_queue(force=False): | |
""" | |
Clear the result queue, merging all intermediate results, and update the | |
LDA model if necessary. | |
""" | |
merged_new = False | |
while not result_queue.empty(): | |
other.merge(result_queue.get()) | |
queue_size[0] -= 1 | |
merged_new = True | |
if __debug__: | |
if merged_new: | |
logger.debug("merged new result") | |
if (force and merged_new and queue_size[0] == 0) or (not self.batch and (other.numdocs >= self.updateafter)): | |
self.do_mstep(rho(), other, pass_ > 0) | |
other.reset() | |
if self.eval_every is not None and ((force and queue_size[0] == 0) or (self.eval_every != 0 and (self.num_updates / self.updateafter) % self.eval_every == 0)): | |
self.log_perplexity(chunk, total_docs=lencorpus) | |
chunk_stream = utils.grouper(corpus, self.chunksize, as_numpy=True) | |
for chunk_no, chunk in enumerate(chunk_stream): | |
reallen += len(chunk) # keep track of how many documents we've processed so far | |
# put the chunk into the workers' input job queue | |
chunk_put = False | |
while not chunk_put: | |
try: | |
job_queue.put((chunk_no, chunk, self), block=False, timeout=0.1) | |
chunk_put = True | |
queue_size[0] += 1 | |
logger.info('PROGRESS: pass %i, dispatched chunk #%i = ' | |
'documents up to #%i/%i, outstanding queue size %i', | |
pass_, chunk_no, chunk_no * self.chunksize + len(chunk), lencorpus, queue_size[0]) | |
except queue.Full: | |
# in case the input job queue is full, keep clearing the | |
# result queue, to make sure we don't deadlock | |
process_result_queue() | |
process_result_queue() | |
#endfor single corpus pass | |
# wait for all outstanding jobs to finish | |
while queue_size[0] > 0: | |
process_result_queue(force=True) | |
if reallen != lencorpus: | |
raise RuntimeError("input corpus size changed during training (don't use generators as input)") | |
#endfor entire update | |
pool.terminate() | |
def worker_e_step(input_queue, result_queue): | |
""" | |
Perform E-step for each (chunk_no, chunk, model) 3-tuple from the | |
input queue, placing the resulting state into the result queue. | |
""" | |
logger.debug("worker process entering E-step loop") | |
while True: | |
logger.debug("getting a new job") | |
chunk_no, chunk, worker_lda = input_queue.get() | |
logger.debug("processing chunk #%i of %i documents", chunk_no, len(chunk)) | |
worker_lda.state.reset() | |
worker_lda.do_estep(chunk) # TODO: auto-tune alpha? | |
del chunk | |
logger.debug("processed chunk, queuing the result") | |
result_queue.put(worker_lda.state) | |
if __debug__: | |
logger.debug("the result that just queued has ssstats %s", exp(worker_lda.state.sstats)) | |
del worker_lda # free up some memory | |
logger.debug("result put") |
""" | |
============================================================= | |
Online Latent Dirichlet Allocation with variational inference | |
============================================================= | |
This implementation is modified from Matthew D. Hoffman's onlineldavb code | |
Link: http://www.cs.princeton.edu/~mdhoffma/code/onlineldavb.tar | |
""" | |
# Author: Chyi-Kwei Yau | |
# Author: Matthew D. Hoffman (original onlineldavb implementation) | |
import logging | |
import numpy as np | |
import scipy.sparse as sp | |
from scipy.special import gammaln | |
from ..base import BaseEstimator, TransformerMixin | |
from ..utils import (check_random_state, check_array, | |
gen_batches, gen_even_slices, _get_n_jobs) | |
from ..utils.validation import NotFittedError, check_non_negative | |
from ..utils.extmath import logsumexp | |
from ..externals.joblib import Parallel, delayed | |
from ..externals.six.moves import xrange | |
from ._online_lda import (mean_change, _dirichlet_expectation_1d, | |
_dirichlet_expectation_2d) | |
EPS = np.finfo(np.float).eps | |
logger = logging.getLogger('sklearn.online_lda') | |
logger.setLevel(logging.DEBUG) | |
def _update_doc_distribution(X, exp_topic_word_distr, doc_topic_prior, | |
max_iters, | |
mean_change_tol, cal_sstats, random_state): | |
"""E-step: update document-topic distribution. | |
Parameters | |
---------- | |
X : array-like or sparse matrix, shape=(n_samples, n_features) | |
Document word matrix. | |
exp_topic_word_distr : dense matrix, shape=(n_topics, n_features) | |
Exponential value of the expectation of the log topic word distribution.
In the literature, this is `exp(E[log(beta)])`. | |
doc_topic_prior : float | |
Prior of document topic distribution `theta`. | |
max_iters : int | |
Max number of iterations for updating document topic distribution in | |
the E-step. | |
mean_change_tol : float | |
Stopping tolerance for updating document topic distribution in the E-step.
cal_sstats : boolean | |
Parameter that indicates whether to calculate sufficient statistics or not.
Set `cal_sstats` to `True` when we need to run the M-step.
random_state : RandomState instance or None | |
Parameter that indicates how to initialize the document topic distribution.
Setting `random_state` to None will initialize the document topic distribution
to a constant number.
Returns | |
------- | |
(doc_topic_distr, suff_stats) : | |
`doc_topic_distr` is the unnormalized topic distribution for each document.
In the literature, this is `gamma`. We can calculate `E[log(theta)]`
from it.
`suff_stats` is expected sufficient statistics for the M-step. | |
When `cal_sstats == False`, this will be None. | |
""" | |
is_sparse_x = sp.issparse(X) | |
n_samples, n_features = X.shape | |
n_topics = exp_topic_word_distr.shape[0] | |
if __debug__: | |
logger.debug("(n_samples = %s, n_topics =%s))", n_samples, n_topics) | |
if random_state: | |
doc_topic_distr = random_state.gamma(100., 0.01, (n_samples, n_topics)) | |
else: | |
doc_topic_distr = np.ones((n_samples, n_topics)) | |
if __debug__: | |
logger.debug("Initialised doc_topic_distr aka q(theta|gamma) %s", doc_topic_distr) | |
# In the literature, this is `exp(E[log(theta)])` | |
exp_doc_topic = np.exp(_dirichlet_expectation_2d(doc_topic_distr)) | |
if __debug__: | |
logger.debug("Initialised exp_doc_topic aka Elogtheta %s", exp_doc_topic) | |
# diff on `component_` (only calculate it when `cal_diff` is True) | |
suff_stats = np.zeros(exp_topic_word_distr.shape) if cal_sstats else None | |
if __debug__: | |
logger.debug("Initialised suff_stats aka sstats %s", suff_stats) | |
if is_sparse_x: | |
X_data = X.data | |
X_indices = X.indices | |
X_indptr = X.indptr | |
for idx_d in xrange(n_samples): | |
if is_sparse_x: | |
ids = X_indices[X_indptr[idx_d]:X_indptr[idx_d + 1]] | |
cnts = X_data[X_indptr[idx_d]:X_indptr[idx_d + 1]] | |
else: | |
ids = np.nonzero(X[idx_d, :])[0] | |
cnts = X[idx_d, ids] | |
doc_topic_d = doc_topic_distr[idx_d, :] | |
exp_doc_topic_d = exp_doc_topic[idx_d, :] | |
exp_topic_word_d = exp_topic_word_distr[:, ids] | |
# Iterate between `doc_topic_d` and `norm_phi` until convergence | |
for _ in xrange(0, max_iters): | |
last_d = doc_topic_d | |
# The optimal phi_{dwk} is proportional to | |
# exp(E[log(theta_{dk})]) * exp(E[log(beta_{dw})]). | |
norm_phi = np.dot(exp_doc_topic_d, exp_topic_word_d) + EPS | |
if __debug__: | |
logger.debug("document %i, iteration %i, exp_doc_topic_d aka expElogthetad %s" , idx_d, _, exp_doc_topic_d) | |
logger.debug("document %i, iteration %i, exp_topic_word_d aka expElogbetad %s" , idx_d, _, exp_topic_word_d) | |
logger.debug("document %i, iteration %i, norm_phi aka phinorm %s" , idx_d, _, norm_phi) | |
doc_topic_d = (doc_topic_prior + exp_doc_topic_d * | |
np.dot(cnts / norm_phi, exp_topic_word_d.T)) | |
if __debug__: | |
logger.debug("document %i, iteration %i, new doc_topic_d aka gammad %s", idx_d, _, str(doc_topic_d)) | |
exp_doc_topic_d = _dirichlet_expectation_1d(doc_topic_d) | |
if __debug__: | |
logger.debug("document %i, iteration %i, new expElogthetad aka exp_doc_topic_d %s", idx_d, _, str(doc_topic_d)) | |
if mean_change(last_d, doc_topic_d) < mean_change_tol: | |
break | |
doc_topic_distr[idx_d, :] = doc_topic_d | |
# Contribution of document d to the expected sufficient | |
# statistics for the M step. | |
if cal_sstats: | |
norm_phi = np.dot(exp_doc_topic_d, exp_topic_word_d) + EPS | |
if __debug__: | |
logger.debug("document %i, final, norm_phi aka phinorm %s" , idx_d, norm_phi) | |
logger.debug("document %i, final, norm_phi shape %s", idx_d, norm_phi.shape) | |
suff_stats[:, ids] += np.outer(exp_doc_topic_d.T, cnts / norm_phi) | |
if __debug__: | |
logger.debug("document %i, final, suff_stats aka sstats %s" , idx_d, str(suff_stats)) | |
return (doc_topic_distr, suff_stats) | |
class LatentDirichletAllocation(BaseEstimator, TransformerMixin): | |
"""Latent Dirichlet Allocation with online variational Bayes algorithm | |
Parameters | |
---------- | |
n_topics : int, optional (default=10) | |
Number of topics. | |
doc_topic_prior : float, optional (default=None) | |
Prior of document topic distribution `theta`. If the value is None, | |
defaults to `1 / n_topics`. | |
In the literature, this is called `alpha`. | |
topic_word_prior : float, optional (default=None) | |
Prior of topic word distribution `beta`. If the value is None, defaults | |
to `1 / n_topics`. | |
In the literature, this is called `eta`. | |
learning_method : 'batch' | 'online', default='online' | |
Method used to update ``components_``. Only used in the `fit` method.
In general, if the data size is large, the online update will be much | |
faster than the batch update. | |
Valid options:: | |
'batch': Batch variational Bayes method. Use all training data in | |
each EM update. | |
Old `components_` will be overwritten in each iteration. | |
'online': Online variational Bayes method. In each EM update, use | |
mini-batch of training data to update the ``components_`` | |
variable incrementally. The learning rate is controlled by the | |
``learning_decay`` and the ``learning_offset`` parameters. | |
learning_decay : float, optional (default=0.7) | |
A parameter that controls the learning rate in the online learning
method. The value should be in the interval (0.5, 1.0] to guarantee
asymptotic convergence. When the value is 0.0 and batch_size is
``n_samples``, the update method is the same as batch learning. In the
literature, this is called kappa.
learning_offset : float, optional (default=10.) | |
A (positive) parameter that downweights early iterations in online | |
learning. It should be greater than 1.0. In the literature, this is | |
called tau_0. | |
max_iter : integer, optional (default=10) | |
The maximum number of iterations. | |
total_samples : int, optional (default=1e6) | |
Total number of documents. Only used in the `partial_fit` method. | |
batch_size : int, optional (default=128) | |
Number of documents to use in each EM iteration. Only used in online | |
learning. | |
evaluate_every : int, optional (default=-1)
How often to evaluate perplexity. Only used in the `fit` method.
Set it to 0 or a negative number to not evaluate perplexity during
training at all. Evaluating perplexity can help you check convergence
during training, but it will also increase the total training time.
Evaluating perplexity in every iteration might increase training time
up to two-fold.
perp_tol : float, optional (default=1e-1) | |
Perplexity tolerance in batch learning. Only used when | |
``evaluate_every`` is greater than 0. | |
mean_change_tol : float, optional (default=1e-3) | |
Stopping tolerance for updating document topic distribution in E-step. | |
max_doc_update_iter : int (default=100) | |
Max number of iterations for updating document topic distribution in | |
the E-step. | |
n_jobs : int, optional (default=1) | |
The number of jobs to use in the E-step. If -1, all CPUs are used. For | |
``n_jobs`` below -1, (n_cpus + 1 + n_jobs) are used. | |
verbose : int, optional (default=0) | |
Verbosity level. | |
random_state : int or RandomState instance or None, optional (default=None) | |
Pseudo-random number generator seed control. | |
Attributes | |
---------- | |
components_ : array, [n_topics, n_features] | |
Topic word distribution. ``components_[i, j]`` represents the
(unnormalized) weight of word `j` in topic `i`. In the literature, this
is called lambda.
n_batch_iter_ : int | |
Number of iterations of the EM step. | |
n_iter_ : int | |
Number of passes over the dataset. | |
References | |
---------- | |
[1] "Online Learning for Latent Dirichlet Allocation", Matthew D. Hoffman, | |
David M. Blei, Francis Bach, 2010 | |
[2] "Stochastic Variational Inference", Matthew D. Hoffman, David M. Blei, | |
Chong Wang, John Paisley, 2013 | |
[3] Matthew D. Hoffman's onlineldavb code. Link: | |
http://www.cs.princeton.edu/~mdhoffma/code/onlineldavb.tar | |
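Examples
--------
A minimal usage sketch; the toy document-word count matrix below is made
up purely for illustration:
>>> import numpy as np
>>> X = np.array([[2, 0, 1, 0], [0, 3, 0, 1], [1, 1, 2, 0]])
>>> lda = LatentDirichletAllocation(n_topics=2, learning_method='batch', random_state=0)
>>> doc_topic = lda.fit_transform(X)  # unnormalized gamma, shape (3, 2)
>>> topic_word = lda.components_      # lambda, shape (2, 4)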
""" | |
def __init__(self, n_topics=10, doc_topic_prior=None, | |
topic_word_prior=None, learning_method='online', | |
learning_decay=.7, learning_offset=10., max_iter=10, | |
batch_size=128, evaluate_every=-1, total_samples=1e6, | |
perp_tol=1e-1, mean_change_tol=1e-3, max_doc_update_iter=100, | |
n_jobs=1, verbose=0, random_state=None): | |
self.n_topics = n_topics | |
self.doc_topic_prior = doc_topic_prior | |
self.topic_word_prior = topic_word_prior | |
self.learning_method = learning_method | |
self.learning_decay = learning_decay | |
self.learning_offset = learning_offset | |
self.max_iter = max_iter | |
self.batch_size = batch_size | |
self.evaluate_every = evaluate_every | |
self.total_samples = total_samples | |
self.perp_tol = perp_tol | |
self.mean_change_tol = mean_change_tol | |
self.max_doc_update_iter = max_doc_update_iter | |
self.n_jobs = n_jobs | |
self.verbose = verbose | |
self.random_state = random_state | |
def _check_params(self): | |
"""Check model parameters.""" | |
if self.n_topics <= 0: | |
raise ValueError("Invalid 'n_topics' parameter: %r" | |
% self.n_topics) | |
if self.total_samples <= 0: | |
raise ValueError("Invalid 'total_samples' parameter: %r" | |
% self.total_samples) | |
if self.learning_offset < 0: | |
raise ValueError("Invalid 'learning_offset' parameter: %r" | |
% self.learning_offset) | |
if self.learning_method not in ("batch", "online"): | |
raise ValueError("Invalid 'learning_method' parameter: %r" | |
% self.learning_method) | |
def _init_latent_vars(self, n_features): | |
"""Initialize latent variables.""" | |
self.random_state_ = check_random_state(self.random_state) | |
self.n_batch_iter_ = 1 | |
self.n_iter_ = 0 | |
if self.doc_topic_prior is None: | |
self.doc_topic_prior_ = 1. / self.n_topics | |
else: | |
self.doc_topic_prior_ = self.doc_topic_prior | |
if self.topic_word_prior is None: | |
self.topic_word_prior_ = 1. / self.n_topics | |
else: | |
self.topic_word_prior_ = self.topic_word_prior | |
init_gamma = 100. | |
init_var = 1. / init_gamma | |
# In the literature, this is called `lambda` | |
self.components_ = self.random_state_.gamma( | |
init_gamma, init_var, (self.n_topics, n_features)) | |
if __debug__: | |
logger.debug("Initial components aka lambda %s", str(self.components_)) | |
# In the literature, this is `exp(E[log(beta)])` | |
self.exp_dirichlet_component_ = np.exp( | |
_dirichlet_expectation_2d(self.components_)) | |
if __debug__: | |
logger.debug("Initial exp_dirichlet_component aka expElogbeta %s", str(self.exp_dirichlet_component_)) | |
def _e_step(self, X, cal_sstats, random_init): | |
"""E-step in EM update. | |
Parameters | |
---------- | |
X : array-like or sparse matrix, shape=(n_samples, n_features) | |
Document word matrix. | |
cal_sstats : boolean | |
Parameter that indicates whether to calculate sufficient statistics
or not. Set ``cal_sstats`` to True when we need to run the M-step.
random_init : boolean
Parameter that indicates whether to initialize the document topic
distribution randomly in the E-step. Set it to True in training
steps. | |
Returns | |
------- | |
(doc_topic_distr, suff_stats) : | |
`doc_topic_distr` is the unnormalized topic distribution for each
document. In the literature, this is called `gamma`. | |
`suff_stats` is expected sufficient statistics for the M-step. | |
When `cal_sstats == False`, it will be None. | |
""" | |
# Run e-step in parallel | |
n_jobs = _get_n_jobs(self.n_jobs) | |
random_state = self.random_state_ if random_init else None | |
results = Parallel(n_jobs=n_jobs, verbose=self.verbose)( | |
delayed(_update_doc_distribution)(X[idx_slice, :], | |
self.exp_dirichlet_component_, | |
self.doc_topic_prior_, | |
self.max_doc_update_iter, | |
self.mean_change_tol, cal_sstats, | |
random_state) | |
for idx_slice in gen_even_slices(X.shape[0], n_jobs)) | |
# merge result | |
doc_topics, sstats_list = zip(*results) | |
doc_topic_distr = np.vstack(doc_topics) | |
if cal_sstats: | |
# This step finishes computing the sufficient statistics for the | |
# M-step. | |
suff_stats = np.zeros(self.components_.shape) | |
for sstats in sstats_list: | |
suff_stats += sstats | |
suff_stats *= self.exp_dirichlet_component_ | |
if __debug__: | |
logger.debug("all documents, (suff_stats aka sstats) multiplied by expElogbeta %s", str(suff_stats)) | |
else: | |
suff_stats = None | |
return (doc_topic_distr, suff_stats) | |
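# The parallel E-step above splits the rows of X into even slices, runs
# `_update_doc_distribution` on each slice, stacks the per-slice gammas and
# sums the per-slice sufficient statistics before multiplying by expElogbeta.
# A rough sequential equivalent, commented out and using `exp_Elogbeta` and
# `alpha` as stand-ins for `self.exp_dirichlet_component_` and
# `self.doc_topic_prior_`:
#
# results = [_update_doc_distribution(X[sl, :], exp_Elogbeta, alpha,
#                                     max_doc_update_iter, mean_change_tol,
#                                     cal_sstats=True, random_state=None)
#            for sl in gen_even_slices(X.shape[0], n_jobs)]
# doc_topic_distr = np.vstack([gamma for gamma, _ in results])
# suff_stats = sum(s for _, s in results) * exp_Elogbeta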
def _em_step(self, X, total_samples, batch_update): | |
"""EM update for 1 iteration. | |
Update ``components_`` by batch VB or online VB.
Parameters | |
---------- | |
X : array-like or sparse matrix, shape=(n_samples, n_features) | |
Document word matrix. | |
total_samples : integer | |
Total number of documents. It is only used when
batch_update is `False`. | |
batch_update : boolean | |
Parameter that controls updating method. | |
`True` for batch learning, `False` for online learning. | |
Returns | |
------- | |
doc_topic_distr : array, shape=(n_samples, n_topics) | |
Unnormalized document topic distribution. | |
""" | |
# E-step | |
_, suff_stats = self._e_step(X, cal_sstats=True, random_init=True) | |
if __debug__: | |
logger.debug("Before M-step: components aka lambda %s", str(self.components_)) | |
logger.debug("Before m step: expElogbeta %s", str(self.exp_dirichlet_component_)) | |
logger.debug("Before m step: topic_word_prior aka eta %s", str(self.topic_word_prior_)) | |
logger.debug("Before m step: new sstats %s", suff_stats) | |
# M-step | |
if batch_update: | |
self.components_ = self.topic_word_prior_ + suff_stats | |
if __debug__: | |
logger.debug("After M-step: components aka lambda %s", str(self.components_)) | |
else: | |
# online update | |
# In the literature, the weight is `rho` | |
if __debug__: | |
logger.debug("Rho: decay %s, offset %s, self.batch_iter %s", self.learning_decay, self.learning_offset, self.n_batch_iter_ ) | |
weight = np.power(self.learning_offset + self.n_batch_iter_, | |
-self.learning_decay) | |
if __debug__: | |
logger.debug("Before m step: rho %s", weight) | |
doc_ratio = float(total_samples) / X.shape[0] | |
self.components_ *= (1 - weight) | |
self.components_ += (weight * (self.topic_word_prior_ | |
+ doc_ratio * suff_stats)) | |
if __debug__: | |
logger.debug("After M-step: components aka lambda %s", self.components_) | |
# update `component_` related variables | |
self.exp_dirichlet_component_ = np.exp( | |
_dirichlet_expectation_2d(self.components_)) | |
if __debug__: | |
logger.debug("After m step: expElogbeta %s", str(self.exp_dirichlet_component_)) | |
self.n_batch_iter_ += 1 | |
return | |
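# In the online branch above, the step size is rho_t = (tau_0 + t)^(-kappa)
# and lambda is pulled toward the mini-batch estimate
# eta + (D / |batch|) * sstats. A compact commented restatement with
# illustrative variable names:
#
# rho = (learning_offset + n_batch_iter) ** (-learning_decay)       # tau_0, t, kappa
# lam_hat = topic_word_prior + (total_samples / float(batch_len)) * suff_stats
# lam = (1.0 - rho) * lam + rho * lam_hat                           # new lambda
# expElogbeta = np.exp(_dirichlet_expectation_2d(lam))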
def _check_non_neg_array(self, X, whom): | |
"""check X format | |
check X format and make sure no negative value in X. | |
Parameters | |
---------- | |
X : array-like or sparse matrix | |
""" | |
X = check_array(X, accept_sparse='csr') | |
check_non_negative(X, whom) | |
return X | |
def partial_fit(self, X, y=None): | |
"""Online VB with Mini-Batch update. | |
Parameters | |
---------- | |
X : array-like or sparse matrix, shape=(n_samples, n_features) | |
Document word matrix. | |
Returns | |
------- | |
self | |
""" | |
self._check_params() | |
X = self._check_non_neg_array(X, | |
"LatentDirichletAllocation.partial_fit") | |
n_samples, n_features = X.shape | |
batch_size = self.batch_size | |
# initialize parameters or check | |
if not hasattr(self, 'components_'): | |
self._init_latent_vars(n_features) | |
if n_features != self.components_.shape[1]: | |
raise ValueError( | |
"The provided data has %d dimensions while " | |
"the model was trained with feature size %d." % | |
(n_features, self.components_.shape[1])) | |
for idx_slice in gen_batches(n_samples, batch_size): | |
self._em_step(X[idx_slice, :], total_samples=self.total_samples, | |
batch_update=False) | |
return self | |
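# `partial_fit` trains the model incrementally on a stream of mini-batches;
# `total_samples` should be set to (an estimate of) the full corpus size so
# that the online weight `doc_ratio` is scaled correctly. A minimal commented
# usage sketch; `iter_document_word_batches` is a hypothetical generator of
# count matrices, not part of this file:
#
# lda = LatentDirichletAllocation(n_topics=20, total_samples=1e6,
#                                 batch_size=128, random_state=0)
# for X_batch in iter_document_word_batches():
#     lda.partial_fit(X_batch)
# doc_topics = lda.transform(X_batch)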
def fit(self, X, y=None): | |
"""Learn model for the data X with variational Bayes method. | |
When `learning_method` is 'online', use mini-batch update. | |
Otherwise, use batch update. | |
Parameters | |
---------- | |
X : array-like or sparse matrix, shape=(n_samples, n_features) | |
Document word matrix. | |
Returns | |
------- | |
self | |
""" | |
self._check_params() | |
X = self._check_non_neg_array(X, "LatentDirichletAllocation.fit") | |
n_samples, n_features = X.shape | |
max_iter = self.max_iter | |
evaluate_every = self.evaluate_every | |
learning_method = self.learning_method | |
batch_size = self.batch_size | |
# initialize parameters | |
self._init_latent_vars(n_features) | |
# change to perplexity later | |
last_bound = None | |
for i in xrange(max_iter): | |
if learning_method == 'online': | |
for idx_slice in gen_batches(n_samples, batch_size): | |
self._em_step(X[idx_slice, :], total_samples=n_samples, | |
batch_update=False) | |
else: | |
# batch update | |
self._em_step(X, total_samples=n_samples, batch_update=True) | |
# check perplexity | |
if evaluate_every > 0 and (i + 1) % evaluate_every == 0: | |
if __debug__: | |
logger.debug("Checking perplexity from fit") | |
doc_topics_distr, _ = self._e_step(X, cal_sstats=False, | |
random_init=False) | |
bound = self.perplexity(X, doc_topics_distr, | |
sub_sampling=False) | |
if self.verbose: | |
print('iteration: %d, perplexity: %.4f' % (i + 1, bound)) | |
if last_bound and abs(last_bound - bound) < self.perp_tol: | |
break | |
last_bound = bound | |
self.n_iter_ += 1 | |
return self | |
def transform(self, X): | |
"""Transform data X according to the fitted model. | |
Parameters | |
---------- | |
X : array-like or sparse matrix, shape=(n_samples, n_features) | |
Document word matrix. | |
Returns | |
------- | |
doc_topic_distr : shape=(n_samples, n_topics) | |
Document topic distribution for X. | |
""" | |
if __debug__: | |
logger.debug("Transforming X according to the fitted model") | |
if not hasattr(self, 'components_'): | |
raise NotFittedError("no 'components_' attribute in model." | |
" Please fit model first.") | |
# make sure feature size is the same in fitted model and in X | |
X = self._check_non_neg_array(X, "LatentDirichletAllocation.transform") | |
n_samples, n_features = X.shape | |
if n_features != self.components_.shape[1]: | |
raise ValueError( | |
"The provided data has %d dimensions while " | |
"the model was trained with feature size %d." % | |
(n_features, self.components_.shape[1])) | |
doc_topic_distr, _ = self._e_step(X, cal_sstats=False, | |
random_init=False) | |
return doc_topic_distr | |
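# Note that `transform` returns the unnormalized variational parameter gamma;
# to read each row as a probability distribution over topics, normalize it,
# e.g. (commented sketch, `lda` and `X` being a fitted model and its data):
#
# gamma = lda.transform(X)
# doc_topic_dist = gamma / gamma.sum(axis=1)[:, np.newaxis]  # rows sum to 1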
def _approx_bound(self, X, doc_topic_distr, sub_sampling): | |
"""Estimate the variational bound. | |
Estimate the variational bound over "all documents" using only the | |
documents passed in as X. Since log-likelihood of each word cannot | |
be computed directly, we use this bound to estimate it. | |
Parameters | |
---------- | |
X : array-like or sparse matrix, shape=(n_samples, n_features) | |
Document word matrix. | |
doc_topic_distr : array, shape=(n_samples, n_topics) | |
Document topic distribution. In the literature, this is called | |
gamma. | |
sub_sampling : boolean, optional, (default=False) | |
Compensate for subsampling of documents. | |
It is used to calculate the bound in online learning.
Returns | |
------- | |
score : float | |
""" | |
if __debug__: | |
logger.debug("Estimating approx bound") | |
def _loglikelihood(prior, distr, dirichlet_distr, size): | |
# calculate log-likelihood | |
score = np.sum((prior - distr) * dirichlet_distr) | |
score += np.sum(gammaln(distr) - gammaln(prior)) | |
score += np.sum(gammaln(prior * size) - gammaln(np.sum(distr, 1))) | |
return score | |
is_sparse_x = sp.issparse(X) | |
n_samples, n_topics = doc_topic_distr.shape | |
n_features = self.components_.shape[1] | |
score = 0 | |
dirichlet_doc_topic = _dirichlet_expectation_2d(doc_topic_distr) | |
if __debug__: | |
logger.debug("Initialised dirichlet_doc_topic aka Elogtheta %s", dirichlet_doc_topic) | |
dirichlet_component_ = _dirichlet_expectation_2d(self.components_) | |
if __debug__: | |
logger.debug("component aka lambda %s", self.components_) | |
if __debug__: | |
logger.debug("dirichlet_component aka Elogbeta %s", dirichlet_component_) | |
doc_topic_prior = self.doc_topic_prior_ | |
topic_word_prior = self.topic_word_prior_ | |
if is_sparse_x: | |
X_data = X.data | |
X_indices = X.indices | |
X_indptr = X.indptr | |
# E[log p(docs | theta, beta)] | |
for idx_d in xrange(0, n_samples): | |
if is_sparse_x: | |
ids = X_indices[X_indptr[idx_d]:X_indptr[idx_d + 1]] | |
cnts = X_data[X_indptr[idx_d]:X_indptr[idx_d + 1]] | |
else: | |
ids = np.nonzero(X[idx_d, :])[0] | |
cnts = X[idx_d, ids] | |
temp = (dirichlet_doc_topic[idx_d, :, np.newaxis] | |
+ dirichlet_component_[:, ids]) | |
# log-sum-exp over topics gives the per-word log normalizer
norm_phi = logsumexp(temp, axis=0)
score += np.dot(cnts, norm_phi) | |
if __debug__: | |
logger.debug("Doc_id %s, score %s", idx_d,score) | |
# compute E[log p(theta | alpha) - log q(theta | gamma)] | |
score += _loglikelihood(doc_topic_prior, doc_topic_distr, | |
dirichlet_doc_topic, self.n_topics) | |
if __debug__: | |
logger.debug("Added E[log p(theta | alpha) - log q(theta | gamma)]=%s", score) | |
# Compensate for the subsampling of the population of documents | |
if sub_sampling: | |
doc_ratio = float(self.total_samples) / n_samples | |
score *= doc_ratio | |
if __debug__: | |
logger.debug("Multiplied by %s subsample ratio", doc_ratio, score) | |
# E[log p(beta | eta) - log q (beta | lambda)] | |
score += _loglikelihood(topic_word_prior, self.components_, | |
dirichlet_component_, n_features) | |
return score | |
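# The E[log p(docs | theta, beta)] term above is, for each word, a
# log-sum-exp over topics of E[log theta_dk] + E[log beta_kw], weighted by
# the word count; this mirrors the `phinorm` computation in Hoffman's
# onlineldavb.py. A commented per-word restatement for one document, using
# `Elogtheta_d` and `Elogbeta` as illustrative names:
#
# for w, cnt in zip(ids, cnts):
#     temp = Elogtheta_d + Elogbeta[:, w]   # length n_topics
#     score += cnt * logsumexp(temp)        # log(sum_k exp(temp_k))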
def score(self, X, y=None): | |
"""Calculate approximate log-likelihood as score. | |
Parameters | |
---------- | |
X : array-like or sparse matrix, shape=(n_samples, n_features) | |
Document word matrix. | |
Returns | |
------- | |
score : float | |
Use approximate bound as score. | |
""" | |
X = self._check_non_neg_array(X, "LatentDirichletAllocation.score") | |
doc_topic_distr = self.transform(X) | |
score = self._approx_bound(X, doc_topic_distr, sub_sampling=False) | |
return score | |
def perplexity(self, X, doc_topic_distr=None, sub_sampling=False): | |
"""Calculate approximate perplexity for data X. | |
Perplexity is defined as exp(-1. * log-likelihood per word) | |
Parameters | |
---------- | |
X : array-like or sparse matrix, [n_samples, n_features] | |
Document word matrix. | |
doc_topic_distr : None or array, shape=(n_samples, n_topics) | |
Document topic distribution. | |
If it is None, it will be generated by applying transform on X. | |
Returns | |
------- | |
score : float | |
Perplexity score. | |
""" | |
if __debug__: | |
logger.debug("calculating perplexity") | |
if not hasattr(self, 'components_'): | |
raise NotFittedError("no 'components_' attribute in model." | |
" Please fit model first.") | |
X = self._check_non_neg_array(X, | |
"LatentDirichletAllocation.perplexity") | |
if doc_topic_distr is None: | |
if __debug__: | |
logger.debug("Initialising doc topic distr by calling transform") | |
doc_topic_distr = self.transform(X) | |
else: | |
n_samples, n_topics = doc_topic_distr.shape | |
if n_samples != X.shape[0]: | |
raise ValueError("Number of samples in X and doc_topic_distr" | |
" do not match.") | |
if n_topics != self.n_topics: | |
raise ValueError("Number of topics does not match.") | |
current_samples = X.shape[0] | |
bound = self._approx_bound(X, doc_topic_distr, sub_sampling) | |
if sub_sampling: | |
word_cnt = X.sum() * (float(self.total_samples) / current_samples) | |
else: | |
word_cnt = X.sum() | |
perword_bound = bound / word_cnt | |
if __debug__: | |
logger.debug("bound %s, word_cnt %s, perword_bound %s", bound, word_cnt, perword_bound) | |
return np.exp(-1.0 * perword_bound) | |
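# Perplexity is the exponentiated negative per-word bound; with the bound in
# hand it can be reproduced by hand as in the commented sketch below (`lda`
# and `X` being a fitted model and its document-word matrix):
#
# bound = lda._approx_bound(X, lda.transform(X), sub_sampling=False)
# perplexity = np.exp(-bound / X.sum())  # matches lda.perplexity(X)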
# | |
# # !!! checking if numerical diff is due to python | |
# # from ._online_lda import (mean_change, _dirichlet_expectation_1d, | |
# # _dirichlet_expectation_2d) | |
# | |
# from scipy.special import gammaln, psi # gamma function utils | |
# from scipy.special import polygamma | |
# | |
# def _dirichlet_expectation_1d(alpha): | |
# """ | |
# For a vector `theta~Dir(alpha)`, compute `exp(E[log(theta)])`.
# | |
# """ | |
# if (len(alpha.shape) == 1): | |
# result = psi(alpha) - psi(np.sum(alpha)) | |
# else: | |
# result = psi(alpha) - psi(np.sum(alpha, 1))[:, np.newaxis] | |
# return (np.exp(result)).astype(alpha.dtype) # keep the same precision as input | |
# | |
# def _dirichlet_expectation_2d(alpha): | |
# """ | |
# For a vector `theta~Dir(alpha)`, compute `E[log(theta)]`. | |
# | |
# """ | |
# if (len(alpha.shape) == 1): | |
# result = psi(alpha) - psi(np.sum(alpha)) | |
# else: | |
# result = psi(alpha) - psi(np.sum(alpha, 1))[:, np.newaxis] | |
# return result.astype(alpha.dtype) # keep the same precision as input | |
# | |
# def mean_change(last_d, doc_topic_d): | |
# return np.mean(abs(last_d - doc_topic_d)) |