Last active
March 19, 2017 06:50
-
-
Save nkt1546789/b24ecd9546479e13355a0377ffb6b286 to your computer and use it in GitHub Desktop.
A demo of Burst detection on artificial data.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import matplotlib.pyplot as plt | |
from scipy import stats | |
def generate_samples(n, n_states, p, lam): | |
A = generate_transition_matrix(n_states, p) | |
x = [] | |
y = [] | |
state = 0 | |
for rs in range(1, n+1): | |
state = stats.multinomial.rvs(1, A[state], size=1, random_state=rs).argmax() | |
x_i = stats.expon.rvs(0, 1.0 / lam[state], random_state=rs) | |
x.append(x_i) | |
y.append(state) | |
x = np.array(x) | |
y = np.array(y) | |
return x, y | |
def generate_transition_matrix(n_states, p): | |
A = np.diag([1-p] + [1-2*p for _ in range(n_states-2)] + [1-p]) | |
tmp1 = np.arange(1, n_states) | |
tmp2 = np.arange(0, n_states-1) | |
idx1 = np.concatenate([tmp1, tmp2]) | |
idx2 = np.concatenate([tmp2, tmp1]) | |
A[idx1, idx2] = p | |
return A | |
def estimate_labels(x, classes=None, transition_proba=0.001, n_states=2, max_iter=100, stopping_criterion=1e-4): | |
lam = estimate(x, n_states=n_states, max_iter=max_iter, stopping_criterion=stopping_criterion) | |
print(lam) | |
if classes is None: | |
tmp = np.arange(n_states) | |
classes = tmp[lam.argsort()] | |
print(classes) | |
P = np.log(lam) - lam * np.c_[x] | |
A = np.ones((n_states, n_states)) * transition_proba | |
A[np.arange(n_states), np.arange(n_states)] = 1.0 - transition_proba * (n_states-1) | |
A = np.log(A) | |
print(A) | |
y = viterbi(P, A) | |
return classes[y] | |
def estimate(x, n_states=2, max_iter=100, stopping_criterion=1e-4): | |
r = np.random.RandomState(1) | |
lam = np.abs(r.normal(n_states, size=n_states)) | |
r = np.random.RandomState(2) | |
alpha = np.abs(r.normal(n_states, size=n_states)) | |
for iteration in range(max_iter): | |
lam_prev = lam.copy() | |
alpha_prev = alpha.copy() | |
tmp = alpha * lam * np.exp(- np.c_[x] * lam) | |
Q = tmp / np.c_[tmp.sum(axis=1)] | |
lam = Q.sum(axis=0) / np.sum(Q * np.c_[x], axis=0) | |
alpha = Q.sum(axis=0) / Q.sum() | |
if np.sum((alpha - alpha_prev)**2) < stopping_criterion and np.sum((lam - lam_prev)**2) < stopping_criterion: | |
print("iteration:", iteration) | |
break | |
return lam | |
def viterbi(P, A): | |
""" | |
P: log probability matrix (n_samples by n_states) | |
A: log transition probability matrix (n_states by n_states) | |
""" | |
n_samples = P.shape[0] | |
n_states = P.shape[1] | |
states = np.arange(n_states) | |
V = np.zeros((n_samples, n_states)) | |
S = np.zeros((n_samples, n_states), dtype=int) | |
V[0] = P[0] | |
for t in range(1, n_samples): | |
values = V[t-1] + A | |
S[t-1] = np.argmax(values, axis=1) | |
V[t] = P[t] + np.max(values, axis=1) | |
y = np.zeros(n_samples, dtype=int) | |
y[-1] = V[-1].argmax() | |
for t in range(2, n_samples+1): | |
y[-t] = S[-t, y[-t+1]] | |
return y | |
def plot_data(x, y, window=300.0, title=""): | |
cumulative = 0 | |
bins_states = [] | |
bins = [] | |
n_states = len(np.unique(y)) | |
state_count = np.zeros(n_states, dtype=int) | |
count = 0 | |
for s_t, x_t in zip(y, x): | |
if cumulative >= window: | |
bins_states.append(state_count.argmax()) | |
state_count = np.zeros(n_states, dtype=int) | |
bins.append(count) | |
count = 0 | |
cumulative = 0 | |
count += 1 | |
cumulative += x_t | |
state_count[s_t] += 1 | |
bins = np.array(bins) | |
z = bins / bins.max() | |
bins_states = np.array(bins_states) | |
z_bins = bins_states / bins_states.max() | |
print(z_bins) | |
plt.plot(np.arange(len(z)), z, "b-") | |
plt.plot(np.arange(len(z_bins)), z_bins, "r-") | |
plt.title(title) | |
plt.ylim(0, 1.1) | |
if __name__ == '__main__': | |
n = 100000 | |
p = 0.0001 | |
n_states = 3 | |
lam = np.array([5, 50, 100]) | |
x, y = generate_samples(n, n_states, p, lam) | |
ypred = estimate_labels(x, n_states=2, transition_proba=p) | |
plt.figure() | |
plt.subplot(2, 1, 1) | |
plot_data(x, y, title="true") | |
plt.subplot(2, 1, 2) | |
plot_data(x, ypred, title="estimated") | |
plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment