Created
October 12, 2011 22:52
-
-
Save alaiacano/1282885 to your computer and use it in GitHub Desktop.
outlier detector
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
import numpy as np | |
from pylab import * | |
from warnings import warn | |
class OutlierDetector(object): | |
def __init__(self, N=10): | |
""" | |
Generates some simple statistics on the previous N data points | |
S = OutlierDetector() | |
S.update(4) | |
S.update(120) | |
""" | |
self.__index = 0 | |
self.__N = N | |
self.__x = np.zeros(N) # previous N data points | |
self.__x2 = np.zeros(N) # square of each of the previous N data points | |
self.__mean = 0 | |
self.__var = 0 | |
self.__cnt = 0 | |
def get_mean(self): | |
return self.__mean | |
def get_var(self): | |
return self.__var | |
def get_std(self): | |
return np.sqrt(self.__var) | |
def get_data(self): | |
return self.__x | |
def update(self, point): | |
""" | |
Adds a new datapoint and updates the statistics | |
""" | |
outlier = False | |
if self.__cnt > self.__N: | |
if point > self.__mean + 3*np.sqrt(self.__var): | |
point = self.__mean + 3*np.sqrt(self.__var) | |
outlier = True | |
elif point < self.__mean - 3*np.sqrt(self.__var): | |
point = self.__mean - 3*np.sqrt(self.__var) | |
outlier = True | |
else: | |
self.__cnt += 1 | |
prev_point = self.__x[self.__index] | |
try: | |
self.__x[self.__index] = point | |
self.__x2[self.__index] = point ** 2 | |
except: | |
warn("Invalid datapoint, skipping.") | |
return False | |
self.__mean = self.__mean - prev_point/self.__N + point/self.__N | |
self.__var = sum(self.__x2) / self.__N - (self.__mean ** 2) # could be faster | |
self.__index = (self.__index + 1) % self.__N | |
return outlier | |
if __name__ == "__main__": | |
S = OutlierDetector(100) | |
Npts = 10000 | |
t = array(range(Npts)) | |
data = np.sin(np.linspace(0, 30, Npts)) + np.random.normal(0, .3, Npts) | |
# add a step | |
data[4000:] = data[4000:] + 4 | |
outliers = array([0]*Npts) | |
max_lim = np.zeros(Npts) | |
min_lim = np.zeros(Npts) | |
m = np.zeros(Npts) | |
for i, point in enumerate(data): | |
outliers[i] = S.update(point) | |
m[i] = S.get_mean() | |
max_lim[i] = S.get_mean() + 3*S.get_std() | |
min_lim[i] = S.get_mean() - 3*S.get_std() | |
plot(t, data) | |
plot(t[outliers==1], data[outliers==1], 'r.') | |
plot(t, max_lim, 'g') | |
plot(t, min_lim, 'g') | |
plot(t, m, 'k') | |
grid() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment