Estimate the MVAR model order across trials for PDC/DTF connectivity analysis.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Implements Partial Directed Coherence and the Directed Transfer Function
using a multi-trial MVAR process, with model order estimation by BIC.

References
----------
[1] Luiz A. Baccala and Koichi Sameshima. Partial directed coherence:
    a new concept in neural structure determination.
    Biological Cybernetics, 84(6):463-474, 2001.
[2] Mingzhou Ding, Yonghong Chen. Granger Causality: Basic Theory and
    Application to Neuroscience. Elsevier Science, 7 February 2008.
"""
# Authors: Alexandre Gramfort <[email protected]>
#          Dong Qunxi <[email protected]>
# License: BSD (3-clause)

import math

import numpy as np
from scipy import linalg, fftpack
import matplotlib.pyplot as plt


def mvar_generate(A, n, sigma, burnin=500):
    """Simulate an MVAR process.

    Parameters
    ----------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    n : int
        The number of time samples.
    sigma : array, shape (N,)
        The noise variance for each time series.
    burnin : int
        The length of the burn-in period (in samples).

    Returns
    -------
    X : ndarray, shape (N, n)
        The N time series of length n.
    """
    p, N, N = A.shape
    A_2d = np.concatenate(A, axis=1)
    Y = np.zeros((n + burnin, N))
    sigma = np.diag(sigma)
    mu = np.zeros(N)

    # iterate the process over all n + burnin samples
    # (the first `burnin` samples are discarded below)
    for i in range(p, n + burnin):
        w = np.random.multivariate_normal(mu, sigma)
        Y[i] = np.dot(A_2d, Y[i - p:i][::-1, :].ravel()) + w
    return Y[burnin:].T
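
# Usage sketch for mvar_generate (not part of the original demo): the AR
# coefficients below are illustrative only, chosen to give a stable model.
# >>> A_demo = np.array([[[0.5, 0.1],
# ...                     [0.0, 0.4]]])          # shape (p=1, N=2, N=2)
# >>> X_demo = mvar_generate(A_demo, n=200, sigma=np.ones(2))
# >>> X_demo.shape
# (2, 200)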


def normalize_data(data, factor):
    """Normalize multi-trial data before fitting the MVAR model.

    1) Decrease the sampling rate by an integer factor.
    2) Remove the linear trend of the data across trials.
    3) Remove the temporal mean of the data within each trial.
    4) Remove the ensemble mean of the data across trials.

    Parameters
    ----------
    data : array-like, shape (channels, points, trials)
        Representative STCs for each ROI.
    factor : int
        Factor for downsampling.

    Returns
    -------
    C : array
        The normalized data.
    """
    # Downsample the signals by the given factor
    from scipy import signal
    data = signal.decimate(x=data, q=factor, axis=1)

    # Detrend the signals (linear fit to the trial-averaged data)
    channels, points, trials = data.shape
    pdeg = 1
    pdeg = pdeg * np.ones(channels)
    x = np.arange(points) + 1
    mu = np.mean(x)
    sig = np.std(x, ddof=1)
    x = (x - mu) / sig
    P = np.zeros((channels, points))
    p = []
    for i in range(channels):
        d = pdeg[i]
        d1 = int(d + 1)
        V = np.zeros((d1, points))
        V[d1 - 1, :] = np.ones(points)
        j = int(d)
        while j > 0:
            V[j - 1, :] = x * V[j, :]
            j = j - 1
        temp = np.mean(data[i, :, :], axis=-1)
        p.append(np.linalg.lstsq(V.T, temp, rcond=None)[0].T)
        P[i, :] = p[i][0] * np.ones(points)
        for j in range(1, d1):
            P[i, :] = x * P[i, :] + p[i][j]
    Y = np.zeros(data.shape)
    for r in range(trials):
        Y[:, :, r] = data[:, :, r] - P

    # Remove the temporal mean of the data
    C = Y.transpose(1, 0, 2)
    points, channels, trials = C.shape
    for i in range(channels):
        b = C[:, i, :]
        c = np.mean(b, axis=0)
        # Remove the temporal mean within each trial
        for j in range(points):
            C[j, i, :] = C[j, i, :] - c

    # Remove the ensemble mean of the data
    for i in range(channels):
        b = C[:, i, :]
        d = np.mean(b, axis=-1)
        # Remove the ensemble mean across trials
        for j in range(trials):
            C[:, i, j] = C[:, i, j] - d
    C = C.transpose(1, 0, 2)
    return C
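
# Usage sketch for normalize_data (illustrative only): normalize a random
# (channels, points, trials) array without downsampling; the shape is kept.
# >>> raw = np.random.randn(5, 1000, 50)
# >>> clean = normalize_data(raw, factor=1)
# >>> clean.shape
# (5, 1000, 50)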


def cov(X, p):
    """Vector autocovariance up to order p.

    Parameters
    ----------
    X : ndarray, shape (N, n)
        The N time series of length n.
    p : int
        The maximum lag of the autocovariance.

    Returns
    -------
    R : ndarray, shape (p + 1, N, N)
        The autocovariance up to order p.
    """
    N, n = X.shape
    R = np.zeros((p + 1, N, N))
    for k in range(p + 1):
        R[k] = (1. / float(n - k)) * np.dot(X[:, :n - k], X[:, k:].T)
    return R
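
# Usage sketch for cov (illustrative only): R[0] is (1. / n) * X.dot(X.T),
# the sample covariance of the (assumed zero-mean) series.
# >>> X_demo = np.random.randn(3, 1000)
# >>> R = cov(X_demo, p=2)
# >>> R.shape
# (3, 3, 3)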


def mvar_fit(X, p):
    """Fit an MVAR model of order p using the Yule-Walker equations.

    Parameters
    ----------
    X : ndarray, shape (N, n, m)
        The first index references variables, the second observations
        and the third trials.
    p : int
        The candidate model order.

    Returns
    -------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    sigma : ndarray, shape (N, N)
        The noise covariance matrix of the fitted model.
    """
    N, n, m = X.shape

    # Average the autocovariance estimates over trials
    gammas = np.zeros((m, p + 1, N, N))
    for i in range(m):
        gammas[i] = cov(X[:, :, i], p)
    gamma = gammas.mean(axis=0)

    G = np.zeros((p * N, p * N))
    gamma2 = np.concatenate(gamma, axis=0)
    gamma2[:N, :N] /= 2.

    for i in range(p):
        G[N * i:, N * i:N * (i + 1)] = gamma2[:N * (p - i)]

    G = G + G.T  # big block matrix

    gamma4 = np.concatenate(gamma[1:], axis=0)

    phi = linalg.solve(G, gamma4)  # solve the Yule-Walker equations

    tmp = np.dot(gamma4[:N * p].T, phi)
    sigma = gamma[0] - tmp - tmp.T + np.dot(phi.T, np.dot(G, phi))

    phi = np.reshape(phi, (p, N, N))
    for k in range(p):
        phi[k] = phi[k].T

    return phi, sigma
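
# Usage sketch for mvar_fit (illustrative only), reusing the simulated
# multi-trial array X of shape (N, n, trials) built in the demo below.
# >>> A_hat, sigma_hat = mvar_fit(X, p=3)
# >>> A_hat.shape, sigma_hat.shape
# ((3, 5, 5), (5, 5))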


def compute_order(X, p_max):
    """Estimate the AR order with BIC.

    Parameters
    ----------
    X : ndarray, shape (N, n, m)
        The first index references variables, the second observations
        and the third trials.
    p_max : int
        The maximum model order to test.

    Returns
    -------
    p : int
        The estimated order.
    bic : ndarray, shape (p_max + 1,)
        The BIC for the orders from 0 to p_max.

    References
    ----------
    [2] provides the equation:

        BIC(p) = 2 * log[det(Sigma)] + 2 * p * N ** 2 * log(n * m) / (n * m)

    where Sigma is the noise covariance matrix, N is the number of variables,
    p is the model order, n is the trial length and m is the number of trials.
    """
    N, n, m = X.shape

    bic = np.empty(p_max + 1)
    bic[0] = np.inf  # XXX

    for p in range(1, p_max + 1):
        print(p)
        A, sigma = mvar_fit(X, p)
        bic[p] = 2 * np.log(linalg.det(sigma))
        bic[p] += 2 * p * (N ** 2) * np.log(n * m) / (n * m)

    p = np.argmin(bic)
    return p, bic
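
# Usage sketch for compute_order (illustrative only): the selected order is
# the argmin of the BIC curve over orders 1..p_max (order 0 is excluded).
# >>> p_hat, bic = compute_order(X, p_max=10)
# >>> bic.shape
# (11,)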


def spectral_density(A, n_fft=None):
    """Estimate the spectral transfer function from the AR coefficients.

    Parameters
    ----------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    n_fft : int
        The length of the FFT.

    Returns
    -------
    fA : ndarray, shape (n_fft, N, N)
        The transfer function H(f) = inv(I - A(f)) evaluated on the FFT grid.
    freqs : ndarray, shape (n_fft,)
        The normalized FFT frequencies.
    """
    p, N, N = A.shape
    if n_fft is None:
        n_fft = max(int(2 ** math.ceil(np.log2(p))), 512)
    A2 = np.zeros((n_fft, N, N))
    A2[1:p + 1, :, :] = A  # start at 1 !
    fA = fftpack.fft(A2, axis=0)
    freqs = fftpack.fftfreq(n_fft)
    I = np.eye(N)

    for i in range(n_fft):
        fA[i] = linalg.inv(I - fA[i])

    return fA, freqs
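
# Usage sketch for spectral_density (illustrative only): H(f) at every FFT
# bin, using the AR coefficient array A defined in the demo below.
# >>> H, freqs = spectral_density(A, n_fft=512)
# >>> H.shape, freqs.shape
# ((512, 5, 5), (512,))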


def DTF(A, sigma=None, n_fft=None):
    """Directed Transfer Function (DTF).

    Parameters
    ----------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    sigma : array, shape (N,)
        The noise variance for each time series.
    n_fft : int
        The length of the FFT.

    Returns
    -------
    D : ndarray, shape (n_fft, N, N)
        The estimated DTF.
    freqs : ndarray, shape (n_fft,)
        The normalized FFT frequencies.
    """
    p, N, N = A.shape

    if n_fft is None:
        n_fft = max(int(2 ** math.ceil(np.log2(p))), 512)

    H, freqs = spectral_density(A, n_fft)
    D = np.zeros((n_fft, N, N))

    if sigma is None:
        sigma = np.ones(N)

    for i in range(n_fft):
        S = H[i]
        V = (S * sigma[None, :]).dot(S.T.conj())
        V = np.abs(np.diag(V))
        D[i] = np.abs(S * np.sqrt(sigma[None, :])) / np.sqrt(V)[:, None]

    return D, freqs
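
# Usage sketch for DTF (illustrative only), reusing A_est and sigma from the
# demo below: D[f, i, j] quantifies the directed influence from channel j to
# channel i at frequency f, normalized per row so values lie in [0, 1].
# >>> D, freqs = DTF(A_est, sigma)
# >>> D.shape
# (512, 5, 5)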


def PDC(A, sigma=None, n_fft=None):
    """Partial directed coherence (PDC).

    Parameters
    ----------
    A : ndarray, shape (p, N, N)
        The AR coefficients where N is the number of signals
        and p the order of the model.
    sigma : array, shape (N,)
        The noise variance for each time series.
    n_fft : int
        The length of the FFT.

    Returns
    -------
    P : ndarray, shape (n_fft, N, N)
        The estimated PDC.
    freqs : ndarray, shape (n_fft,)
        The normalized FFT frequencies.
    """
    p, N, N = A.shape

    if n_fft is None:
        n_fft = max(int(2 ** math.ceil(np.log2(p))), 512)

    H, freqs = spectral_density(A, n_fft)
    P = np.zeros((n_fft, N, N))

    if sigma is None:
        sigma = np.ones(N)

    for i in range(n_fft):
        B = H[i]
        B = linalg.inv(B)
        V = np.abs(np.dot(B.T.conj(), B * (1. / sigma[:, None])))
        V = np.diag(V)  # denominator squared
        P[i] = np.abs(B * (1. / np.sqrt(sigma))[None, :]) / np.sqrt(V)[None, :]

    return P, freqs
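
# Usage sketch for PDC (illustrative only), reusing A_est and sigma from the
# demo below: P[f, i, j] quantifies the direct influence from channel j to
# channel i at frequency f; with uniform noise it is column-normalized so
# values lie in [0, 1].
# >>> P, freqs = PDC(A_est, sigma)
# >>> P.shape
# (512, 5, 5)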


def plot_all(freqs, P, name):
    """Plot grid of subplots."""
    m, N, N = P.shape
    pos_freqs = freqs[freqs >= 0]

    f, axes = plt.subplots(N, N)
    for i in range(N):
        for j in range(N):
            axes[i, j].fill_between(pos_freqs, P[freqs >= 0, i, j], 0)
            axes[i, j].set_xlim([0, np.max(pos_freqs)])
            axes[i, j].set_ylim([0, 1])
    plt.suptitle(name)
    plt.tight_layout()


if __name__ == '__main__':
    plt.close('all')

    # example from the paper [1]
    A = np.zeros((3, 5, 5))
    A[0, 0, 0] = 0.95 * math.sqrt(2)
    A[1, 0, 0] = -0.9025
    A[1, 1, 0] = 0.5
    A[2, 2, 0] = -0.4
    A[1, 3, 0] = -0.5
    A[0, 3, 3] = 0.25 * math.sqrt(2)
    A[0, 3, 4] = 0.25 * math.sqrt(2)
    A[0, 4, 3] = -0.25 * math.sqrt(2)
    A[0, 4, 4] = 0.25 * math.sqrt(2)

    # simulate the multi-trial processes
    n = 10 ** 3
    # sigma = np.array([0.0001, 1, 1, 1, 1])
    # sigma = np.array([0.01, 1, 1, 1, 1])
    sigma = np.array([1., 1., 1., 1., 1.])
    trials = 50
    Y = np.zeros((5, n, trials))
    for i in range(trials):
        Y[:, :, i] = mvar_generate(A, n, sigma)

    X = normalize_data(Y, factor=1)

    # estimate the AR order with BIC
    if True:  # set to False to skip order estimation and use p = 3
        p_max = 20
        p, bic = compute_order(X, p_max=p_max)

        plt.figure()
        plt.plot(np.arange(p_max + 1), bic)
        plt.xlabel('order')
        plt.ylabel('BIC')
        plt.show()
    else:
        p = 3

    A_est, sigma = mvar_fit(X, p)
    sigma = np.diag(sigma)  # DTF + PDC support diagonal noise
    # sigma = None

    # compute DTF
    D, freqs = DTF(A_est, sigma)
    plot_all(freqs, D, 'DTF')

    # compute PDC
    P, freqs = PDC(A_est, sigma)
    plot_all(freqs, P, 'PDC')

    plt.show()