Last active
April 28, 2023 17:04
-
-
Save sjtalkar/65ce7a2f6df2c820acf1a6e485b9af1b to your computer and use it in GitHub Desktop.
Dynamic Programming Cost Matrix
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def dp(dist_mat): | |
""" | |
Find minimum-cost path through matrix `dist_mat` using dynamic programming. | |
The cost of a path is defined as the sum of the matrix entries on that | |
path. See the following for details of the algorithm: | |
- http://en.wikipedia.org/wiki/Dynamic_time_warping | |
- https://www.ee.columbia.edu/~dpwe/resources/matlab/dtw/dp.m | |
The notation in the first reference was followed, while Dan Ellis's code | |
(second reference) was used to check for correctness. Returns a list of | |
path indices and the cost matrix. | |
""" | |
N, M = dist_mat.shape | |
# Initialize the cost matrix | |
cost_mat = np.zeros((N + 1, M + 1)) | |
for i in range(1, N + 1): | |
cost_mat[i, 0] = np.inf | |
for i in range(1, M + 1): | |
cost_mat[0, i] = np.inf | |
# Fill the cost matrix while keeping traceback information | |
traceback_mat = np.zeros((N, M)) | |
for i in range(N): | |
for j in range(M): | |
penalty = [ | |
cost_mat[i, j], # match (0) | |
cost_mat[i, j + 1], # insertion (1) | |
cost_mat[i + 1, j]] # deletion (2) | |
i_penalty = np.argmin(penalty) | |
cost_mat[i + 1, j + 1] = dist_mat[i, j] + penalty[i_penalty] | |
traceback_mat[i, j] = i_penalty | |
# Traceback from bottom right | |
i = N - 1 | |
j = M - 1 | |
path = [(i, j)] | |
while i > 0 or j > 0: | |
tb_type = traceback_mat[i, j] | |
if tb_type == 0: | |
# Match | |
i = i - 1 | |
j = j - 1 | |
elif tb_type == 1: | |
# Insertion | |
i = i - 1 | |
elif tb_type == 2: | |
# Deletion | |
j = j - 1 | |
path.append((i, j)) | |
# Strip infinity edges from cost_mat before returning | |
cost_mat = cost_mat[1:, 1:] | |
return (path[::-1], cost_mat) | |
### From Data mining 2 | |
import math | |
def calc_pairwise_dtw_cost(x, y, ret_matrix=False): | |
""" | |
Takes in two series. If ret_matrix=True, returns the full DTW cost matrix; | |
otherwise, returns only the overall DTW cost | |
""" | |
cost_matrix = np.zeros((len(y), len(x))) | |
dtw_cost = None | |
dist_fn = lambda a, b: (a - b) ** 2 # Optional helper function | |
parallel_cost_matrix = np.zeros((len(y), len(x))) | |
# Initialize a matrix with the raw distance between x and y | |
for i in range(len(x)): | |
for j in range(len(y)): | |
#parallel_cost_matrix[i,j] = dist_fn(x[i], y[j]) | |
parallel_cost_matrix[i,j] = dist_fn(y[i], x[j]) | |
cost_matrix[0,0] = parallel_cost_matrix[0,0] | |
# Now for all elements in cost matrix | |
# 1. Skip the first row and first column element | |
# 2. For the first row (where i is 0, check only j-1 | |
# 3. For the first column (where j is 0, check only i-1 | |
# 4. For all other elements you can go back both by one column and one row | |
for i in range(len(x)): | |
for j in range(len(y)): | |
#Take the value of i,j from the paallel_cost_matrix and add the min DTW | |
if not (i == 0 and j == 0): | |
if i == 0: | |
dtw_i_j1 = cost_matrix[i, j-1] | |
cost_matrix[i,j] = parallel_cost_matrix[i, j] + dtw_i_j1 | |
elif j == 0: | |
dtw_i1_j = cost_matrix[i-1, j] | |
cost_matrix[i,j] = parallel_cost_matrix[i, j] + dtw_i1_j | |
else: | |
dtw_i_j1 = cost_matrix[i, j-1] | |
dtw_i1_j = cost_matrix[i-1, j] | |
dtw_i1_j1 = cost_matrix[i-1, j-1] | |
min_dtw = min(dtw_i_j1, dtw_i1_j, dtw_i1_j1) | |
cost_matrix[i,j] = parallel_cost_matrix[i, j] + min_dtw | |
dtw_cost = cost_matrix[len(x) - 1, len(y) - 1] | |
return cost_matrix if ret_matrix else dtw_cost |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment