#!/usr/bin/env python3
"""
Track observations output by reproject-obs.py.

Usage:
    track-reproj-observations.py [options] <observations> <output>

Options:
    --max-frame=FRAME   Only process up to frame FRAME.

<observations> is a .npz file generated by scripts/reproject-obs.py.
<output> is the .MAT file to write the output to.

The output has a single variable, smoothed, where each row is the following:

- frame index
- track index
- world x
- world y
- world x velocity
- world y velocity
- image space width
- image space height

"""
from __future__ import division, print_function

import itertools
from collections import namedtuple

import docopt
import numpy as np
import scipy.io as sio

Track = namedtuple('Track',
                   'state_estimates state_covariances observations birth_time')

def as_square_array(X):
    """A utility function to return X massaged into a square array. Raises
    ValueError if X cannot be so massaged.

    """
    X = np.atleast_2d(X)
    if len(X.shape) != 2 or X.shape[0] != X.shape[1]:
        raise ValueError('Expected square matrix')
    return X

# ## The Kalman filter
#
# We make use of the standard Kalman filter to both interpolate missing
# observations for a given track and predict observations for the next frame.
#
# Let's have a brief reminder of the Kalman filter. At each time instant, $k$,
# we assert that there is an *a priori* estimate of state, $\hat{x}_{k|k-1}$,
# and state estimate covariance, $P_{k|k-1}$. The initial values of these
# parameters are given when the Kalman filter is initialised. Our goal is to
# form an *a posteriori* estimate of state, $\hat{x}_{k|k}$, and state estimate
# covariance, $P_{k|k}$, given an observation, $z_k$.
#
# ### *A Priori* Prediction
#
# At time $k$ we are given a state transition matrix, $F_k$, and an estimate of
# the *process noise*, $Q_k$. Our *a priori* estimates are then given by:
#
# $$ \hat{x}_{k|k-1} = F_k \hat{x}_{k-1|k-1}, \quad P_{k|k-1} = F_k P_{k-1|k-1}
# F_k^T + Q_k. $$
#
# ### Innovation
#
# At time $k$ we are given a matrix, $H_k$, which gives the "true" observation
# given the state, and some estimate of the observation noise covariance,
# $R_k$. We may now compute the innovation, $y_k$, of the observation from the
# predicted observation and our expected innovation covariance, $S_k$:
#
# $$ y_k = z_k - H_k \hat{x}_{k|k-1}, \quad S_k = H_k P_{k|k-1} H_k^T + R_k. $$
#
# ### Update
#
# We now merge our observation with the state estimate via the so-called
# *Kalman gain*, $K_k$:
#
# $$ K_k = P_{k|k-1} H_k^T S_k^{-1}. $$
#
# Merging is straightforward. Note that if we have no observation, our *a
# posteriori* estimate reduces to the *a priori* one:
#
# $$ \hat{x}_{k|k} = \hat{x}_{k|k-1} + K_k y_k, \quad P_{k|k} = P_{k|k-1} - K_k
# H_k P_{k|k-1}. $$
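
# As a quick sanity check of the equations above, the following sketch runs a
# single predict/update cycle for a one-dimensional random-walk model. It is
# illustrative only and not part of the tracker; the function name and all
# numeric values are invented for this example.
def _demo_kalman_step():
    """Run one predict/update cycle of the equations above. Illustrative only."""
    x, P = np.array([0.0]), np.array([[1.0]])    # posterior at time k-1
    F, Q = np.array([[1.0]]), np.array([[0.1]])  # random-walk process model
    H, R = np.array([[1.0]]), np.array([[0.5]])  # direct observation of state
    z = np.array([1.2])                          # invented observation
    # A priori prediction
    x_prior, P_prior = F.dot(x), F.dot(P).dot(F.T) + Q
    # Innovation and innovation covariance
    y = z - H.dot(x_prior)
    S = H.dot(P_prior).dot(H.T) + R
    # Kalman gain and a posteriori update
    K = P_prior.dot(H.T).dot(np.linalg.inv(S))
    return x_prior + K.dot(y), P_prior - K.dot(H).dot(P_prior)
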
# We implement the Kalman filter as a class which maintains a list of *a priori*
# and *a posteriori* state estimates and state estimate covariances. Each filter
# has a ``predict`` and ``update`` method corresponding to the steps above.
# Additionally we add a convenience ``innovation_squared_distance`` method which
# returns the squared Euclidean innovation magnitudes for a set of candidate
# observations normalised by the inverse of the innovation covariance.
class KalmanFilter(object):
    def __init__(self, initial_mean, initial_covariance, birth_time=0):
        self.prior_state_means = [np.copy(initial_mean)]
        self.prior_state_covariances = [as_square_array(np.copy(initial_covariance))]

        # Seed posterior estimates with *copies* of the priors so that
        # update() does not mutate the prior records in place.
        self.posterior_state_means = [np.copy(self.prior_state_means[-1])]
        self.posterior_state_covariances = [np.copy(self.prior_state_covariances[-1])]

        # Used by the RTS smoother. The first transition matrix, corresponding
        # to the initial mean and covariance, is meaningless, so add a
        # placeholder of None.
        self.process_matrices = [None]
        self.process_noises = [None]

        # The observations list holds records of observations associated with
        # the filter at time k. If no observations are recorded, this will be
        # an empty list. Otherwise it is a list of (observation,
        # observation_matrix, observation_covariance) triples.
        self.observations = [[]]

        self.birth_time = birth_time
        self.last_observation_idx = None  # Index at which the last observation was associated

    def predict(self, process_matrix, process_noise):
        """Predict the next prior state mean and covariance given the last
        posterior. After calling this method, prior_state_{means,covariances}
        has an element appended.

        """
        process_matrix = as_square_array(process_matrix)
        process_noise = as_square_array(process_noise)

        if process_matrix.shape[0] != process_noise.shape[0]:
            raise ValueError("Process matrix and noise have incompatible shapes: {} vs {}".format(
                process_matrix.shape, process_noise.shape))

        # Update state mean and covariance
        self.prior_state_means.append(process_matrix.dot(self.posterior_state_means[-1]))
        self.prior_state_covariances.append(
            process_matrix.dot(self.posterior_state_covariances[-1]).dot(process_matrix.T) +
            process_noise
        )

        # Record transition matrix
        self.process_matrices.append(process_matrix)
        self.process_noises.append(process_noise)

        # Append empty list to observations for this time step
        self.observations.append([])

        # Seed posterior estimates with *copies* of the prior ones
        self.posterior_state_means.append(np.copy(self.prior_state_means[-1]))
        self.posterior_state_covariances.append(np.copy(self.prior_state_covariances[-1]))

    def update(self, observation, observation_matrix, observation_noise):
        """After each predict(), update() may be called repeatedly to provide
        additional observations for each time step.

        """
        observation_matrix = np.atleast_2d(observation_matrix)
        observation_noise = as_square_array(observation_noise)
        observation = np.atleast_1d(observation)

        expected_obs_mat_shape = (observation_noise.shape[0], self.state_dimension)
        if observation_matrix.shape != expected_obs_mat_shape:
            raise ValueError('Observation matrix is wrong shape ({}). Expected: {}'.format(
                observation_matrix.shape, expected_obs_mat_shape))

        if observation.shape != (observation_noise.shape[0],):
            raise ValueError('Observation is wrong shape ({}). Expected: {}'.format(
                observation.shape, (observation_noise.shape[0],)
            ))

        # Add observation triple to list
        self.observations[-1].append((observation, observation_matrix, observation_noise))
        self.last_observation_idx = len(self.observations) - 1

        # "Prior" in this case means "before we've updated with this observation".
        prior_mean = np.copy(self.posterior_state_means[-1])
        prior_covariance = np.copy(self.posterior_state_covariances[-1])

        # Compute innovation, innovation covariance and Kalman gain
        innovation = observation - observation_matrix.dot(prior_mean)
        innovation_cov = observation_matrix.dot(prior_covariance).dot(observation_matrix.T)
        innovation_cov += observation_noise
        kalman_gain = prior_covariance.dot(observation_matrix.T).dot(np.linalg.inv(innovation_cov))

        # Update estimates
        self.posterior_state_means[-1] += kalman_gain.dot(innovation)
        self.posterior_state_covariances[-1] -= kalman_gain.dot(observation_matrix).dot(
            prior_covariance)

    @property
    def last_idx(self):
        """One past the index of the most recent time step, i.e. the number of
        time steps recorded by the filter."""
        return len(self.posterior_state_means)

    @property
    def state_dimension(self):
        return self.prior_state_means[-1].shape[0]

    @property
    def observation_count(self):
        return sum(len(o) for o in self.observations)

    def innovation_squared_distance(self, observation_matrix, observation_noises, observations):
        """Given a NxOBS_DIM array of observations, return normalised innovation
        squared distances from the current prior mean. The observation_noises
        matrix should be an NxOBS_DIMxOBS_DIM array giving observation
        covariances for each observation.

        """
        observation_matrix = np.atleast_2d(observation_matrix)
        observation_noises = np.atleast_3d(observation_noises)

        expected_obs_mat_shape = (observation_noises.shape[1], self.state_dimension)
        if observation_matrix.shape != expected_obs_mat_shape:
            raise ValueError('Observation matrix is wrong shape ({}). Expected: {}'.format(
                observation_matrix.shape, expected_obs_mat_shape))

        observations = np.atleast_2d(observations)
        if observations.shape[1] != observation_noises.shape[1]:
            raise ValueError('Observation of unexpected length {}. Expected: {}'.format(
                observations.shape[1], observation_noises.shape[1]))

        # Compute current posterior observation mean and covariance.
        # Note: if update() has not yet been called, these are just the a
        # priori estimates.
        posterior_obs_mean = observation_matrix.dot(self.posterior_state_means[-1])
        posterior_obs_cov = observation_matrix.dot(self.posterior_state_covariances[-1]).dot(
            observation_matrix.T)

        innovation_dists = []
        for observation_noise, observation in zip(observation_noises, observations):
            observation_noise = as_square_array(observation_noise)

            # Add in any observation noise
            innovation_cov = posterior_obs_cov + observation_noise
            innovation_cov_inv = np.linalg.inv(innovation_cov)

            # Compute innovations
            innovation = observation - posterior_obs_mean
            innovation_dists.append(innovation.dot(innovation_cov_inv).dot(innovation))

        return np.array(innovation_dists)
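
# A minimal usage sketch of the class above (all values invented): one
# predict/update cycle on a two-dimensional constant-position state, followed
# by a query of the squared innovation distances for two candidate
# observations. The function is illustrative only and never called by the
# tracker.
def _demo_filter_usage():
    kf = KalmanFilter(initial_mean=np.zeros(2), initial_covariance=np.eye(2))
    kf.predict(process_matrix=np.eye(2), process_noise=0.1 * np.eye(2))
    kf.update(observation=np.array([1.0, -0.5]),
              observation_matrix=np.eye(2),
              observation_noise=0.2 * np.eye(2))
    # Distances for two candidates: the first matches the filter's current
    # estimate far better than the second and so gets a much smaller value.
    return kf.innovation_squared_distance(
        np.eye(2),
        np.array([0.2 * np.eye(2)] * 2),
        np.array([[1.0, -0.5], [4.0, 4.0]]))
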
# ### Merging of filters
#
# During tracking, some filters may end up converging on the same underlying
# car. This is usually due to multiple overlapping observations being generated
# by one true car. We counter this by merging tracks which "collide". Merging is
# done by creating a new filter from an existing filter by "replaying" the
# observations and predict steps.
def merge_kalman_filters(a, b, initial_mean, initial_covariance):
    """Merge KalmanFilter instances a and b into a single KalmanFilter. Return
    the merged filter.

    """
    if a.state_dimension != b.state_dimension:
        raise ValueError('Filters have differing state dimension, {} vs {}'.format(
            a.state_dimension, b.state_dimension
        ))

    initial_mean = np.atleast_1d(initial_mean)
    if a.state_dimension != initial_mean.shape[0]:
        raise ValueError('Initial mean has incorrect dimension {}. Expected: {}'.format(
            initial_mean.shape[0], a.state_dimension
        ))

    # Start at minimum birth time
    birth_time = min(a.birth_time, b.birth_time)

    # Create new filter
    new_f = KalmanFilter(initial_mean, initial_covariance, birth_time)

    # Start moving through time
    for t in itertools.count(birth_time):
        # Work out which filters this time corresponds to
        a_k, b_k = t - a.birth_time, t - b.birth_time
        within_a = a_k >= 0 and a_k < a.last_idx
        within_b = b_k >= 0 and b_k < b.last_idx

        # If we've left both filters, return
        if not within_a and not within_b:
            return new_f

        # If this is not the first time step, predict a priori states for the
        # new filter by averaging the process matrices and noises of whichever
        # source filters cover this time step.
        if t > birth_time:
            F = np.zeros((new_f.state_dimension, new_f.state_dimension))
            Q = np.zeros((new_f.state_dimension, new_f.state_dimension))
            nF = 0
            if within_a and a_k > 0:
                F += a.process_matrices[a_k]
                Q += a.process_noises[a_k]
                nF += 1
            if within_b and b_k > 0:
                F += b.process_matrices[b_k]
                Q += b.process_noises[b_k]
                nF += 1
            assert nF >= 1
            F /= nF
            Q /= nF
            new_f.predict(F, Q)

        # Now simply update with a and b observations
        all_obs = []
        if within_a:
            all_obs.extend(a.observations[a_k])
        if within_b:
            all_obs.extend(b.observations[b_k])
        for obs, obs_mat, obs_noise in all_obs:
            new_f.update(obs, obs_mat, obs_noise)

    # Should never be reached
    assert False
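
# A small sketch of merging (values invented): two filters born at the same
# time, each holding a single observation of roughly the same point. The
# merged filter replays both observations. Illustrative only; not part of the
# tracker.
def _demo_merge():
    fa = KalmanFilter(np.zeros(2), np.eye(2))
    fb = KalmanFilter(np.zeros(2), np.eye(2))
    fa.update(np.array([1.0, 0.0]), np.eye(2), 0.1 * np.eye(2))
    fb.update(np.array([0.9, 0.1]), np.eye(2), 0.1 * np.eye(2))
    merged = merge_kalman_filters(fa, fb, np.zeros(2), np.eye(2))
    return merged.observation_count  # == 2: both observations were replayed
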
# ## Tracking
#
# Setting the state transition and observation matrices is straightforward.
# There's a degree of finger-in-the-air estimation for the process and
# observation noises.

STATE_DIM = OBS_DIM = 6

# State and observation are x, y, vx, vy, w, h
F = np.array([
    [1, 0, 1, 0, 0, 0],
    [0, 1, 0, 1, 0, 0],
    [0, 0, 1, 0, 0, 0],
    [0, 0, 0, 1, 0, 0],
    [0, 0, 0, 0, 1, 0],
    [0, 0, 0, 0, 0, 1],
])

# All state vector elements are directly observed
H = np.eye(OBS_DIM)
# We calculate process and observation noise based on the current state and
# observation.

def process_noise_for_state(state):
    # The process noise is assumed to be small (except for w and h which could
    # do their own thing really).
    v_mag = np.hypot(state[2], state[3])
    v_mag = min(0.2, v_mag)
    w = state[4]
    Q = np.diag([1e-1, 1e-1, 3e-4 + 1e-1*v_mag, 3e-4 + 1e-1*v_mag,
                 1e0 + 1e-2*w, 1e0 + 1e-2*w])**2
    return Q

def measurement_noise_for_observation(obs, camera_at):
    # The measurement noise is somewhat loosely estimated, growing with speed
    # and with distance from the camera.
    v_mag = np.hypot(obs[2], obs[3])
    v_mag = min(0.2, v_mag)
    dist = np.sqrt(np.sum(np.square(camera_at - np.array([obs[0], obs[1], 0]))))
    w = obs[4]
    R = np.diag([1e-1 + 1e-1*dist, 1e-1 + 1e-1*dist, 1e-2 + 1e-1*v_mag, 1e-2 + 1e-1*v_mag,
                 1e1 + 1e-1*w, 1e1 + 1e-1*w])**2
    return R
# Overlap radius (in the car-plane) within which tracks should be merged
MERGE_RADIUS = 0.8

# Maximum number of frames a filter may live with no observations
MAX_GAP = 4
# ### Scott and Longuet-Higgins feature association algorithm
#
# Each filter will predict which observation should be generated in the next
# frame. We need an algorithm to associate actual observations with predicted
# ones. The Scott and Longuet-Higgins (SLH) algorithm is a fairly neat method
# which requires only that we can construct some distance metric for candidate
# association pairs. Fortunately this is exactly what the innovation covariance
# in the Kalman filter gives us.
#
# When tracking we will form a Gaussian-weighted proximity matrix, $G$, where
# each element is given by $G_{ij} = \exp(-d_{ij}^2)$. The value $d_{ij}^2$ is
# the squared innovation distance returned by ``innovation_squared_distance``.
# These distances are already normalised with the inverse of the innovation
# covariance.
#
# The SLH algorithm proceeds by taking the SVD, $U \, S \, V^H = G$. The
# diagonal matrix $S$ contains the singular values along the diagonal. A similar
# matrix $E$ is created which is like $S$ except that the non-zero singular
# values are set to unity. The matrix $P = U \, E \, V^H$ is then computed.
#
# This has the effect of "amplifying" the proximities of closely associated
# pairs in a global way.
def slh_association_matrix(G):
    """Perform the association step of the Scott and Longuet-Higgins algorithm.
    G is a Gaussian weighted proximity matrix where G_ij is the Gaussian
    weighted proximity of feature i to feature j. Returns an "amplified"
    proximity matrix, P, of the same shape as G where feature i is associated
    with feature j iff P_ij is the maximal value in its row and column.

    Source: http://www.bmva.org/bmvc/1997/papers/081/node3.htm

    """
    # Take SVD of G
    U, S, Vh = np.linalg.svd(G)

    # How many non-zero singular values?
    nsv = np.count_nonzero(S)

    # Trim U and Vh to match this number
    U, Vh = U[:, :nsv], Vh[:nsv, :]

    # Compute new matrix P as if singular values were unity
    P = U.dot(Vh)

    return P
# Forming associations given the $P$ matrix is simple; we associate features
# $i$ and $j$ if $P_{ij}$ is the maximal value in both its row and column. We
# can write a simple function to extract associations.
def slh_associations(P):
    """Given an association matrix P return a Nx2 matrix of associations where
    each row is an i,j pair asserting an association of features i and j.

    """
    associations = []

    # Compute column-wise maxima
    col_max_idxs = np.argmax(P, axis=0)

    # For each row of P...
    for row_idx, row in enumerate(P):
        # ... compute index of maximum element
        col_idx = np.argmax(row)

        # Is this row also the maximum in that column?
        if col_max_idxs[col_idx] == row_idx:
            associations.append((row_idx, col_idx))

    if len(associations) == 0:
        return np.zeros((0, 2))

    return np.vstack(associations)
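
# A tiny worked example of the two SLH helpers (distances invented): two
# predicted features against two observed ones where the pairing is
# unambiguous, so the associations come out as (0, 0) and (1, 1). Illustrative
# only; not called by the tracker.
def _demo_slh():
    d2 = np.array([[0.1, 9.0],
                   [8.0, 0.2]])     # invented squared innovation distances
    G = np.exp(-d2)                 # Gaussian-weighted proximities
    P = slh_association_matrix(G)
    return slh_associations(P)      # rows: (0, 0) and (1, 1)
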
# When tracking we may get into a situation where two filters are tracking the
# same underlying car with each filter getting one N-th of the observations. To
# avoid this we add the ability to merge tracks to the tracking. Two tracks, A
# and B, should be merged if *all* of the following hold:
#
# 1. The separation of the cars in the traffic-plane is within some merge radius.
# 2. The separation of A's mean w.r.t. B is large.
# 3. The separation of B's mean w.r.t. A is large.

# How traffic-plane observations are taken from the state
MERGE_OBSERVATION_MATRIX = np.array([
    [1, 0, 0, 0, 0, 0],
    [0, 1, 0, 0, 0, 0],
])
def should_merge_filters(fa, fb, merge_radius=1, sigma_radius=3,
                         merge_obs_matrix=MERGE_OBSERVATION_MATRIX):
    merge_obs_matrix = np.atleast_2d(merge_obs_matrix)

    a_obs = merge_obs_matrix.dot(fa.posterior_state_means[-1])
    b_obs = merge_obs_matrix.dot(fb.posterior_state_means[-1])

    # Observations must be within merge radius
    if np.sum(np.square(a_obs - b_obs)) > merge_radius * merge_radius:
        return False

    a_cov = fa.posterior_state_covariances[-1]
    b_cov = fb.posterior_state_covariances[-1]

    # State difference w.r.t. covariances
    delta = fa.posterior_state_means[-1] - fb.posterior_state_means[-1]
    delta = delta.dot(np.linalg.inv(a_cov + b_cov)).dot(delta)

    # Deltas must be outside of sigma_radius
    if delta <= sigma_radius*sigma_radius:
        return False

    return True
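
# A quick sketch of the merge test (values invented): two filters sitting only
# 0.5 apart in the traffic plane whose tight covariances make their full state
# means mutually implausible, satisfying both conditions above. Illustrative
# only; not part of the tracker.
def _demo_should_merge():
    fa = KalmanFilter(np.array([0.0, 0.0, 1.0, 0.0, 2.0, 1.0]), 1e-3 * np.eye(6))
    fb = KalmanFilter(np.array([0.5, 0.0, -1.0, 0.0, 2.0, 1.0]), 1e-3 * np.eye(6))
    return should_merge_filters(fa, fb)  # True under the default radii
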
def track(observations_by_frame, min_frame, max_frame,
          camera_at, max_gap, merge_radius):
    # The tracking itself maintains a set of "live" tracks which have had an
    # observation associated with them in the past ``max_gap`` frames. Tracks
    # which don't get an observation within that time are considered to have
    # "died".
    #
    # For each live filter, the normalised squared innovation distances to each
    # observation are computed. From this matrix, the SLH $G$ and $P$ matrices
    # are computed. The $P$ matrix is used to form filter-observation
    # associations.
    #
    # For each association which is within a threshold distance, the filter is
    # associated with the observation and the ``update`` method is called.
    # Filters which do not get an associated observation are updated with
    # missing data. Observations with no filter associations are used to
    # initialise new live filters.
    #
    # Finally, any live filter which has not received an observation for
    # ``max_gap`` frames is killed.

    # Minimum Gaussian-weighted proximity (corresponding to a 4-sigma
    # innovation distance) for an observation pair to be considered associated
    gaussian_dist_threshold = np.exp(-4*4)

    # Various parameters updated during tracking
    live_filters, dead_filters = set(), set()
    state_dim = F.shape[0]
    init_mean, init_cov = np.zeros(state_dim), np.diag(1e10 * np.ones(state_dim))
    meas_noise_cb = lambda obs: measurement_noise_for_observation(obs, camera_at)

    for frame_idx in range(min_frame, max_frame):
        # Predict observations for all live filters
        for f in live_filters:
            f.predict(F, process_noise_for_state(f.posterior_state_means[-1]))

        # Get observations for this frame
        frame_observations = observations_by_frame.get(frame_idx, np.zeros((0,)))
        # Convert live filters to an ordered list
        live_filter_list = list(live_filters)

        # If we have some current filters and some current observations...
        if len(live_filter_list) > 0 and frame_observations.shape[0] > 0:
            # ...we implement the Scott and Longuet-Higgins algorithm for
            # tracking. Compute matrix of Gaussian weighted proximities, G.
            G = np.exp(-np.vstack([
                f.innovation_squared_distance(
                    H,
                    np.vstack([meas_noise_cb(o)[np.newaxis, ...]
                               for o in frame_observations]),
                    frame_observations
                )
                for f in live_filter_list
            ]))

            # Compute SLH association matrix
            P = slh_association_matrix(G)

            # Compute associations
            associations = slh_associations(P)

            # Filter associations by whether G is sufficiently large
            associations = [
                (f_idx, o_idx) for f_idx, o_idx in associations
                if G[f_idx, o_idx] > gaussian_dist_threshold
            ]
        else:
            # We have no associations
            associations = []
        # Now process associations
        filters_with_no_observation = set(range(len(live_filter_list)))
        observations_with_no_filter = set(range(frame_observations.shape[0]))
        for f_idx, o_idx in associations:
            f = live_filter_list[f_idx]
            o = frame_observations[o_idx]
            f.update(o, H, meas_noise_cb(o))

            # This filter now has an association
            filters_with_no_observation.remove(f_idx)
            observations_with_no_filter.remove(o_idx)

        # Create new filters for orphan observations
        for o_idx in observations_with_no_filter:
            new_f = KalmanFilter(init_mean, init_cov, frame_idx)
            new_f.update(frame_observations[o_idx], H,
                         meas_noise_cb(frame_observations[o_idx]))
            live_filters.add(new_f)

        # Update ordered list of live filters
        live_filter_list = list(live_filters)

        # Now consider pairwise filters. We merge filters which overlap.
        live_filters = set()
        removed_filters = set()
        for fa_idx, fa in enumerate(live_filter_list):
            if fa in removed_filters:
                continue
            to_merge = set()
            for fb in live_filter_list[fa_idx+1:]:
                if fb in removed_filters:
                    continue
                if should_merge_filters(fa, fb, merge_radius=merge_radius):
                    to_merge.add(fb)
                    removed_filters.add(fb)
            for fb in to_merge:
                fa = merge_kalman_filters(fa, fb, init_mean, init_cov)
            live_filters.add(fa)

        # Kill any filters which have no observations for the past max_gap frames
        to_kill = set(
            f for f in live_filters
            if f.last_idx - f.last_observation_idx > max_gap
        )
        live_filters.difference_update(to_kill)
        dead_filters.update(to_kill)

    return live_filters, dead_filters
# ## Rauch–Tung–Striebel smoothing
#
# The Rauch–Tung–Striebel (RTS) smoother provides a method of computing the "all
# data" *a posteriori* estimate of states (as opposed to the "all previous data"
# estimate). Assuming there are $n$ time points in the filter, the RTS smoother
# computes the *a posteriori* state estimate at time $k$ after all the data for
# $n$ time steps are known, $\hat{x}_{k|n}$, and the corresponding covariance,
# $P_{k|n}$, recursively:
#
# $$ \hat{x}_{k|n} = \hat{x}_{k|k} + C_k ( \hat{x}_{k+1|n} - \hat{x}_{k+1|k} ),
# \quad P_{k|n} = P_{k|k} + C_k ( P_{k+1|n} - P_{k+1|k} ) C_k^T $$
#
# with $C_k = P_{k|k} F^T_{k+1} P_{k+1|k}^{-1}$.
def rts_smooth(kf, n=None):
    """Taking an instance of KalmanFilter, return a (states, state_covariances)
    pair. states is a NxSTATE_DIM array of smoothed state estimates and
    state_covariances is a NxSTATE_DIMxSTATE_DIM array of state estimate
    covariances.

    If n is not specified, it defaults to the last index with an observation.

    """
    state_dim = kf.state_dimension
    if n is None:
        n = kf.last_observation_idx
    n_states = n + 1
    states = np.nan * np.ones((n_states, state_dim))
    state_covariances = np.nan * np.ones((n_states, state_dim, state_dim))

    # Special case: asked for no states
    if n_states == 0:
        return states, state_covariances

    # Initialise with the posterior estimate at the final smoothed time step.
    # (Not posterior_state_means[-1]: the filter may have kept predicting past
    # its last observation.)
    states[-1, ...] = kf.posterior_state_means[n_states - 1]
    state_covariances[-1, ...] = kf.posterior_state_covariances[n_states - 1]

    # Work backwards from final state
    for k in range(n_states-2, -1, -1):
        F = kf.process_matrices[k+1]
        C = kf.posterior_state_covariances[k].dot(F.T).dot(
            np.linalg.inv(kf.prior_state_covariances[k+1])
        )

        # Calculate smoothed state and covariance
        states[k, ...] = kf.posterior_state_means[k] + \
            C.dot(states[k+1, ...] - kf.prior_state_means[k+1])
        state_covariances[k, ...] = kf.posterior_state_covariances[k] + C.dot(
            state_covariances[k+1, ...] - kf.prior_state_covariances[k+1]
        ).dot(C.T)

    return states, state_covariances
def main():
    opts = docopt.docopt(__doc__)

    input_file = opts['<observations>']
    obs_dict = np.load(input_file)
    observations = obs_dict['observations']
    camera_at = obs_dict['camera_at']

    observations_by_frame = dict(
        (k, np.array(list(v))[:, 1:7])
        for k, v in itertools.groupby(observations, lambda o: o[0])
    )
    frames_with_observations = np.asarray(list(observations_by_frame.keys())).astype(int)
    min_frame = int(observations[:, 0].min())
    max_frame = int(observations[:, 0].max())
    if opts['--max-frame'] is not None:
        max_frame = int(opts['--max-frame'])

    live_filters, dead_filters = track(observations_by_frame,
                                       min_frame=min_frame, max_frame=max_frame,
                                       camera_at=camera_at,
                                       max_gap=5, merge_radius=1.0)
    print('After tracking, {} live filters and {} dead ones'.format(
        len(live_filters), len(dead_filters)))

    # Turn filters into tracks
    tracks = list()
    for f in dead_filters.union(live_filters):
        t_observations = f.observations[:f.last_observation_idx + 1]
        t_state_estimates, t_state_covariances = rts_smooth(f)

        # Only consider tracks with many observations and long length
        if f.observation_count > 5 and t_state_estimates.shape[0] > 25:
            tracks.append(Track(state_estimates=np.vstack(t_state_estimates),
                                state_covariances=np.vstack(t_state_covariances),
                                observations=t_observations, birth_time=f.birth_time))
    print('Obtained {} tracks'.format(len(tracks)))
    # ## Writing the output
    #
    # We write the tracks to a MATLAB file. We firstly stack all the track
    # states up into a single matrix called "smoothed" whose first column is
    # the frame index, second is the track id and the remaining columns are
    # the state. For convenience of subsequent visualisation we then sort it
    # by frame index.

    # Build up smoothed matrix firstly as a list and then stack it
    smoothed = []
    for t_idx, t in enumerate(tracks):
        states = t.state_estimates
        frame_idxs = (t.birth_time + np.arange(states.shape[0])).reshape((-1, 1))
        track_idxs = np.ones((states.shape[0], 1)) * t_idx
        smoothed.append(np.hstack((frame_idxs, track_idxs, states)))
    smoothed = np.vstack(smoothed)

    # Sort smoothed tracks by frame index
    smoothed = smoothed[np.argsort(smoothed[:, 0]), ...]

    # Save
    sio.savemat(opts['<output>'], dict(smoothed=smoothed))

if __name__ == '__main__':
    main()