Created
April 2, 2013 18:31
-
-
Save astanway/5294861 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pandas import stats, Series | |
import numpy as np | |
from numpy import linalg as numpy_linalg | |
from numpy import mean as numpy_mean | |
from numpy import sqrt as numpy_sqrt | |
from numpy import round as numpy_round | |
from scipy import stats as sci_stats | |
from scipy import array as scipy_array | |
from scipy import std as scipy_std | |
from time import time | |
import settings | |
"""
This is no man's land. Do anything you want in here,
as long as each algorithm returns False when the input
timeseries is not aberrant, or a numeric score when it is.
The only thing that calls these functions is the
analyze() function in analysis_manager.py. Be sure to
change the algorithm appropriately within that function.
All algorithm explanations should begin with,
"A timeseries is aberrant if..."
"""
# Position of the datapoint under test, counted from the end of the series
# (-1 selects the most recent datapoint).
WINDOW = -1
def grubbs(timeseries):
    """
    A timeseries is aberrant if the z-score of the average of its last
    three datapoints is greater than the Grubbs' test critical value
    computed for the length of the series at a significance level of 0.05.

    timeseries is a sequence of pairs; only the second element (index 1)
    of each pair, the numeric value, is used.

    Returns the amount by which the z-score exceeds the critical value,
    rounded to two decimals, when the series is aberrant. Returns False
    otherwise, and also when the series has fewer than three datapoints
    or has zero standard deviation (the z-score is undefined in both
    cases).
    """
    # Force a float array so the tail average below is never truncated by
    # integer division.
    series = np.array([x[1] for x in timeseries], dtype=float)
    len_series = len(series)

    # The tail average needs three datapoints.
    if len_series < 3:
        return False

    std_dev = np.std(series)
    # A constant series has no spread, so nothing can stand out from it.
    if std_dev == 0:
        return False

    mean = numpy_mean(series)
    tail_average = (series[-1] + series[-2] + series[-3]) / 3
    z_score = (tail_average - mean) / std_dev

    # Critical value of the t-distribution with len_series - 2 degrees of
    # freedom, at a significance level of 0.05 / (2 * len_series).
    threshold = sci_stats.t.isf(.05 / (2 * len_series), len_series - 2)
    threshold_squared = threshold * threshold
    grubbs_score = ((len_series - 1) / numpy_sqrt(len_series)) * numpy_sqrt(
        threshold_squared / (len_series - 2 + threshold_squared))

    # Only a tail average above the mean is reported; the z-score is signed.
    if z_score > grubbs_score:
        return numpy_round(z_score - grubbs_score, 2)

    return False
def stddev_from_moving_average(timeseries):
    """
    A timeseries is aberrant if the absolute value of the latest
    datapoint minus the exponentially weighted moving average is greater
    than three exponentially weighted standard deviations.

    Returns the rounded excess over that limit when aberrant, else False.
    """
    values = Series([point[1] for point in timeseries])

    # Both statistics use the same centre of mass and are read at WINDOW.
    latest_average = stats.moments.ewma(values, com=15).iget(WINDOW)
    latest_std = stats.moments.ewmstd(values, com=15).iget(WINDOW)

    deviation = abs(values.iget(WINDOW) - latest_average)
    limit = 3 * latest_std

    if deviation > limit:
        # NOTE(review): the limit is rounded on its own before being
        # subtracted, so the result can round down to 0 - confirm intended.
        return round(deviation - round(limit))

    return False
def linear_regression(timeseries):
    """
    A timeseries is aberrant if the average residual of its last three
    datapoints, measured against a least-squares line fitted through the
    whole series, is more than three standard deviations of all residuals
    away from zero.

    timeseries is a sequence of (x, y) pairs: index 0 is the independent
    variable and index 1 the value.

    Returns the tail residual average minus the residual standard
    deviation when aberrant. Returns False otherwise, including when the
    series has fewer than three datapoints or when either quantity rounds
    to zero.
    """
    # The tail average needs three datapoints.
    if len(timeseries) < 3:
        return False

    x = np.array([t[0] for t in timeseries])
    y = np.array([t[1] for t in timeseries])

    # Fit y = m * x + c by least squares.
    A = np.vstack([x, np.ones(len(x))]).T
    m, c = numpy_linalg.lstsq(A, y)[0]

    # Distance of every datapoint from the fitted line.
    residuals = y - (m * x + c)
    std_dev = np.std(residuals)
    tail_avg = (residuals[-1] + residuals[-2] + residuals[-3]) / 3

    # The round() checks discard series whose residuals are negligible.
    if abs(tail_avg) > std_dev * 3 and round(std_dev) != 0 and round(tail_avg) != 0:
        return tail_avg - std_dev

    return False
def mean_subtraction_cumulation(timeseries):
    """
    A timeseries is aberrant if, after subtracting the mean of the
    earlier datapoints from every value, the absolute value of the
    datapoint under test is greater than three standard deviations of
    those earlier datapoints.

    Returns the rounded, mean-subtracted value of the datapoint under
    test when aberrant, else False.
    """
    # Falsy values (None, 0, ...) are counted as 0.
    series = Series([x[1] if x[1] else 0 for x in timeseries])

    # WINDOW is a negative offset from the end of the series, so adding it
    # to the length ends the baseline just before the datapoint under
    # test. Subtracting it would include that datapoint in its own
    # baseline.
    baseline_end = len(series) + WINDOW

    series = series - series[0:baseline_end].mean()
    latest = series.iget(WINDOW)

    if abs(latest) > 3 * series[0:baseline_end].std():
        return round(latest)

    return False
# Algorithm designated as the selected one.
SELECTED_ALGORITHM = grubbs

# Filtering thresholds, taken from the project's settings module.
STALE_PERIOD = settings.STALE_PERIOD
FULL_DURATION = settings.FULL_DURATION
MIN_TOLERABLE_LENGTH = settings.MIN_TOLERABLE_LENGTH
MAX_TOLERABLE_SILENCE = settings.MAX_TOLERABLE_SILENCE

# Every algorithm in this list is run by run_selected_algorithm(), which
# averages their results.
algorithms = [
    grubbs,
    mean_subtraction_cumulation,
    stddev_from_moving_average,
    linear_regression,
]
def run_selected_algorithm(timeseries):
    """
    Filter timeseries and run every algorithm in `algorithms` on it.

    timeseries is a sequence of (timestamp, value) pairs; the timestamps
    are compared against time().

    Returns False when the series is filtered out (too short, stale,
    incomplete or empty) or when the algorithm results cannot be averaged.
    Otherwise returns the mean of the algorithm results, where a result of
    False counts as 0.
    """
    # Get rid of short series (an empty one cannot be indexed below)
    length = len(timeseries)
    if length == 0 or length < MIN_TOLERABLE_LENGTH:
        return False

    # Get rid of stale series
    if time() - timeseries[-1][0] > STALE_PERIOD:
        return False

    # Get rid of incomplete series
    duration = timeseries[-1][0] - timeseries[0][0]
    if duration < FULL_DURATION:
        return False

    # Get rid of empty series
    # NOTE(review): this sums the FIRST MAX_TOLERABLE_SILENCE datapoints,
    # not the most recent ones - confirm that is the intended window.
    total = sum(point[1] for i, point in enumerate(timeseries)
                if i < MAX_TOLERABLE_SILENCE)
    if total == 0:
        return False

    ensemble = [algorithm(timeseries) for algorithm in algorithms]

    try:
        return sum(ensemble) / len(ensemble)
    except (TypeError, ZeroDivisionError):
        # A non-numeric result or an empty algorithm list.
        return False
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment