-
-
Save kastnerkyle/75483d51641a0c03bf7c to your computer and use it in GitHub Desktop.
| # (C) Kyle Kastner, June 2014 | |
| # License: BSD 3 clause | |
| import scipy.stats as st | |
| import numpy as np | |
| class gmmhmm: | |
| #This class converted with modifications from https://code.google.com/p/hmm-speech-recognition/source/browse/Word.m | |
| def __init__(self, n_states): | |
| self.n_states = n_states | |
| self.random_state = np.random.RandomState(0) | |
| #Normalize random initial state | |
| self.prior = self._normalize(self.random_state.rand(self.n_states, 1)) | |
| self.A = self._stochasticize(self.random_state.rand(self.n_states, self.n_states)) | |
| self.mu = None | |
| self.covs = None | |
| self.n_dims = None | |
| def _forward(self, B): | |
| log_likelihood = 0. | |
| T = B.shape[1] | |
| alpha = np.zeros(B.shape) | |
| for t in range(T): | |
| if t == 0: | |
| alpha[:, t] = B[:, t] * self.prior.ravel() | |
| else: | |
| alpha[:, t] = B[:, t] * np.dot(self.A.T, alpha[:, t - 1]) | |
| alpha_sum = np.sum(alpha[:, t]) | |
| alpha[:, t] /= alpha_sum | |
| log_likelihood = log_likelihood + np.log(alpha_sum) | |
| return log_likelihood, alpha | |
| def _backward(self, B): | |
| T = B.shape[1] | |
| beta = np.zeros(B.shape); | |
| beta[:, -1] = np.ones(B.shape[0]) | |
| for t in range(T - 1)[::-1]: | |
| beta[:, t] = np.dot(self.A, (B[:, t + 1] * beta[:, t + 1])) | |
| beta[:, t] /= np.sum(beta[:, t]) | |
| return beta | |
| def _state_likelihood(self, obs): | |
| obs = np.atleast_2d(obs) | |
| B = np.zeros((self.n_states, obs.shape[1])) | |
| for s in range(self.n_states): | |
| #Needs scipy 0.14 | |
| B[s, :] = st.multivariate_normal.pdf(obs.T, mean=self.mu[:, s].T, cov=self.covs[:, :, s].T) | |
| #This function can (and will!) return values >> 1 | |
| #See the discussion here for the equivalent matlab function | |
| #https://groups.google.com/forum/#!topic/comp.soft-sys.matlab/YksWK0T74Ak | |
| #Key line: "Probabilities have to be less than 1, | |
| #Densities can be anything, even infinite (at individual points)." | |
| #This is evaluating the density at individual points... | |
| return B | |
| def _normalize(self, x): | |
| return (x + (x == 0)) / np.sum(x) | |
| def _stochasticize(self, x): | |
| return (x + (x == 0)) / np.sum(x, axis=1) | |
| def _em_init(self, obs): | |
| #Using this _em_init function allows for less required constructor args | |
| if self.n_dims is None: | |
| self.n_dims = obs.shape[0] | |
| if self.mu is None: | |
| subset = self.random_state.choice(np.arange(self.n_dims), size=self.n_states, replace=False) | |
| self.mu = obs[:, subset] | |
| if self.covs is None: | |
| self.covs = np.zeros((self.n_dims, self.n_dims, self.n_states)) | |
| self.covs += np.diag(np.diag(np.cov(obs)))[:, :, None] | |
| return self | |
| def _em_step(self, obs): | |
| obs = np.atleast_2d(obs) | |
| B = self._state_likelihood(obs) | |
| T = obs.shape[1] | |
| log_likelihood, alpha = self._forward(B) | |
| beta = self._backward(B) | |
| xi_sum = np.zeros((self.n_states, self.n_states)) | |
| gamma = np.zeros((self.n_states, T)) | |
| for t in range(T - 1): | |
| partial_sum = self.A * np.dot(alpha[:, t], (beta[:, t] * B[:, t + 1]).T) | |
| xi_sum += self._normalize(partial_sum) | |
| partial_g = alpha[:, t] * beta[:, t] | |
| gamma[:, t] = self._normalize(partial_g) | |
| partial_g = alpha[:, -1] * beta[:, -1] | |
| gamma[:, -1] = self._normalize(partial_g) | |
| expected_prior = gamma[:, 0] | |
| expected_A = self._stochasticize(xi_sum) | |
| expected_mu = np.zeros((self.n_dims, self.n_states)) | |
| expected_covs = np.zeros((self.n_dims, self.n_dims, self.n_states)) | |
| gamma_state_sum = np.sum(gamma, axis=1) | |
| #Set zeros to 1 before dividing | |
| gamma_state_sum = gamma_state_sum + (gamma_state_sum == 0) | |
| for s in range(self.n_states): | |
| gamma_obs = obs * gamma[s, :] | |
| expected_mu[:, s] = np.sum(gamma_obs, axis=1) / gamma_state_sum[s] | |
| partial_covs = np.dot(gamma_obs, obs.T) / gamma_state_sum[s] - np.dot(expected_mu[:, s], expected_mu[:, s].T) | |
| #Symmetrize | |
| partial_covs = np.triu(partial_covs) + np.triu(partial_covs).T - np.diag(partial_covs) | |
| #Ensure positive semidefinite by adding diagonal loading | |
| expected_covs += .01 * np.eye(self.n_dims)[:, :, None] | |
| self.prior = expected_prior | |
| self.mu = expected_mu | |
| self.covs = expected_covs | |
| self.A = expected_A | |
| return log_likelihood | |
| def fit(self, obs, n_iter=15): | |
| #Support for 2D and 3D arrays | |
| #2D should be n_features, n_dims | |
| #3D should be n_examples, n_features, n_dims | |
| #For example, with 6 features per speech segment, 105 different words | |
| #this array should be size | |
| #(105, 6, X) where X is the number of frames with features extracted | |
| #For a single example file, the array should be size (6, X) | |
| if len(obs.shape) == 2: | |
| for i in range(n_iter): | |
| self._em_init(obs) | |
| log_likelihood = self._em_step(obs) | |
| elif len(obs.shape) == 3: | |
| count = obs.shape[0] | |
| for n in range(count): | |
| for i in range(n_iter): | |
| self._em_init(obs[n, :, :]) | |
| log_likelihood = self._em_step(obs[n, :, :]) | |
| return self | |
| def transform(self, obs): | |
| #Support for 2D and 3D arrays | |
| #2D should be n_features, n_dims | |
| #3D should be n_examples, n_features, n_dims | |
| #For example, with 6 features per speech segment, 105 different words | |
| #this array should be size | |
| #(105, 6, X) where X is the number of frames with features extracted | |
| #For a single example file, the array should be size (6, X) | |
| if len(obs.shape) == 2: | |
| B = self._state_likelihood(obs) | |
| log_likelihood, _ = self._forward(B) | |
| return log_likelihood | |
| elif len(obs.shape) == 3: | |
| count = obs.shape[0] | |
| out = np.zeros((count,)) | |
| for n in range(count): | |
| B = self._state_likelihood(obs[n, :, :]) | |
| log_likelihood, _ = self._forward(B) | |
| out[n] = log_likelihood | |
| return out | |
| if __name__ == "__main__": | |
| rstate = np.random.RandomState(0) | |
| t1 = np.ones((4, 40)) + .001 * rstate.rand(4, 40) | |
| t1 /= t1.sum(axis=0) | |
| t2 = rstate.rand(*t1.shape) | |
| t2 /= t2.sum(axis=0) | |
| m1 = gmmhmm(2) | |
| m1.fit(t1) | |
| m2 = gmmhmm(2) | |
| m2.fit(t2) | |
| m1t1 = m1.transform(t1) | |
| m2t1 = m2.transform(t1) | |
| print "Likelihoods for test set 1" | |
| print "M1:",m1t1 | |
| print "M2:",m2t1 | |
| print "Prediction for test set 1" | |
| print "Model", np.argmax([m1t1, m2t1]) + 1 | |
| m1t2 = m1.transform(t2) | |
| m2t2 = m2.transform(t2) | |
| print "Likelihoods for test set 2" | |
| print "M1:",m1t2 | |
| print "M2:",m2t2 | |
| print "Prediction for test set 2" | |
| print "Model", np.argmax([m1t2, m2t2]) + 1 |
Hi kastnerkyle,
I found that the python code above is a GaussianHMM instead of a GMMHMM as the emission distribution for one dimension has only one center, so there is no "mixture". I wonder if it is right. Thank you.
Hi,
I think this is NOT a GMM-HMM, the code has no "mixture" of multiple Gaussian components for each state, but rather a multivariate Gaussian for each state. Thanks.
I Think there is something wrong with this declaration
partial_sum = self.A * np.dot(alpha[:, t], (beta[:, t] * B[:, t + 1]).T)
I think it should be
partial_sum = self.A * np.dot(alpha[:, t], (beta[:, t+1] * B[:, t + 1]).T)
Please tell me what am I missing if I am wrong with this declaration.
I am new to statistical analysis. I will give a detailed description of my problem as follows: I have a 4D data set as follows:
| ObjectID | The number feature of every Object | The length of every feature | Types |
|---|---|---|---|
| 1 | 100 | 64 | 1 |
| 2 | 100 | 64 | 1 |
| 3 | 100 | 64 | 2 |
| 4 | 100 | 64 | 3 |
| ··· | ···· | ····· |
The shape likes (objID, num_feature of every Object, len_feature, Types),e.g.(274500,100,64,10). How should I use hmmlearn or pomegranate HMM for recognition on these 4D data?
This is NOT a GMMHMM; it is GaussianHMM.
This is NOT a GMMHMM; it is GaussianHMM.
Right
@kbagalo
You might want to look at the help of HMMLEARN for this purpose: https://github.com/hmmlearn/hmmlearn/blob/master/doc/tutorial.rst
Just copy paste of the thing you need (if I understood well):
Working with multiple sequences
All of the examples so far were using a single sequence of observations. The input format in the case of multiple sequences is a bit involved and is best understood by example.
Consider two 1D sequences:
To pass both sequences to :meth:
~base._BaseHMM.fitor :meth:~base._BaseHMM.predictfirst concatenate them into a single array and then compute an array of sequence lengths:Finally just call the desired method with X and lengths: