Created
August 28, 2017 09:26
-
-
Save giuseppebonaccorso/8102b09ab4de87d6c540d5fe058ad3cf to your computer and use it in GitHub Desktop.
Fixed-delay smoothing in HMM
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from Queue import Queue | |
class HMM: | |
def __init__(self, transition_matrix, observation_matrix, delay, initial_state): | |
self.TM = transition_matrix | |
self.OM = observation_matrix | |
self.delay = delay | |
self.initial_state = np.array(initial_state) | |
self.events = Queue() | |
self.t = 0 | |
self.B = np.ndarray(self.TM.shape) | |
self.forward = np.dot(self.initial_state, self.TM) | |
self.backward = np.dot(self.initial_state, self.TM) | |
def predict_state(self, event): | |
self.events.put_nowait(event) | |
# Compute diagonal matrix with current event | |
Ot = self.compute_O_matrix(event) | |
# Perform fixed-delay smoothing | |
if self.t > self.delay: | |
self.forward = self.compute_forward(Ot) | |
etp = self.events.get_nowait() | |
Otp = self.compute_O_matrix(etp) | |
self.B = reduce(np.dot, [np.linalg.inv(Otp), np.linalg.inv(self.TM), self.B, self.TM, Ot]) | |
else: | |
self.B = reduce(np.dot, [self.B, self.TM, Ot]) | |
self.t += 1 | |
if self.t > self.delay: | |
return self.normalize(np.dot(self.forward, self.B)) | |
else: | |
return None | |
def reset(self): | |
self.events = Queue() | |
self.t = 0 | |
self.B = np.ndarray(self.TM.shape) | |
self.forward = np.dot(self.initial_state, self.TM) | |
self.backward = np.dot(self.initial_state, self.TM) | |
def compute_O_matrix(self, event): | |
return np.diagflat(self.OM[event]) | |
def compute_forward(self, Ot): | |
return reduce(np.dot, [Ot, self.TM.T, self.forward.T]) | |
def compute_backward(self, Ot): | |
return reduce(np.dot, [self.TM, Ot, self.backward.T]) | |
@staticmethod | |
def normalize(x): | |
return x / np.sum(x) | |
def log_example(steps=50): | |
print('Log example') | |
# Transition probability matrix | |
# T(i,j) = P(Xt=j | Xt-1=i) | |
T = np.array([[0.6, 0.3, 0.1], | |
[0.4, 0.4, 0.2], | |
[0.1, 0.4, 0.5]]) | |
T_labels = ('Ok', 'Some issues', 'Out of order') | |
# Observation probability matrix | |
# O(i,j) = P(event | Xt=i) | |
O = np.array([[0.8, 0.15, 0.05], | |
[0.15, 0.7, 0.1], | |
[0.05, 0.2, 0.75]]) | |
O_labels = ('Green', 'Yellow', 'Red') | |
hmm = HMM(transition_matrix=T, observation_matrix=O, delay=1, initial_state=[0.5, 0.5, 0.5]) | |
# Prediction | |
for i in range(1, steps): | |
observation = np.random.randint(0, len(O_labels)) | |
prediction = hmm.predict_state(observation) | |
max_value = int(np.argmax(prediction)) | |
if prediction is not None: | |
print( | |
'O%d: %s -> %s (%1.3f%%)' % (i, O_labels[observation], T_labels[max_value], prediction[max_value] * 100)) | |
if __name__ == '__main__': | |
print('HMM simulation') | |
#rain_example() | |
log_example() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment