-
-
Save kastnerkyle/75483d51641a0c03bf7c to your computer and use it in GitHub Desktop.
# (C) Kyle Kastner, June 2014 | |
# License: BSD 3 clause | |
import scipy.stats as st | |
import numpy as np | |
class gmmhmm: | |
#This class converted with modifications from https://code.google.com/p/hmm-speech-recognition/source/browse/Word.m | |
def __init__(self, n_states): | |
self.n_states = n_states | |
self.random_state = np.random.RandomState(0) | |
#Normalize random initial state | |
self.prior = self._normalize(self.random_state.rand(self.n_states, 1)) | |
self.A = self._stochasticize(self.random_state.rand(self.n_states, self.n_states)) | |
self.mu = None | |
self.covs = None | |
self.n_dims = None | |
def _forward(self, B): | |
log_likelihood = 0. | |
T = B.shape[1] | |
alpha = np.zeros(B.shape) | |
for t in range(T): | |
if t == 0: | |
alpha[:, t] = B[:, t] * self.prior.ravel() | |
else: | |
alpha[:, t] = B[:, t] * np.dot(self.A.T, alpha[:, t - 1]) | |
alpha_sum = np.sum(alpha[:, t]) | |
alpha[:, t] /= alpha_sum | |
log_likelihood = log_likelihood + np.log(alpha_sum) | |
return log_likelihood, alpha | |
def _backward(self, B): | |
T = B.shape[1] | |
beta = np.zeros(B.shape); | |
beta[:, -1] = np.ones(B.shape[0]) | |
for t in range(T - 1)[::-1]: | |
beta[:, t] = np.dot(self.A, (B[:, t + 1] * beta[:, t + 1])) | |
beta[:, t] /= np.sum(beta[:, t]) | |
return beta | |
def _state_likelihood(self, obs): | |
obs = np.atleast_2d(obs) | |
B = np.zeros((self.n_states, obs.shape[1])) | |
for s in range(self.n_states): | |
#Needs scipy 0.14 | |
B[s, :] = st.multivariate_normal.pdf(obs.T, mean=self.mu[:, s].T, cov=self.covs[:, :, s].T) | |
#This function can (and will!) return values >> 1 | |
#See the discussion here for the equivalent matlab function | |
#https://groups.google.com/forum/#!topic/comp.soft-sys.matlab/YksWK0T74Ak | |
#Key line: "Probabilities have to be less than 1, | |
#Densities can be anything, even infinite (at individual points)." | |
#This is evaluating the density at individual points... | |
return B | |
def _normalize(self, x): | |
return (x + (x == 0)) / np.sum(x) | |
def _stochasticize(self, x): | |
return (x + (x == 0)) / np.sum(x, axis=1) | |
def _em_init(self, obs): | |
#Using this _em_init function allows for less required constructor args | |
if self.n_dims is None: | |
self.n_dims = obs.shape[0] | |
if self.mu is None: | |
subset = self.random_state.choice(np.arange(self.n_dims), size=self.n_states, replace=False) | |
self.mu = obs[:, subset] | |
if self.covs is None: | |
self.covs = np.zeros((self.n_dims, self.n_dims, self.n_states)) | |
self.covs += np.diag(np.diag(np.cov(obs)))[:, :, None] | |
return self | |
def _em_step(self, obs): | |
obs = np.atleast_2d(obs) | |
B = self._state_likelihood(obs) | |
T = obs.shape[1] | |
log_likelihood, alpha = self._forward(B) | |
beta = self._backward(B) | |
xi_sum = np.zeros((self.n_states, self.n_states)) | |
gamma = np.zeros((self.n_states, T)) | |
for t in range(T - 1): | |
partial_sum = self.A * np.dot(alpha[:, t], (beta[:, t] * B[:, t + 1]).T) | |
xi_sum += self._normalize(partial_sum) | |
partial_g = alpha[:, t] * beta[:, t] | |
gamma[:, t] = self._normalize(partial_g) | |
partial_g = alpha[:, -1] * beta[:, -1] | |
gamma[:, -1] = self._normalize(partial_g) | |
expected_prior = gamma[:, 0] | |
expected_A = self._stochasticize(xi_sum) | |
expected_mu = np.zeros((self.n_dims, self.n_states)) | |
expected_covs = np.zeros((self.n_dims, self.n_dims, self.n_states)) | |
gamma_state_sum = np.sum(gamma, axis=1) | |
#Set zeros to 1 before dividing | |
gamma_state_sum = gamma_state_sum + (gamma_state_sum == 0) | |
for s in range(self.n_states): | |
gamma_obs = obs * gamma[s, :] | |
expected_mu[:, s] = np.sum(gamma_obs, axis=1) / gamma_state_sum[s] | |
partial_covs = np.dot(gamma_obs, obs.T) / gamma_state_sum[s] - np.dot(expected_mu[:, s], expected_mu[:, s].T) | |
#Symmetrize | |
partial_covs = np.triu(partial_covs) + np.triu(partial_covs).T - np.diag(partial_covs) | |
#Ensure positive semidefinite by adding diagonal loading | |
expected_covs += .01 * np.eye(self.n_dims)[:, :, None] | |
self.prior = expected_prior | |
self.mu = expected_mu | |
self.covs = expected_covs | |
self.A = expected_A | |
return log_likelihood | |
def fit(self, obs, n_iter=15): | |
#Support for 2D and 3D arrays | |
#2D should be n_features, n_dims | |
#3D should be n_examples, n_features, n_dims | |
#For example, with 6 features per speech segment, 105 different words | |
#this array should be size | |
#(105, 6, X) where X is the number of frames with features extracted | |
#For a single example file, the array should be size (6, X) | |
if len(obs.shape) == 2: | |
for i in range(n_iter): | |
self._em_init(obs) | |
log_likelihood = self._em_step(obs) | |
elif len(obs.shape) == 3: | |
count = obs.shape[0] | |
for n in range(count): | |
for i in range(n_iter): | |
self._em_init(obs[n, :, :]) | |
log_likelihood = self._em_step(obs[n, :, :]) | |
return self | |
def transform(self, obs): | |
#Support for 2D and 3D arrays | |
#2D should be n_features, n_dims | |
#3D should be n_examples, n_features, n_dims | |
#For example, with 6 features per speech segment, 105 different words | |
#this array should be size | |
#(105, 6, X) where X is the number of frames with features extracted | |
#For a single example file, the array should be size (6, X) | |
if len(obs.shape) == 2: | |
B = self._state_likelihood(obs) | |
log_likelihood, _ = self._forward(B) | |
return log_likelihood | |
elif len(obs.shape) == 3: | |
count = obs.shape[0] | |
out = np.zeros((count,)) | |
for n in range(count): | |
B = self._state_likelihood(obs[n, :, :]) | |
log_likelihood, _ = self._forward(B) | |
out[n] = log_likelihood | |
return out | |
if __name__ == "__main__": | |
rstate = np.random.RandomState(0) | |
t1 = np.ones((4, 40)) + .001 * rstate.rand(4, 40) | |
t1 /= t1.sum(axis=0) | |
t2 = rstate.rand(*t1.shape) | |
t2 /= t2.sum(axis=0) | |
m1 = gmmhmm(2) | |
m1.fit(t1) | |
m2 = gmmhmm(2) | |
m2.fit(t2) | |
m1t1 = m1.transform(t1) | |
m2t1 = m2.transform(t1) | |
print "Likelihoods for test set 1" | |
print "M1:",m1t1 | |
print "M2:",m2t1 | |
print "Prediction for test set 1" | |
print "Model", np.argmax([m1t1, m2t1]) + 1 | |
m1t2 = m1.transform(t2) | |
m2t2 = m2.transform(t2) | |
print "Likelihoods for test set 2" | |
print "M1:",m1t2 | |
print "M2:",m2t2 | |
print "Prediction for test set 2" | |
print "Model", np.argmax([m1t2, m2t2]) + 1 |
Hi kastnerkyle,
I found that the python code above is a GaussianHMM instead of a GMMHMM as the emission distribution for one dimension has only one center, so there is no "mixture". I wonder if it is right. Thank you.
Hi,
I think this is NOT a GMM-HMM, the code has no "mixture" of multiple Gaussian components for each state, but rather a multivariate Gaussian for each state. Thanks.
I Think there is something wrong with this declaration
partial_sum = self.A * np.dot(alpha[:, t], (beta[:, t] * B[:, t + 1]).T)
I think it should be
partial_sum = self.A * np.dot(alpha[:, t], (beta[:, t+1] * B[:, t + 1]).T)
Please tell me what am I missing if I am wrong with this declaration.
I am new to statistical analysis. I will give a detailed description of my problem as follows: I have a 4D data set as follows:
ObjectID | The number feature of every Object | The length of every feature | Types |
---|---|---|---|
1 | 100 | 64 | 1 |
2 | 100 | 64 | 1 |
3 | 100 | 64 | 2 |
4 | 100 | 64 | 3 |
··· | ···· | ····· |
The shape likes (objID, num_feature of every Object, len_feature, Types),e.g.(274500,100,64,10). How should I use hmmlearn or pomegranate HMM for recognition on these 4D data?
This is NOT a GMMHMM; it is GaussianHMM.
This is NOT a GMMHMM; it is GaussianHMM.
Right
@kbagalo
You might want to look at the help of HMMLEARN for this purpose: https://github.com/hmmlearn/hmmlearn/blob/master/doc/tutorial.rst
Just copy paste of the thing you need (if I understood well):
Working with multiple sequences
All of the examples so far were using a single sequence of observations. The input format in the case of multiple sequences is a bit involved and is best understood by example.
Consider two 1D sequences:
To pass both sequences to :meth:
~base._BaseHMM.fit
or :meth:~base._BaseHMM.predict
first concatenate them into a single array and then compute an array of sequence lengths:Finally just call the desired method with X and lengths: