Last active
December 17, 2015 07:59
-
-
Save dashesy/5577051 to your computer and use it in GitHub Desktop.
sklearn GMM suitable for prediction at the last stage of pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Gaussian Mixture Models. | |
This implementation corresponds to frequentist (non-Bayesian) formulation | |
of Gaussian Mixture Models. | |
""" | |
# Author: Ron Weiss <[email protected]> | |
# Fabian Pedregosa <[email protected]> | |
# Bertrand Thirion <[email protected]> | |
import numpy as np | |
from ..base import BaseEstimator | |
from ..utils import check_random_state | |
from ..utils.extmath import logsumexp, pinvh | |
from .. import cluster | |
EPS = np.finfo(float).eps | |
def log_multivariate_normal_density(X, means, covars, covariance_type='diag'): | |
"""Compute the log probability under a multivariate Gaussian distribution. | |
Parameters | |
---------- | |
X : array_like, shape (n_samples, n_features) | |
List of n_features-dimensional data points. Each row corresponds to a | |
single data point. | |
means : array_like, shape (n_components, n_features) | |
List of n_features-dimensional mean vectors for n_components Gaussians. | |
Each row corresponds to a single mean vector. | |
covars : array_like | |
List of n_components covariance parameters for each Gaussian. The shape | |
depends on `covariance_type`: | |
(n_components, n_features) if 'spherical', | |
(n_features, n_features) if 'tied', | |
(n_components, n_features) if 'diag', | |
(n_components, n_features, n_features) if 'full' | |
covariance_type : string | |
Type of the covariance parameters. Must be one of | |
'spherical', 'tied', 'diag', 'full'. Defaults to 'diag'. | |
Returns | |
------- | |
lpr : array_like, shape (n_samples, n_components) | |
Array containing the log probabilities of each data point in | |
X under each of the n_components multivariate Gaussian distributions. | |
""" | |
log_multivariate_normal_density_dict = { | |
'spherical': _log_multivariate_normal_density_spherical, | |
'tied': _log_multivariate_normal_density_tied, | |
'diag': _log_multivariate_normal_density_diag, | |
'full': _log_multivariate_normal_density_full} | |
return log_multivariate_normal_density_dict[covariance_type]( | |
X, means, covars) | |
def sample_gaussian(mean, covar, covariance_type='diag', n_samples=1, | |
random_state=None): | |
"""Generate random samples from a Gaussian distribution. | |
Parameters | |
---------- | |
mean : array_like, shape (n_features,) | |
Mean of the distribution. | |
covars : array_like, optional | |
Covariance of the distribution. The shape depends on `covariance_type`: | |
scalar if 'spherical', | |
(n_features) if 'diag', | |
(n_features, n_features) if 'tied', or 'full' | |
covariance_type : string, optional | |
Type of the covariance parameters. Must be one of | |
'spherical', 'tied', 'diag', 'full'. Defaults to 'diag'. | |
n_samples : int, optional | |
Number of samples to generate. Defaults to 1. | |
Returns | |
------- | |
X : array, shape (n_features, n_samples) | |
Randomly generated sample | |
""" | |
rng = check_random_state(random_state) | |
n_dim = len(mean) | |
rand = rng.randn(n_dim, n_samples) | |
if n_samples == 1: | |
rand.shape = (n_dim,) | |
if covariance_type == 'spherical': | |
rand *= np.sqrt(covar) | |
elif covariance_type == 'diag': | |
rand = np.dot(np.diag(np.sqrt(covar)), rand) | |
else: | |
from scipy import linalg | |
U, s, V = linalg.svd(covar) | |
sqrtS = np.diag(np.sqrt(s)) | |
sqrt_covar = np.dot(U, np.dot(sqrtS, V)) | |
rand = np.dot(sqrt_covar, rand) | |
return (rand.T + mean).T | |
class GMM(BaseEstimator): | |
"""Gaussian Mixture Model | |
Representation of a Gaussian mixture model probability distribution. | |
This class allows for easy evaluation of, sampling from, and | |
maximum-likelihood estimation of the parameters of a GMM distribution. | |
Initializes parameters such that every mixture component has zero | |
mean and identity covariance. | |
Parameters | |
---------- | |
n_components : int, optional | |
Number of mixture components. Defaults to 1. | |
covariance_type : string, optional | |
String describing the type of covariance parameters to | |
use. Must be one of 'spherical', 'tied', 'diag', 'full'. | |
Defaults to 'diag'. | |
random_state: RandomState or an int seed (0 by default) | |
A random number generator instance | |
min_covar : float, optional | |
Floor on the diagonal of the covariance matrix to prevent | |
overfitting. Defaults to 1e-3. | |
thresh : float, optional | |
Convergence threshold. | |
n_iter : int, optional | |
Number of EM iterations to perform. | |
n_init : int, optional | |
Number of initializations to perform. the best results is kept | |
params : string, optional | |
Controls which parameters are updated in the training | |
process. Can contain any combination of 'w' for weights, | |
'm' for means, and 'c' for covars. Defaults to 'wmc'. | |
init_params : string, optional | |
Controls which parameters are updated in the initialization | |
process. Can contain any combination of 'w' for weights, | |
'm' for means, and 'c' for covars. Defaults to 'wmc'. | |
Attributes | |
---------- | |
`weights_` : array, shape (`n_components`,) | |
This attribute stores the mixing weights for each mixture component. | |
`means_` : array, shape (`n_components`, `n_features`) | |
Mean parameters for each mixture component. | |
`covars_` : array | |
Covariance parameters for each mixture component. The shape | |
depends on `covariance_type`:: | |
(n_components, n_features) if 'spherical', | |
(n_features, n_features) if 'tied', | |
(n_components, n_features) if 'diag', | |
(n_components, n_features, n_features) if 'full' | |
`converged_` : bool | |
True when convergence was reached in fit(), False otherwise. | |
See Also | |
-------- | |
DPGMM : Ininite gaussian mixture model, using the dirichlet | |
process, fit with a variational algorithm | |
VBGMM : Finite gaussian mixture model fit with a variational | |
algorithm, better for situations where there might be too little | |
data to get a good estimate of the covariance matrix. | |
Examples | |
-------- | |
>>> import numpy as np | |
>>> from sklearn import mixture | |
>>> np.random.seed(1) | |
>>> g = mixture.GMM(n_components=2) | |
>>> # Generate random observations with two modes centered on 0 | |
>>> # and 10 to use for training. | |
>>> obs = np.concatenate((np.random.randn(100, 1), | |
... 10 + np.random.randn(300, 1))) | |
>>> g.fit(obs) # doctest: +NORMALIZE_WHITESPACE | |
GMM(covariance_type='diag', init_params='wmc', min_covar=0.001, | |
n_components=2, n_init=1, n_iter=100, params='wmc', | |
random_state=None, thresh=0.01) | |
>>> np.round(g.weights_, 2) | |
array([ 0.75, 0.25]) | |
>>> np.round(g.means_, 2) | |
array([[ 10.05], | |
[ 0.06]]) | |
>>> np.round(g.covars_, 2) #doctest: +SKIP | |
array([[[ 1.02]], | |
[[ 0.96]]]) | |
>>> g.predict([[0], [2], [9], [10]]) #doctest: +ELLIPSIS | |
array([1, 1, 0, 0]...) | |
>>> np.round(g.score([[0], [2], [9], [10]]), 2) | |
array([-2.19, -4.58, -1.75, -1.21]) | |
>>> # Refit the model on new data (initial parameters remain the | |
>>> # same), this time with an even split between the two modes. | |
>>> g.fit(20 * [[0]] + 20 * [[10]]) # doctest: +NORMALIZE_WHITESPACE | |
GMM(covariance_type='diag', init_params='wmc', min_covar=0.001, | |
n_components=2, n_init=1, n_iter=100, params='wmc', | |
random_state=None, thresh=0.01) | |
>>> np.round(g.weights_, 2) | |
array([ 0.5, 0.5]) | |
""" | |
def __init__(self, n_components=1, covariance_type='diag', | |
random_state=None, thresh=1e-2, min_covar=1e-3, | |
n_iter=100, n_init=1, params='wmc', init_params='wmc'): | |
self.n_components = n_components | |
self.covariance_type = covariance_type | |
self.thresh = thresh | |
self.min_covar = min_covar | |
self.random_state = random_state | |
self.n_iter = n_iter | |
self.n_init = n_init | |
self.params = params | |
self.init_params = init_params | |
if not covariance_type in ['spherical', 'tied', 'diag', 'full']: | |
raise ValueError('Invalid value for covariance_type: %s' % | |
covariance_type) | |
if n_init < 1: | |
raise ValueError('GMM estimation requires at least one run') | |
self.weights_ = np.ones(self.n_components) / self.n_components | |
# flag to indicate exit status of fit() method: converged (True) or | |
# n_iter reached (False) | |
self.converged_ = False | |
def _get_covars(self): | |
"""Covariance parameters for each mixture component. | |
The shape depends on `cvtype`:: | |
(`n_states`, 'n_features') if 'spherical', | |
(`n_features`, `n_features`) if 'tied', | |
(`n_states`, `n_features`) if 'diag', | |
(`n_states`, `n_features`, `n_features`) if 'full' | |
""" | |
if self.covariance_type == 'full': | |
return self.covars_ | |
elif self.covariance_type == 'diag': | |
return [np.diag(cov) for cov in self.covars_] | |
elif self.covariance_type == 'tied': | |
return [self.covars_] * self.n_components | |
elif self.covariance_type == 'spherical': | |
return [np.diag(cov) for cov in self.covars_] | |
def _set_covars(self, covars): | |
"""Provide values for covariance""" | |
covars = np.asarray(covars) | |
_validate_covars(covars, self.covariance_type, self.n_components) | |
self.covars_ = covars | |
def eval(self, X): | |
"""Evaluate the model on data | |
Compute the log probability of X under the model and | |
return the posterior distribution (responsibilities) of each | |
mixture component for each element of X. | |
Parameters | |
---------- | |
X: array_like, shape (n_samples, n_features) | |
List of n_features-dimensional data points. Each row | |
corresponds to a single data point. | |
Returns | |
------- | |
logprob: array_like, shape (n_samples,) | |
Log probabilities of each data point in X | |
responsibilities: array_like, shape (n_samples, n_components) | |
Posterior probabilities of each mixture component for each | |
observation | |
""" | |
X = np.asarray(X) | |
if X.ndim == 1: | |
X = X[:, np.newaxis] | |
if X.size == 0: | |
return np.array([]), np.empty((0, self.n_components)) | |
if X.shape[1] != self.means_.shape[1]: | |
raise ValueError('the shape of X is not compatible with self') | |
lpr = (log_multivariate_normal_density(X, self.means_, self.covars_, | |
self.covariance_type) | |
+ np.log(self.weights_)) | |
logprob = logsumexp(lpr, axis=1) | |
responsibilities = np.exp(lpr - logprob[:, np.newaxis]) | |
return logprob, responsibilities | |
def score(self, X): | |
"""Compute the log probability under the model. | |
Parameters | |
---------- | |
X : array_like, shape (n_samples, n_features) | |
List of n_features-dimensional data points. Each row | |
corresponds to a single data point. | |
Returns | |
------- | |
logprob : array_like, shape (n_samples,) | |
Log probabilities of each data point in X | |
""" | |
logprob, _ = self.eval(X) | |
return logprob | |
def predict(self, X): | |
"""Predict label for data. | |
Parameters | |
---------- | |
X : array-like, shape = [n_samples, n_features] | |
Returns | |
------- | |
C : array, shape = (n_samples,) | |
""" | |
logprob, responsibilities = self.eval(X) | |
return responsibilities.argmax(axis=1) | |
def predict_proba(self, X): | |
"""Predict posterior probability of data under each Gaussian | |
in the model. | |
Parameters | |
---------- | |
X : array-like, shape = [n_samples, n_features] | |
Returns | |
------- | |
responsibilities : array-like, shape = (n_samples, n_components) | |
Returns the probability of the sample for each Gaussian | |
(state) in the model. | |
""" | |
logprob, responsibilities = self.eval(X) | |
return responsibilities | |
def sample(self, n_samples=1, random_state=None): | |
"""Generate random samples from the model. | |
Parameters | |
---------- | |
n_samples : int, optional | |
Number of samples to generate. Defaults to 1. | |
Returns | |
------- | |
X : array_like, shape (n_samples, n_features) | |
List of samples | |
""" | |
if random_state is None: | |
random_state = self.random_state | |
random_state = check_random_state(random_state) | |
weight_cdf = np.cumsum(self.weights_) | |
X = np.empty((n_samples, self.means_.shape[1])) | |
rand = random_state.rand(n_samples) | |
# decide which component to use for each sample | |
comps = weight_cdf.searchsorted(rand) | |
# for each component, generate all needed samples | |
for comp in xrange(self.n_components): | |
# occurrences of current component in X | |
comp_in_X = (comp == comps) | |
# number of those occurrences | |
num_comp_in_X = comp_in_X.sum() | |
if num_comp_in_X > 0: | |
if self.covariance_type == 'tied': | |
cv = self.covars_ | |
elif self.covariance_type == 'spherical': | |
cv = self.covars_[comp][0] | |
else: | |
cv = self.covars_[comp] | |
X[comp_in_X] = sample_gaussian( | |
self.means_[comp], cv, self.covariance_type, | |
num_comp_in_X, random_state=random_state).T | |
return X | |
def fit(self, X, y = None): | |
"""Estimate model parameters with the expectation-maximization | |
algorithm. | |
A initialization step is performed before entering the em | |
algorithm. If you want to avoid this step, set the keyword | |
argument init_params to the empty string '' when creating the | |
GMM object. Likewise, if you would like just to do an | |
initialization, set n_iter=0. | |
Parameters | |
---------- | |
X : array_like, shape (n, n_features) | |
List of n_features-dimensional data points. Each row | |
corresponds to a single data point. | |
y : (optional) array, shape = [n] | |
if provided 'm' in init_params will be ignored | |
n_components and _means will be re-calculated | |
""" | |
## initialization step | |
X = np.asarray(X, dtype=np.float) | |
if X.ndim == 1: | |
X = X[:, np.newaxis] | |
if y is not None: | |
n_classes = len(np.unique(y)) | |
self.n_components = n_classes | |
if X.shape[0] < self.n_components: | |
raise ValueError( | |
'GMM estimation with %s components, but got only %s samples' % | |
(self.n_components, X.shape[0])) | |
max_log_prob = -np.infty | |
for _ in range(self.n_init): | |
if y is not None: | |
self.means_ = np.array([X[y == i, :].mean(axis=0) for i in np.unique(y)]) | |
elif 'm' in self.init_params or not hasattr(self, 'means_'): | |
self.means_ = cluster.KMeans( | |
n_clusters=self.n_components, | |
random_state=self.random_state).fit(X).cluster_centers_ | |
if 'w' in self.init_params or not hasattr(self, 'weights_'): | |
self.weights_ = np.tile(1.0 / self.n_components, | |
self.n_components) | |
if 'c' in self.init_params or not hasattr(self, 'covars_'): | |
cv = np.cov(X.T) + self.min_covar * np.eye(X.shape[1]) | |
if not cv.shape: | |
cv.shape = (1, 1) | |
self.covars_ = \ | |
distribute_covar_matrix_to_match_covariance_type( | |
cv, self.covariance_type, self.n_components) | |
# EM algorithms | |
log_likelihood = [] | |
# reset self.converged_ to False | |
self.converged_ = False | |
for i in xrange(self.n_iter): | |
# Expectation step | |
curr_log_likelihood, responsibilities = self.eval(X) | |
log_likelihood.append(curr_log_likelihood.sum()) | |
# Check for convergence. | |
if i > 0 and abs(log_likelihood[-1] - log_likelihood[-2]) < \ | |
self.thresh: | |
self.converged_ = True | |
break | |
# Maximization step | |
self._do_mstep(X, responsibilities, self.params, | |
self.min_covar) | |
# if the results are better, keep it | |
if self.n_iter: | |
if log_likelihood[-1] > max_log_prob: | |
max_log_prob = log_likelihood[-1] | |
best_params = {'weights': self.weights_, | |
'means': self.means_, | |
'covars': self.covars_} | |
# check the existence of an init param that was not subject to | |
# likelihood computation issue. | |
if np.isneginf(max_log_prob) and self.n_iter: | |
raise RuntimeError( | |
"EM algorithm was never able to compute a valid likelihood " + | |
"given initial parameters. Try different init parameters " + | |
"(or increasing n_init) or check for degenerate data.") | |
# self.n_iter == 0 occurs when using GMM within HMM | |
if self.n_iter: | |
self.covars_ = best_params['covars'] | |
self.means_ = best_params['means'] | |
self.weights_ = best_params['weights'] | |
return self | |
def _do_mstep(self, X, responsibilities, params, min_covar=0): | |
""" Perform the Mstep of the EM algorithm and return the class weihgts. | |
""" | |
weights = responsibilities.sum(axis=0) | |
weighted_X_sum = np.dot(responsibilities.T, X) | |
inverse_weights = 1.0 / (weights[:, np.newaxis] + 10 * EPS) | |
if 'w' in params: | |
self.weights_ = (weights / (weights.sum() + 10 * EPS) + EPS) | |
if 'm' in params: | |
self.means_ = weighted_X_sum * inverse_weights | |
if 'c' in params: | |
covar_mstep_func = _covar_mstep_funcs[self.covariance_type] | |
self.covars_ = covar_mstep_func( | |
self, X, responsibilities, weighted_X_sum, inverse_weights, | |
min_covar) | |
return weights | |
def _n_parameters(self): | |
"""Return the number of free parameters in the model.""" | |
ndim = self.means_.shape[1] | |
if self.covariance_type == 'full': | |
cov_params = self.n_components * ndim * (ndim + 1) / 2. | |
elif self.covariance_type == 'diag': | |
cov_params = self.n_components * ndim | |
elif self.covariance_type == 'tied': | |
cov_params = ndim * (ndim + 1) / 2. | |
elif self.covariance_type == 'spherical': | |
cov_params = self.n_components | |
mean_params = ndim * self.n_components | |
return int(cov_params + mean_params + self.n_components - 1) | |
def bic(self, X): | |
"""Bayesian information criterion for the current model fit | |
and the proposed data | |
Parameters | |
---------- | |
X : array of shape(n_samples, n_dimensions) | |
Returns | |
------- | |
bic: float (the lower the better) | |
""" | |
return (-2 * self.score(X).sum() + | |
self._n_parameters() * np.log(X.shape[0])) | |
def aic(self, X): | |
"""Akaike information criterion for the current model fit | |
and the proposed data | |
Parameters | |
---------- | |
X : array of shape(n_samples, n_dimensions) | |
Returns | |
------- | |
aic: float (the lower the better) | |
""" | |
return - 2 * self.score(X).sum() + 2 * self._n_parameters() | |
######################################################################### | |
## some helper routines | |
######################################################################### | |
def _log_multivariate_normal_density_diag(X, means=0.0, covars=1.0): | |
"""Compute Gaussian log-density at X for a diagonal model""" | |
n_samples, n_dim = X.shape | |
lpr = -0.5 * (n_dim * np.log(2 * np.pi) + np.sum(np.log(covars), 1) | |
+ np.sum((means ** 2) / covars, 1) | |
- 2 * np.dot(X, (means / covars).T) | |
+ np.dot(X ** 2, (1.0 / covars).T)) | |
return lpr | |
def _log_multivariate_normal_density_spherical(X, means=0.0, covars=1.0): | |
"""Compute Gaussian log-density at X for a spherical model""" | |
cv = covars.copy() | |
if covars.ndim == 1: | |
cv = cv[:, np.newaxis] | |
if covars.shape[1] == 1: | |
cv = np.tile(cv, (1, X.shape[-1])) | |
return _log_multivariate_normal_density_diag(X, means, cv) | |
def _log_multivariate_normal_density_tied(X, means, covars): | |
"""Compute Gaussian log-density at X for a tied model""" | |
from scipy import linalg | |
n_samples, n_dim = X.shape | |
icv = pinvh(covars) | |
lpr = -0.5 * (n_dim * np.log(2 * np.pi) + np.log(linalg.det(covars) + 0.1) | |
+ np.sum(X * np.dot(X, icv), 1)[:, np.newaxis] | |
- 2 * np.dot(np.dot(X, icv), means.T) | |
+ np.sum(means * np.dot(means, icv), 1)) | |
return lpr | |
def _log_multivariate_normal_density_full(X, means, covars, min_covar=1.e-7): | |
"""Log probability for full covariance matrices. | |
""" | |
from scipy import linalg | |
import itertools | |
if hasattr(linalg, 'solve_triangular'): | |
# only in scipy since 0.9 | |
solve_triangular = linalg.solve_triangular | |
else: | |
# slower, but works | |
solve_triangular = linalg.solve | |
n_samples, n_dim = X.shape | |
nmix = len(means) | |
log_prob = np.empty((n_samples, nmix)) | |
for c, (mu, cv) in enumerate(itertools.izip(means, covars)): | |
try: | |
cv_chol = linalg.cholesky(cv, lower=True) | |
except linalg.LinAlgError: | |
# The model is most probabily stuck in a component with too | |
# few observations, we need to reinitialize this components | |
cv_chol = linalg.cholesky(cv + min_covar * np.eye(n_dim), | |
lower=True) | |
cv_log_det = 2 * np.sum(np.log(np.diagonal(cv_chol))) | |
cv_sol = solve_triangular(cv_chol, (X - mu).T, lower=True).T | |
log_prob[:, c] = - .5 * (np.sum(cv_sol ** 2, axis=1) + | |
n_dim * np.log(2 * np.pi) + cv_log_det) | |
return log_prob | |
def _validate_covars(covars, covariance_type, n_components): | |
"""Do basic checks on matrix covariance sizes and values | |
""" | |
from scipy import linalg | |
if covariance_type == 'spherical': | |
if len(covars) != n_components: | |
raise ValueError("'spherical' covars have length n_components") | |
elif np.any(covars <= 0): | |
raise ValueError("'spherical' covars must be non-negative") | |
elif covariance_type == 'tied': | |
if covars.shape[0] != covars.shape[1]: | |
raise ValueError("'tied' covars must have shape (n_dim, n_dim)") | |
elif (not np.allclose(covars, covars.T) | |
or np.any(linalg.eigvalsh(covars) <= 0)): | |
raise ValueError("'tied' covars must be symmetric, " | |
"positive-definite") | |
elif covariance_type == 'diag': | |
if len(covars.shape) != 2: | |
raise ValueError("'diag' covars must have shape" | |
"(n_components, n_dim)") | |
elif np.any(covars <= 0): | |
raise ValueError("'diag' covars must be non-negative") | |
elif covariance_type == 'full': | |
if len(covars.shape) != 3: | |
raise ValueError("'full' covars must have shape " | |
"(n_components, n_dim, n_dim)") | |
elif covars.shape[1] != covars.shape[2]: | |
raise ValueError("'full' covars must have shape " | |
"(n_components, n_dim, n_dim)") | |
for n, cv in enumerate(covars): | |
if (not np.allclose(cv, cv.T) | |
or np.any(linalg.eigvalsh(cv) <= 0)): | |
raise ValueError("component %d of 'full' covars must be " | |
"symmetric, positive-definite" % n) | |
else: | |
raise ValueError("covariance_type must be one of " + | |
"'spherical', 'tied', 'diag', 'full'") | |
def distribute_covar_matrix_to_match_covariance_type( | |
tied_cv, covariance_type, n_components): | |
"""Create all the covariance matrices from a given template | |
""" | |
if covariance_type == 'spherical': | |
cv = np.tile(tied_cv.mean() * np.ones(tied_cv.shape[1]), | |
(n_components, 1)) | |
elif covariance_type == 'tied': | |
cv = tied_cv | |
elif covariance_type == 'diag': | |
cv = np.tile(np.diag(tied_cv), (n_components, 1)) | |
elif covariance_type == 'full': | |
cv = np.tile(tied_cv, (n_components, 1, 1)) | |
else: | |
raise ValueError("covariance_type must be one of " + | |
"'spherical', 'tied', 'diag', 'full'") | |
return cv | |
def _covar_mstep_diag(gmm, X, responsibilities, weighted_X_sum, norm, | |
min_covar): | |
"""Performing the covariance M step for diagonal cases""" | |
avg_X2 = np.dot(responsibilities.T, X * X) * norm | |
avg_means2 = gmm.means_ ** 2 | |
avg_X_means = gmm.means_ * weighted_X_sum * norm | |
return avg_X2 - 2 * avg_X_means + avg_means2 + min_covar | |
def _covar_mstep_spherical(*args): | |
"""Performing the covariance M step for spherical cases""" | |
cv = _covar_mstep_diag(*args) | |
return np.tile(cv.mean(axis=1)[:, np.newaxis], (1, cv.shape[1])) | |
def _covar_mstep_full(gmm, X, responsibilities, weighted_X_sum, norm, | |
min_covar): | |
"""Performing the covariance M step for full cases""" | |
# Eq. 12 from K. Murphy, "Fitting a Conditional Linear Gaussian | |
# Distribution" | |
n_features = X.shape[1] | |
cv = np.empty((gmm.n_components, n_features, n_features)) | |
for c in xrange(gmm.n_components): | |
post = responsibilities[:, c] | |
# Underflow Errors in doing post * X.T are not important | |
np.seterr(under='ignore') | |
avg_cv = np.dot(post * X.T, X) / (post.sum() + 10 * EPS) | |
mu = gmm.means_[c][np.newaxis] | |
cv[c] = (avg_cv - np.dot(mu.T, mu) + min_covar * np.eye(n_features)) | |
return cv | |
def _covar_mstep_tied(gmm, X, responsibilities, weighted_X_sum, norm, | |
min_covar): | |
# Eq. 15 from K. Murphy, "Fitting a Conditional Linear Gaussian | |
n_features = X.shape[1] | |
avg_X2 = np.dot(X.T, X) | |
avg_means2 = np.dot(gmm.means_.T, weighted_X_sum) | |
return (avg_X2 - avg_means2 + min_covar * np.eye(n_features)) / X.shape[0] | |
_covar_mstep_funcs = {'spherical': _covar_mstep_spherical, | |
'diag': _covar_mstep_diag, | |
'tied': _covar_mstep_tied, | |
'full': _covar_mstep_full, | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment