Last active
January 31, 2020 07:01
-
-
Save rileypeterson/5b7b78bbe4180da635dc1f85134ca8be to your computer and use it in GitHub Desktop.
Bottom Up Algorithm Demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import tempfile | |
import os | |
import numpy as np | |
import matplotlib.pyplot as plt | |
def r_sqr(yi, fi): | |
ss_res = np.sum((yi - fi) ** 2) | |
ss_tot = np.sum((yi - np.mean(yi)) ** 2) | |
if ss_tot == 0: | |
return 1 | |
r_squared = 1 - (ss_res / ss_tot) | |
return r_squared | |
class Segment(object): | |
""" A line segment """ | |
DEFAULT_ERROR = np.nan | |
def __init__(self, x1, y1, x2, y2, error=DEFAULT_ERROR): | |
self._ind = 0 | |
self.x1 = x1 | |
self.y1 = y1 | |
self.x2 = x2 | |
self.y2 = y2 | |
self.error = error | |
assert self.x1 != self.x2, "The start and end x coordinates of segment cannot be equivalent" | |
self.m = (self.y2 - self.y1) / (self.x2 - self.x1) | |
self.b = self.y1 - self.m*self.x1 | |
def interpolate(self, x): | |
""" The y value for an arbitrary point in the segment """ | |
x = x[(self.x1 <= x) & (x <= self.x2)] | |
return self.m*x + self.b | |
def merge(self, s, x, y): | |
y = y[(self.x1 <= x) & (x <= s.x2)] | |
x = x[(self.x1 <= x) & (x <= s.x2)] | |
x_bar = np.mean(x) | |
y_bar = np.mean(y) | |
m = np.sum((x - x_bar)*(y - y_bar)) / np.sum((x - x_bar)**2) | |
b = y_bar - m*x_bar | |
f = lambda x_: m*x_ + b | |
return Segment(self.x1, f(self.x1), s.x2, f(s.x2), self.DEFAULT_ERROR) | |
# def merge(self, s): | |
# """ Merge two segments and return a new one. """ | |
# return Segment(self.x1, self.y1, s.x2, s.y2, self.DEFAULT_ERROR) | |
def __str__(self): | |
""" Return a readable representation of the segment """ | |
return """<{} x1:{:.2f}, y1:{:.2f}, x2:{:.2f}, y2:{:.2f}, err:{:.2f}>"""\ | |
.format(self._ind, self.x1, self.y1, | |
self.x2, self.y2, | |
self.error) | |
def __repr__(self): | |
return self.__str__() | |
class BottomUpJoiner(object): | |
def __init__(self, x, y): | |
self.x = x | |
self.y = y | |
self.total_err = 1 | |
self.merged_ind = None | |
# Initialize all segments | |
self.segments = [] | |
for i in range(len(self.x)-1): | |
s = Segment(self.x[i], self.y[i], self.x[i+1], self.y[i+1]) | |
s._ind = i | |
self.segments.append(s) | |
def y_in_segment(self, s): | |
return self.y[(s.x1 <= self.x) & (self.x <= s.x2)] | |
def update_merge_costs(self): | |
# Calculate the merge cost for each segment | |
# Error for merging segments 1 and 2 lies in segment 1 (Left bias) | |
for i in range(len(self.segments) - 1): | |
s1, s2 = self.segments[i], self.segments[i+1] | |
merged_segment = s1.merge(s2, self.x, self.y) | |
yi = self.y_in_segment(merged_segment) | |
fi = merged_segment.interpolate(self.x) | |
self.segments[i].error = np.sum((yi - fi)**2) | |
def merge_least_costly_segment(self): | |
ind = np.nanargmin([s.error for s in self.segments]) | |
left_ind = ind - 1 | |
if left_ind < 0: | |
left_ind = None | |
right_ind = min(ind + 1, len(self.segments) - 1) | |
if right_ind == len(self.segments) - 1: | |
right_ind = None | |
self.merged_ind = ind | |
s1, s2 = self.segments[ind], self.segments[ind + 1] | |
self.segments[ind] = s1.merge(s2, self.x, self.y) | |
self.segments.pop(ind + 1) | |
for i in range(ind, len(self.segments)): | |
self.segments[i]._ind -= 1 | |
self.segments[ind]._ind = ind | |
if left_ind is not None: | |
self.segments[left_ind].y2 = self.segments[left_ind + 1].y1 | |
if right_ind is not None: | |
self.segments[right_ind].y1 = self.segments[right_ind - 1].y2 | |
def compute_total_error(self): | |
fi = [] | |
for i, s in enumerate(self.segments): | |
vals = list(s.interpolate(self.x)) | |
if i != 0: | |
vals = vals[1:] | |
fi = fi + vals | |
assert len(fi) == len(self.y), "Lengths must be equal" | |
self.total_err = r_sqr(self.y, np.array(fi)) | |
print("{:.4f}".format(self.total_err)) | |
if __name__ == '__main__': | |
x = np.array(range(100)) | |
np.random.seed(2) | |
f = lambda x: np.round(-0.02*x**2+0.02*x+200+20*(np.random.rand(100,) - 0.5)) | |
y = f(x) | |
b = BottomUpJoiner(x, y) | |
print(b.segments) | |
it = 0 | |
with tempfile.TemporaryDirectory() as tmpdirname: | |
while len(b.segments) > 3: | |
b.update_merge_costs() | |
b.merge_least_costly_segment() | |
b.compute_total_error() | |
# if it % 5 == 0: | |
plt.close('all') | |
fig = plt.figure(figsize=(10, 10)) | |
ax = plt.gca() | |
plt.plot(b.x, b.y) | |
for s in b.segments: | |
plt.plot([s.x1, s.x2], [s.y1, s.y2]) | |
plt.ylim(bottom=-5, top=225) | |
plt.tight_layout() | |
plt.savefig('{}/{:0>3}.png'.format(tmpdirname, it)) | |
it += 1 | |
os.system('convert -delay 25 -loop 0 {}/*.png out.gif'.format(tmpdirname)) | |
print(b.segments) | |
plt.show() | |
Inspired by: https://www.ics.uci.edu/~pazzani/Publications/survey.pdf
And for when link inevitably breaks:
Segmenting Time Series: A Survey and Novel Approach
Eamonn Keogh Selina Chu David Hart Michael Pazzani
Department of Information and Computer Science
University of California, Irvine, California 92697 USA
{eamonn, selina, dhart, pazzani}@ics.uci.edu
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Epic bottom up smoothing with smart interpolation!