Created
November 6, 2017 04:01
-
-
Save superjax/16f1019f79071cb7d18d5712545f1718 to your computer and use it in GitHub Desktop.
EKF_SLAM implementation from Probabilistic Robotics by Sebastian Thrun et al. (look for the plot helper gist for covariance plotting)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import scipy.linalg | |
import scipy.stats | |
import matplotlib.pyplot as plt | |
import scipy.io | |
import scipy.sparse | |
from plot_helper import plot_cov_ellipse | |
from tqdm import tqdm | |
def R(theta): | |
return np.array([[np.cos(theta), -np.sin(theta)], | |
[np.sin(theta), np.cos(theta)]]) | |
class Robot(): | |
def __init__(self, x0, L, alphas, Q): | |
self.x = x0 | |
self.alphas = alphas | |
self.l = L | |
self.Q = Q | |
def propagate(self, u, dt): | |
v = u[0] + np.random.randn()*(self.alphas[0]*u[0]*u[0] + self.alphas[1]*u[1]*u[1])**0.5 | |
w = u[1] + np.random.randn()*(self.alphas[2]*u[0]*u[0] + self.alphas[3]*u[1]*u[1])**0.5 | |
theta = self.x[2] | |
self.x[0] += v/w * (np.sin(theta + w*dt) - np.sin(theta)) | |
self.x[1] += v/w * (np.cos(theta) - np.cos(theta + w*dt)) | |
self.x[2] += w*dt | |
if self.x[2] > np.pi: | |
self.x[2] -= 2*np.pi | |
elif self.x[2] < -np.pi: | |
self.x[2] += 2*np.pi | |
return self.x.copy() | |
def get_measurement(self): | |
dx = self.l - np.tile(self.x, (len(self.l),1))[:,0:2] | |
r = scipy.linalg.norm(dx, axis=1) + np.random.randn(len(self.l))*self.Q[0][0]**0.5 | |
theta = (np.arctan2(dx[:,1], dx[:,0]) - self.x[2] + np.random.randn(len(self.l))*self.Q[1][1]**0.5).flatten() | |
theta[theta > np.pi] -= 2 * np.pi | |
theta[theta <= -np.pi] += 2 * np.pi | |
r[np.abs(theta) > 90.0 * np.pi / 180.0] = np.nan | |
theta[np.abs(theta) > 90.0*np.pi/180.0] = np.nan | |
return r.copy(), theta.copy(), self.Q.copy() | |
def plot(self): | |
plt.ion() | |
plt.figure(99) | |
plt.clf() | |
ax = plt.subplot(111) | |
heading_line_end_point = self.x[0:2] + np.array([0.6, 0]).dot(R(self.x[2]).T) | |
circle = plt.Circle((self.x[1], self.x[0]), 0.5, color='y', alpha=0.7) | |
ax.add_artist(circle) | |
ax.plot([self.x[1], heading_line_end_point[1]], | |
[self.x[0], heading_line_end_point[0]], linewidth=2) | |
ax.plot(self.l[:,1], self.l[:,0], 'kx', markersize=10) | |
plt.axis('equal') | |
plt.show() | |
plt.pause(0.001) | |
class Controller(): | |
def control(self, t): | |
u = np.array([1 + 0.6*np.cos(2.*np.pi*0.2*t), | |
-0.2 + 2.0*np.cos(2.*np.pi*0.6*t)]) | |
return u | |
class EKF(): | |
def __init__(self, x0, alphas, P0_feat): | |
self.x = x0 | |
self.P = 0.1*np.eye(len(x0)) | |
self.feat_P0 = P0_feat | |
self.alphas = alphas | |
self.num_features = 0 | |
def add_feature(self, pos): | |
self.num_features += 1 | |
self.x = np.concatenate((self.x, pos), axis=0) | |
self.P = scipy.linalg.block_diag(self.P, self.feat_P0) | |
def propagate(self, u, dt): | |
v = u[0] | |
w = u[1] | |
Q_u = np.array( [[self.alphas[0]*u[0]*u[0], self.alphas[1]*u[1]*u[1]], | |
[self.alphas[2]*u[0]*u[0], self.alphas[3]*u[1]*u[1]]]) | |
theta = self.x[2] | |
ct = np.cos(theta) | |
st = np.sin(theta) | |
st_pw = np.sin(theta + w*dt) | |
ct_pw = np.cos(theta + w*dt) | |
self.x[0] += -v/w*st + v/w*st_pw | |
self.x[1] += v/w*ct - v/w*ct_pw | |
self.x[2] += w*dt | |
# wrap psi from pi to -pi | |
if self.x[2] > np.pi: | |
self.x[2] -= 2.*np.pi | |
elif self.x[2] <= - np.pi: | |
self.x[2] += 2.*np.pi | |
G = np.eye(3+2*self.num_features) | |
G[0:2,2] = np.array([-v/w*ct + v/w*ct_pw, | |
-v/w*st + v/w*st_pw]) | |
F = np.zeros((3+2*self.num_features, 2)) | |
F[0:3,0:2] = np.array([[1./w * (-st + st_pw), v/(w*w) * (st - st_pw) + v/w * ct_pw*dt], | |
[1./w * (ct-ct_pw), -v/(w*w)*(ct - ct_pw) + v/w * (st_pw*dt)], | |
[0., dt]]) | |
Qx = [0, 0, 0] | |
Qx.extend([0.001 for i in range(2 * self.num_features)]) | |
self.P = G.dot(self.P).dot(G.T) + F.dot(Q_u).dot(F.T) + np.diag(Qx) | |
return self.x.copy(), self.P.copy() | |
def update(self, r, theta, R): | |
est_landmarks = np.reshape(self.x[3:], (2,self.num_features), order='F') | |
dx = est_landmarks - self.x[0:2,None] | |
rhat = scipy.linalg.norm(dx, axis=0) | |
q = rhat*rhat | |
that = np.arctan2(dx[1], dx[0]) - self.x[2] | |
zhat = np.vstack((rhat, that)) | |
z = np.vstack((r, theta)) | |
residual = z - zhat | |
residual[1, residual[1, :] > np.pi] -= 2. * np.pi | |
residual[1, residual[1, :] <= -np.pi] += 2. * np.pi | |
residual[residual > 0.05] = 0.05 | |
residual[residual < -0.05] = -0.05 | |
for l in range(self.num_features): | |
if not (np.isfinite(r[l]) and np.isfinite(theta[l])): | |
continue | |
H = np.zeros((2, 3+2*self.num_features)) | |
H[0:2,0:2] = 1/q[l]*np.array([[-rhat[l]*dx[0,l], -rhat[l]*dx[1,l]], | |
[dx[1,l], -dx[0,l]]]) | |
H[0:2,3+2*l:5+2*l] = 1/q[l]*np.array([[rhat[l]*dx[0, l], rhat[l]*dx[1,l]], | |
[-dx[1, l], dx[0, l]]]) | |
K = self.P.dot(H.T).dot(scipy.linalg.inv(H.dot(self.P).dot(H.T) + R)) | |
self.x += K.dot(residual[:,l]) | |
self.P = (np.eye(3+2*self.num_features) - K.dot(H)).dot(self.P) | |
return zhat | |
def run(x0, L, alphas, Q, P0_feat): | |
robot = Robot(x0.copy(), L.copy(), alphas.copy(), Q) | |
controller = Controller() | |
filter = EKF(x0.copy(), alphas.copy(), P0_feat) | |
dt = 0.1 | |
tm = np.arange(0, 40, dt) | |
truth = np.nan*np.ones((len(tm), 3)) | |
estimate = np.nan*np.ones((len(tm),3 + 2*len(L))) | |
cov = np.nan*np.ones((len(tm),3+2*len(L), 3+2*len(L))) | |
range_measurements = np.nan*np.ones((len(tm), len(L))) | |
bearing_measurements = np.nan*np.ones((len(tm), len(L))) | |
zhats = np.nan*np.ones((len(tm), 2, len(L))) | |
for i in range(len(L)): | |
filter.add_feature(L[i] + np.random.multivariate_normal(np.zeros(2), P0_feat)) | |
for j, t in tqdm(enumerate(tm)): | |
u = controller.control(t) | |
truth[j] = robot.propagate(u, dt) | |
r, bearing, Q = robot.get_measurement() | |
range_measurements[j] = r | |
bearing_measurements[j] = bearing | |
# robot.plot() | |
xhat, P = filter.propagate(u, dt) | |
cov[j] = P | |
estimate[j] = xhat | |
zhat = filter.update(r, bearing, Q) | |
zhats[j] = zhat | |
if j % 10 == 0: | |
plt.ion() | |
plt.figure(1) | |
plt.clf() | |
plt.plot(robot.l[:, 1], robot.l[:, 0], 'kx', markersize=10) | |
for l in range(len(L)): | |
Xi = 3+2*l | |
plot_cov_ellipse(np.fliplr(np.flipud(cov[j,Xi:Xi+2,Xi:Xi+2])), np.flipud(estimate[j,Xi:Xi+2]), nstd=2, alpha=0.5) | |
plt.plot(truth[:, 1], truth[:, 0], 'b-', label='truth') | |
plt.plot(estimate[:, 1], estimate[:, 0], 'r-', label='estimate') | |
plot_cov_ellipse(np.fliplr(np.flipud(cov[j, 0:2, 0:2])), np.flipud(estimate[j, 0:2]), nstd=2, alpha=0.5) | |
plt.axis('equal') | |
plt.legend() | |
plt.show() | |
plt.savefig("images/"+str(j).zfill(5) + ".png") | |
plt.pause(0.005) | |
plt.ioff() | |
plt.figure(2) | |
plt.clf() | |
for i in range(3): | |
plt.subplot(3, 1, i+1) | |
plt.plot(tm, truth[:, i], label="truth") | |
plt.plot(tm, estimate[:, i], label="estimate") | |
plt.plot(tm, estimate[:,i] + 2.*np.sqrt(cov[:,i,i]), 'k', alpha=0.5) | |
plt.plot(tm, estimate[:, i] - 2. * np.sqrt(cov[:, i, i]), 'k', alpha=0.5) | |
if j == 0 and i == 0: | |
plt.legend() | |
plt.figure(5) | |
plt.clf() | |
for i in range(3): | |
plt.subplot(3, 1, i+1) | |
plt.plot(tm, np.abs(truth[:, i] - estimate[:, i]), label="error") | |
plt.plot(tm, 2.*np.sqrt(cov[:,i,i]), 'k', alpha=0.5) | |
if i == 0: | |
plt.legend() | |
plt.figure(3) | |
plt.clf() | |
for i in range(3): | |
plt.subplot(3, 1, i+1) | |
plt.plot(tm, np.abs(range_measurements[:, i] - zhats[:, 0, i]), label="range_error") | |
plt.plot(tm, np.abs(bearing_measurements[:, i] - zhats[:, 1, i]), label="bearing_error") | |
plt.plot(tm, 2. * np.sqrt(cov[:, 3+i, 3+i]), 'k', alpha=0.5) | |
if i == 0: | |
plt.legend() | |
plt.figure(4) | |
plt.clf() | |
for i in range(len(L)): | |
plt.subplot(len(L), 1, i + 1) | |
plt.plot([tm[0], tm[-1]], [L[i,0], L[i,0]], label="truth_x") | |
plt.plot(tm, estimate[:,3 + i], label="est_x") | |
plt.plot([tm[0], tm[-1]], [L[i,1], L[i,1]], label="truth_y") | |
plt.plot(tm, estimate[:, 4 + i], label="est_y") | |
# plt.ion() | |
plt.show() | |
# plt.pause(0.005) | |
# plt.ioff() | |
if __name__ == '__main__': | |
x0 = np.array([0, 0, 90.*np.pi/180.]) | |
L = np.array([[6.,4.], | |
[-7.,8.], | |
[6., -4.], | |
[-8., -10.], | |
[5., -3.], | |
[-1., 6.], | |
[0., -2.]]) | |
L = np.random.uniform(-20, 20, (25, 2)) | |
alphas = np.array([0.001, 0.001, 0.001, 0.001]) | |
Q = np.diag([0.0005, 0.0005]) | |
P0_feat = np.diag([0.1, 0.1]) | |
run(x0, L, alphas, Q, P0_feat) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://gist.github.com/superjax/16f1019f79071cb7d18d5712545f1718#file-ekf_slam-py-L92 is incorrect it should be more like (see page 204)