Skip to content

Instantly share code, notes, and snippets.

@ctralie
Last active April 21, 2023 23:29
Show Gist options
  • Save ctralie/6b754f858ae58fda59d8e7f0170296c6 to your computer and use it in GitHub Desktop.
Save ctralie/6b754f858ae58fda59d8e7f0170296c6 to your computer and use it in GitHub Desktop.
Dynamic Time Warping Animation
"""
Programmer: Chris Tralie
Purpose: To illustrate the dynamic time warping algorithm
to synchronize two 2D trajectories
"""
import numpy as np
import matplotlib.pyplot as plt
def getCSM(X, Y):
"""
Return the Euclidean cross-similarity matrix between X and Y
Parameters
----------
X: ndarray
An Mxd array of M points in d dimensions for first point cloud
Y: ndarray
A Nxd array of N points in d dimensions for the second point cloud
Return
------
D: ndarray
An MxN Euclidean cross-similarity matrix
"""
XSqr = np.sum(X**2, 1)
YSqr = np.sum(Y**2, 1)
C = XSqr[:, None] + YSqr[None, :] - 2*X.dot(Y.T)
C[C < 0] = 0
return np.sqrt(C)
def DTWCSM(CSM, backtrace = False):
"""
Perform dynamic time warping on a cross-similarity matrix
Parameters
----------
CSM: ndarray
An MxN cross-similarity matrix
backtrace: boolean
Whether to backtrace for the optimal path
Return
------
res: dictionary
{'minD': DTW cost, 'D': Dynamic programming matrix,
'path': If requested, an NPath x 2 array of indices
in an optimal time-ordered correspondence}
"""
M = CSM.shape[0]+1
N = CSM.shape[1]+1
D = np.zeros((M, N))
D[1::, 0] = np.inf
D[0, 1::] = np.inf
if backtrace:
B = np.zeros((M, N), dtype = np.int64) #Backpointer indices
pointers = [[-1, 0], [0, -1], [-1, -1], None] #Backpointer directions
minD = np.inf
for i in range(1, M):
for j in range(1, N):
d = CSM[i-1, j-1]
d1 = D[i-1, j] + d
d2 = D[i, j-1] + d
d3 = D[i-1, j-1] + d
arr = np.array([d1, d2, d3])
D[i, j] = np.min(arr)
if backtrace:
B[i, j] = np.argmin(arr)
if (D[i, j] < minD):
minD = D[i, j]
res = {'minD':minD, 'D':D}
#Backtrace starting at the largest index
if backtrace:
res['B'] = B
path = [(M-1, N-1)]
idx = path[-1]
while B[idx[0], idx[1]] < 3:
i = B[idx[0], idx[1]]
idx = [idx[0]+pointers[i][0], idx[1] + pointers[i][1]]
if idx[0] < 1 or idx[1] < 1:
break
path.append(idx)
res['path'] = np.flipud(np.array(path)-1)
return res
def animateDTW():
"""
Make dynamic time warping example and animate the results
"""
np.random.seed(100)
t1 = np.linspace(0, 1, 50)
t1 = t1
t2 = np.sqrt(t1)
t1 = t1**2
N = len(t1)
X = np.zeros((N, 2))
X[:, 0] = t1
X[:, 1] = np.cos(4*np.pi*t1) + t1
Y = np.zeros((N, 2))
Y[:, 0] = t2
Y[:, 1] = np.cos(4*np.pi*t2) + t2 + 0.6
CSM = getCSM(X, Y)
res = DTWCSM(CSM, backtrace=True)
cost, path = res['minD'], res['path']
c1 = 'Reds'
c2 = 'Blues'
fac = 1.2
plt.figure(figsize=(fac*12, fac*4))
for i in range(path.shape[0]):
plt.clf()
plt.subplot2grid((1, 5), (0, 0), colspan=3)
plt.scatter(X[:, 0], X[:, 1], 40, np.arange(N), cmap = c1)
plt.plot(X[:, 0], X[:, 1], 'k')
plt.scatter(Y[:, 0], Y[:, 1], 40, np.arange(N), cmap = c2)
plt.plot(Y[:, 0], Y[:, 1], 'k')
for k in range(0, i+1):
x1 = X[path[k, 0], :]
x2 = Y[path[k, 1], :]
x = np.concatenate((x1[None, :], x2[None, :]), 0)
if k < i:
plt.plot(x[:, 0], x[:, 1], 'k')
else:
plt.plot(x[:, 0], x[:, 1], 'r', linewidth=4)
plt.axis('off')
plt.subplot2grid((1, 5), (0, 3), colspan=2)
plt.imshow(CSM, cmap = 'afmhot', interpolation = 'nearest')
plt.scatter(-1*np.ones(N), np.arange(N), 40, np.arange(N), cmap=c1, edgecolor = 'none')
plt.scatter(np.arange(N), -1*np.ones(N), 40, np.arange(N), cmap=c2, edgecolor = 'none')
plt.scatter(path[0:i+1, 1], path[0:i+1, 0], 10, 'c', edgecolor = 'none')
plt.scatter(path[i, 1], path[i, 0], 30, 'r', edgecolor = 'none')
plt.xlim([-2, N])
plt.ylim([N, -2])
ax = plt.gca()
ax.set_facecolor((0.15, 0.15, 0.15))
ax.set_xticks([])
ax.set_yticks([])
plt.savefig("%i.jpg"%i, bbox_inches = 'tight')
if __name__ == '__main__':
animateDTW()
@ctralie
Copy link
Author

ctralie commented May 9, 2018

The algorithm can be thought of as two frogs hopping from left to right along each sampled curve. Each frog can either stay still or move one spot forward at any point in time (a so-called "warping path" or "time-ordered correspondence"). We want to minimize the sum of distances between the frogs over all time. This boils down to finding the lowest cost path from the upper left to the lower right of an MxN "Euclidean cross-similarity matrix" C between the two sampled curves X = [X1, X2, ..., XM] and Y = [Y1, Y2, ..., YN], where Cij = ||Xi - Yj||_2

out

But note that the curves must be well-aligned for this algorithm to work. Even a simple rotation of the blue curve about its starting point will completely break the algorithm

isometry

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment