Skip to content

Instantly share code, notes, and snippets.

@alexlib
Forked from swang225/kalman_filter.py
Created February 18, 2022 14:20
Show Gist options
  • Save alexlib/6f97132a2fcacb8f20dda74be811e47b to your computer and use it in GitHub Desktop.
Save alexlib/6f97132a2fcacb8f20dda74be811e47b to your computer and use it in GitHub Desktop.
Kalman Filter
import numpy as np
import pandas as pd
# dropping tennis ball
def problem1():
dt = 1
F = np.matrix([[1, dt, dt**2/2],
[0, 1, dt],
[0, 0, 1]])
G = np.matrix([[dt ** 2 / 2],
[dt],
[1]])
sa = 1
Q = G.dot(G.transpose()) * (sa ** 2)
H = np.matrix([1, 0, 0])
# estimate
sz = 1
R = sz ** 2
# initial values
x0 = np.matrix([[20000],
[0],
[-9.81]]) # gravity
vc0 = np.matrix([[0, 0, 0],
[0, 0, 0],
[0, 0, 0]])
return sa, sz, F, G, Q, H, R, x0, vc0
def xk(F, xk1, G, sa):
_xk = F.dot(xk1) + G * np.random.normal(loc=0, scale=sa)
return _xk
def zk(xk, H, sz):
_zk = H.dot(xk) + np.random.normal(loc=0, scale=sz)
return _zk
def pxk_k1(F, pxk1):
''' predict x(k) given predicted x(k-1)'''
_pxk = F.dot(pxk1)
return _pxk
def pvck_k1(F, pvck1, Q):
''' predict vc(k) given predicted vc(k-1)'''
_pvck = F.dot(pvck1.dot(F.transpose())) + Q
return _pvck
def gaink(pvck, H, R):
# scale applied to gain, if noise is high, gain smaller
S = H.dot(pvck.dot(H.transpose())) + R
def inverse(A):
if isinstance(A, np.matrix):
return np.linalg.inv(A)
return 1/A
# calc gain
_gk = pvck.dot(H.transpose().dot(inverse(S)))
return _gk
def uxk(pxk, H, zk, gk):
_yk = zk - H.dot(pxk)
_uxk = pxk + gk.dot(_yk)
_uyk = zk - H.dot(_uxk)
return _uxk, _uyk
def uvck(pvck, gk, H):
_uvck = (np.identity(len(pvck)) - gk.dot(H)).dot(pvck)
return _uvck
def test001(problem):
sa, sz, F, G, Q, H, R, x0, vc0 = problem()
# generate experiment
x = [x0]
z = [H.dot(x0)]
t_range = 100
for i in range(1, t_range + 1):
_xk = xk(F, x[-1], G, sa)
if _xk[0].item() < 0:
t_range = i - 1
break
x.append(_xk)
z.append(zk(x[-1], H, sz))
df_x = pd.DataFrame(data={'t': range(t_range + 1),
'x': [v[0].item() for v in x]}).set_index('t')
df_z = pd.DataFrame(data={'t': range(t_range + 1),
'z': [v.item() if v is not None else None for v in z]}).set_index('t')
# for each observation frequency, generate prediction
freqs = [5, 10, 50]
df_pxs = []
for freq in freqs:
pvc = [vc0] # predicted vc matrix
px = [x0] # predicted position
uvc = [vc0] # updated vc matrix
ux = [x0] # updated position
uy = [np.matrix(0)] # updated difference between prediction and observation
for i in range(1, t_range + 1):
_pxk = pxk_k1(F, ux[-1])
px.append(_pxk)
_pvck = pvck_k1(F, uvc[-1], Q)
pvc.append(_pvck)
_gk = gaink(_pvck, H, R)
if i % freq != 0:
ux.append(_pxk)
uy.append(None)
uvc.append(_pvck)
continue
_uxk, _uyk = uxk(_pxk, H, z[i], _gk)
ux.append(_uxk)
uy.append(_uyk)
_uvck = uvck(_pvck, _gk, H)
uvc.append(_uvck)
df_px = pd.DataFrame(data={'t': range(t_range + 1),
'predicted ' + str(freq): [v[0].item() for v in px]}).set_index('t')
df_pxs.append(df_px)
df = pd.concat([df_x, df_z] + df_pxs, axis=1)
return df
df = test001(problem1)
print(df)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment