-
-
Save ximeg/587011a65d05f067a29ce9c22894d1d2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
# Implementation of algorithm from http://stackoverflow.com/a/22640362/6029703 | |
import numpy as np | |
import pylab | |
def thresholding_algo(y, lag, threshold, influence): | |
signals = np.zeros(len(y)) | |
filteredY = np.array(y) | |
avgFilter = [0]*len(y) | |
stdFilter = [0]*len(y) | |
avgFilter[lag - 1] = np.mean(y[0:lag]) | |
stdFilter[lag - 1] = np.std(y[0:lag]) | |
for i in range(lag, len(y) - 1): | |
if abs(y[i] - avgFilter[i-1]) > threshold * stdFilter [i-1]: | |
if y[i] > avgFilter[i-1]: | |
signals[i] = 1 | |
else: | |
signals[i] = -1 | |
filteredY[i] = influence * y[i] + (1 - influence) * filteredY[i-1] | |
avgFilter[i] = np.mean(filteredY[(i-lag):i]) | |
stdFilter[i] = np.std(filteredY[(i-lag):i]) | |
else: | |
signals[i] = 0 | |
filteredY[i] = y[i] | |
avgFilter[i] = np.mean(filteredY[(i-lag):i]) | |
stdFilter[i] = np.std(filteredY[(i-lag):i]) | |
return dict(signals = np.asarray(signals), | |
avgFilter = np.asarray(avgFilter), | |
stdFilter = np.asarray(stdFilter)) | |
# Data | |
y = np.array([1,1,1.1,1,0.9,1,1,1.1,1,0.9,1,1.1,1,1,0.9,1,1,1.1,1,1,1,1,1.1,0.9,1,1.1,1,1,0.9, | |
1,1.1,1,1,1.1,1,0.8,0.9,1,1.2,0.9,1,1,1.1,1.2,1,1.5,1,3,2,5,3,2,1,1,1,0.9,1,1,3, | |
2.6,4,3,3.2,2,1,1,0.8,4,4,2,2.5,1,1,1]) | |
# Settings: lag = 30, threshold = 5, influence = 0 | |
lag = 30 | |
threshold = 5 | |
influence = 0 | |
# Run algo with settings from above | |
result = thresholding_algo(y, lag=lag, threshold=threshold, influence=influence) | |
# Plot result | |
pylab.subplot(211) | |
pylab.plot(np.arange(1, len(y)+1), y) | |
pylab.plot(np.arange(1, len(y)+1), | |
result["avgFilter"], color="cyan", lw=2) | |
pylab.plot(np.arange(1, len(y)+1), | |
result["avgFilter"] + threshold * result["stdFilter"], color="green", lw=2) | |
pylab.plot(np.arange(1, len(y)+1), | |
result["avgFilter"] - threshold * result["stdFilter"], color="green", lw=2) | |
pylab.subplot(212) | |
pylab.step(np.arange(1, len(y)+1), result["signals"], color="red", lw=2) | |
pylab.ylim(-1.5, 1.5) | |
Any idea how to modify this so it works with a real time data stream? I mean without recalculating the entire series each time, but instead to only recalculate the last item passed to the function.
Thanks for posting this. I noticed a bug on line 13.
It should be:
for i in range(lag, len(y)):
(otherwise the last item in the y
list never gets checked).
i think
np.std(y[0:lag], ddof=1)
you use the wrong degrees of freedom
Thank you so much for your code!
With small changes, your code could be faster. I changed the list to arrays and used numba for optimization.
With large time series, your code takes approximately 35 s, with changes the code takes 1.5 s.
Check here:
https://gist.github.com/dlegor/286dd5cd5ecc317c34ed81aaed4a2b37
Thank you for this code,
I did some modifications for the estimation of avg and std.
https://gist.github.com/PeiLi-Sandman/d67baad835a516481e6ca124a08840f2
Yes!!! very nice. Thankyou