Last active
April 21, 2023 23:29
-
-
Save ctralie/6b754f858ae58fda59d8e7f0170296c6 to your computer and use it in GitHub Desktop.
Dynamic Time Warping Animation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Programmer: Chris Tralie | |
Purpose: To illustrate the dynamic time warping algorithm | |
to synchronize two 2D trajectories | |
""" | |
import numpy as np | |
import matplotlib.pyplot as plt | |
def getCSM(X, Y): | |
""" | |
Return the Euclidean cross-similarity matrix between X and Y | |
Parameters | |
---------- | |
X: ndarray | |
An Mxd array of M points in d dimensions for first point cloud | |
Y: ndarray | |
A Nxd array of N points in d dimensions for the second point cloud | |
Return | |
------ | |
D: ndarray | |
An MxN Euclidean cross-similarity matrix | |
""" | |
XSqr = np.sum(X**2, 1) | |
YSqr = np.sum(Y**2, 1) | |
C = XSqr[:, None] + YSqr[None, :] - 2*X.dot(Y.T) | |
C[C < 0] = 0 | |
return np.sqrt(C) | |
def DTWCSM(CSM, backtrace = False): | |
""" | |
Perform dynamic time warping on a cross-similarity matrix | |
Parameters | |
---------- | |
CSM: ndarray | |
An MxN cross-similarity matrix | |
backtrace: boolean | |
Whether to backtrace for the optimal path | |
Return | |
------ | |
res: dictionary | |
{'minD': DTW cost, 'D': Dynamic programming matrix, | |
'path': If requested, an NPath x 2 array of indices | |
in an optimal time-ordered correspondence} | |
""" | |
M = CSM.shape[0]+1 | |
N = CSM.shape[1]+1 | |
D = np.zeros((M, N)) | |
D[1::, 0] = np.inf | |
D[0, 1::] = np.inf | |
if backtrace: | |
B = np.zeros((M, N), dtype = np.int64) #Backpointer indices | |
pointers = [[-1, 0], [0, -1], [-1, -1], None] #Backpointer directions | |
minD = np.inf | |
for i in range(1, M): | |
for j in range(1, N): | |
d = CSM[i-1, j-1] | |
d1 = D[i-1, j] + d | |
d2 = D[i, j-1] + d | |
d3 = D[i-1, j-1] + d | |
arr = np.array([d1, d2, d3]) | |
D[i, j] = np.min(arr) | |
if backtrace: | |
B[i, j] = np.argmin(arr) | |
if (D[i, j] < minD): | |
minD = D[i, j] | |
res = {'minD':minD, 'D':D} | |
#Backtrace starting at the largest index | |
if backtrace: | |
res['B'] = B | |
path = [(M-1, N-1)] | |
idx = path[-1] | |
while B[idx[0], idx[1]] < 3: | |
i = B[idx[0], idx[1]] | |
idx = [idx[0]+pointers[i][0], idx[1] + pointers[i][1]] | |
if idx[0] < 1 or idx[1] < 1: | |
break | |
path.append(idx) | |
res['path'] = np.flipud(np.array(path)-1) | |
return res | |
def animateDTW(): | |
""" | |
Make dynamic time warping example and animate the results | |
""" | |
np.random.seed(100) | |
t1 = np.linspace(0, 1, 50) | |
t1 = t1 | |
t2 = np.sqrt(t1) | |
t1 = t1**2 | |
N = len(t1) | |
X = np.zeros((N, 2)) | |
X[:, 0] = t1 | |
X[:, 1] = np.cos(4*np.pi*t1) + t1 | |
Y = np.zeros((N, 2)) | |
Y[:, 0] = t2 | |
Y[:, 1] = np.cos(4*np.pi*t2) + t2 + 0.6 | |
CSM = getCSM(X, Y) | |
res = DTWCSM(CSM, backtrace=True) | |
cost, path = res['minD'], res['path'] | |
c1 = 'Reds' | |
c2 = 'Blues' | |
fac = 1.2 | |
plt.figure(figsize=(fac*12, fac*4)) | |
for i in range(path.shape[0]): | |
plt.clf() | |
plt.subplot2grid((1, 5), (0, 0), colspan=3) | |
plt.scatter(X[:, 0], X[:, 1], 40, np.arange(N), cmap = c1) | |
plt.plot(X[:, 0], X[:, 1], 'k') | |
plt.scatter(Y[:, 0], Y[:, 1], 40, np.arange(N), cmap = c2) | |
plt.plot(Y[:, 0], Y[:, 1], 'k') | |
for k in range(0, i+1): | |
x1 = X[path[k, 0], :] | |
x2 = Y[path[k, 1], :] | |
x = np.concatenate((x1[None, :], x2[None, :]), 0) | |
if k < i: | |
plt.plot(x[:, 0], x[:, 1], 'k') | |
else: | |
plt.plot(x[:, 0], x[:, 1], 'r', linewidth=4) | |
plt.axis('off') | |
plt.subplot2grid((1, 5), (0, 3), colspan=2) | |
plt.imshow(CSM, cmap = 'afmhot', interpolation = 'nearest') | |
plt.scatter(-1*np.ones(N), np.arange(N), 40, np.arange(N), cmap=c1, edgecolor = 'none') | |
plt.scatter(np.arange(N), -1*np.ones(N), 40, np.arange(N), cmap=c2, edgecolor = 'none') | |
plt.scatter(path[0:i+1, 1], path[0:i+1, 0], 10, 'c', edgecolor = 'none') | |
plt.scatter(path[i, 1], path[i, 0], 30, 'r', edgecolor = 'none') | |
plt.xlim([-2, N]) | |
plt.ylim([N, -2]) | |
ax = plt.gca() | |
ax.set_facecolor((0.15, 0.15, 0.15)) | |
ax.set_xticks([]) | |
ax.set_yticks([]) | |
plt.savefig("%i.jpg"%i, bbox_inches = 'tight') | |
if __name__ == '__main__': | |
animateDTW() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The algorithm can be thought of as two frogs hopping from left to right along each sampled curve. Each frog can either stay still or move one spot forward at any point in time (a so-called "warping path" or "time-ordered correspondence"). We want to minimize the sum of distances between the frogs over all time. This boils down to finding the lowest cost path from the upper left to the lower right of an MxN "Euclidean cross-similarity matrix" C between the two sampled curves X = [X1, X2, ..., XM] and Y = [Y1, Y2, ..., YN], where Cij = ||Xi - Yj||_2
But note that the curves must be well-aligned for this algorithm to work. Even a simple rotation of the blue curve about its starting point will completely break the algorithm