Skip to content

Instantly share code, notes, and snippets.

@lan496
Created May 17, 2016 06:16
Show Gist options
  • Select an option

  • Save lan496/0f1c0f8b01cb75a59568bcb421c0ef09 to your computer and use it in GitHub Desktop.

Select an option

Save lan496/0f1c0f8b01cb75a59568bcb421c0ef09 to your computer and use it in GitHub Desktop.
import numpy as np
import matplotlib.pyplot as plt
#################################################
F = 8.0
J = 40 # site
dt = 0.05 # 6 hours
dt_forcast = 0.05
delta_TLM = 1e-9
alpha = 3.0 ** 2 # P^a_0 = alpha * I
#################################################
def RMSE(x_o, x_t):
return np.linalg.norm(x_o - x_t) / np.sqrt(20)
def lorenz96(x):
# flaot, np.array -> np.array
v = np.array([(x[(i + 1) % J] - x[i - 2]) * x[i - 1] - x[i] + F for i in np.arange(J)])
return v
def runge_kutta(f, x, h):
k1 = f(x)
k2 = f(x + k1 * 0.5 * h)
k3 = f(x + k2 * 0.5 * h)
k4 = f(x + k3 * h)
return x + h / 6.0 * (k1 + 2.0 * k2 + 2.0 * k3 + k4)
def M(x):
# np.array -> np.array
res = x.copy()
step = int(dt / dt_forcast)
for i in np.arange(step):
res = runge_kutta(lorenz96, res, dt_forcast)
return res
def TLM(x):
# np.array(J) -> np.array((J, J))
M_j = []
for j in np.arange(J):
e_j = np.zeros(J)
e_j[j] = 1.0
M_j.append([(M(x + delta_TLM * e_j) - M(x)) / delta_TLM])
res = np.vstack(tuple(M_j[j] for j in np.arange(J)))
return res.T
def H(x):
# np.array(J) -> np.array(J)
return x.copy()
def H_jacobi(x):
# np.array(J) -> np.array((J, J))
return np.identity(J)
def kalman_filter(x_f, P_f, y_o, R, inflation):
# np.array(J), np.array((J, J)), np.array(J), np.array((J, J))
# -> np.array(J), np.array(J)
# Hj_f = H_jacobi(x_f)
A = R + P_f
# A = R + Hj_f.dot(P_f.dot(Hj_f.T))
K = P_f.dot(np.linalg.inv(A))
# K = P_f.dot((Hj_f.T).dot(np.linalg.inv(A)))
x_a = x_f + np.dot(K, y_o - x_f)
# x_a = x_f + np.dot(K, y_o - H(x_f))
P_a = (np.identity(J) - K).dot(P_f)
# P_a = (np.identity(J) - K.dot(Hj_f)).dot(P_f)
Mj_a = TLM(x_a)
x_f_next = M(x_a)
P_f_next = Mj_a.dot(P_a.dot(Mj_a.T)) * (1.0 + inflation)
# P_f_next = np.identity(J)
return x_f_next, P_f_next
def test(x_f_0, P_f_0, R, ylst_o, inflation):
x_f = [x_f_0]
P_f = [P_f_0]
for i in np.arange(len(ylst_o)):
x_f_next, P_f_next = kalman_filter(x_f[i], P_f[i], ylst_o[i], R, inflation)
x_f.append(x_f_next)
P_f.append(P_f_next)
print inflation, i
return x_f, P_f
def delta_plot():
f1 = open("raw_data.txt", "r")
xlst_t = []
for line in f1:
x_t = map(float, line.split())
xlst_t.append(np.array(x_t))
f1.close()
f2 = open("observational_data.txt", "r")
xlst_o = []
for line in f2:
x_o = map(float, line.split())
xlst_o.append(np.array(x_o))
f2.close()
# x_f_0 = np.array([F for i in np.arange(J)])
x_f_0 = xlst_o[200]
P_f_0 = alpha * np.identity(J)
R = 1.0 * np.identity(J)
x_f_if00, P_f_if00 = test(x_f_0, P_f_0, R, xlst_o, 0.0)
x_f_if10, P_f_if10 = test(x_f_0, P_f_0, R, xlst_o, 0.1)
x_f_if20, P_f_if20 = test(x_f_0, P_f_0, R, xlst_o, 0.20)
x_f_if30, P_f_if30 = test(x_f_0, P_f_0, R, xlst_o, 0.30)
x_f_if40, P_f_if40 = test(x_f_0, P_f_0, R, xlst_o, 0.40)
x_f_if50, P_f_if50 = test(x_f_0, P_f_0, R, xlst_o, 0.50)
plt.xlabel('time')
plt.ylabel('RMSE')
# plt.ylabel('$\mathrm{Tr} P^f$')
plt.title('$x^f$ RMSE')
m = len(xlst_t)
plt.plot([RMSE(x_f_if00[i], xlst_t[i]) for i in np.arange(m)], label='Delta=0.0')
plt.plot([RMSE(x_f_if10[i], xlst_t[i]) for i in np.arange(m)], label='Delta=0.10')
plt.plot([RMSE(x_f_if20[i], xlst_t[i]) for i in np.arange(m)], label='Delta=0.20')
plt.plot([RMSE(x_f_if30[i], xlst_t[i]) for i in np.arange(m)], label='Delta=0.30')
plt.plot([RMSE(x_f_if40[i], xlst_t[i]) for i in np.arange(m)], label='Delta=0.40')
plt.plot([RMSE(x_f_if50[i], xlst_t[i]) for i in np.arange(m)], label='Delta=0.50')
# plt.plot([np.trace(P_f[i]) for i in np.arange(50)])
plt.yscale('log')
plt.legend()
plt.show()
def main():
f1 = open("raw_data.txt", "r")
xlst_t = []
for line in f1:
x_t = map(float, line.split())
xlst_t.append(np.array(x_t))
f1.close()
f2 = open("observational_data.txt", "r")
xlst_o = []
for line in f2:
x_o = map(float, line.split())
xlst_o.append(np.array(x_o))
f2.close()
# x_f_0 = np.array([F for i in np.arange(J)])
x_f_0 = xlst_o[200]
P_f_0 = alpha * np.identity(J)
R = 1.0 * np.identity(J)
x_f, P_f = test(x_f_0, P_f_0, R, xlst_o, 0.2)
plt.plot([RMSE(x_f[i], xlst_t[i]) for i in np.arange(len(x_f))], label='Delta=0.0')
plt.show()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment