Skip to content

Instantly share code, notes, and snippets.

@alaiacano
Created October 12, 2011 22:52
Show Gist options
  • Save alaiacano/1282885 to your computer and use it in GitHub Desktop.
Save alaiacano/1282885 to your computer and use it in GitHub Desktop.
outlier detector
from __future__ import division
import numpy as np
from pylab import *
from warnings import warn
class OutlierDetector(object):
def __init__(self, N=10):
"""
Generates some simple statistics on the previous N data points
S = OutlierDetector()
S.update(4)
S.update(120)
"""
self.__index = 0
self.__N = N
self.__x = np.zeros(N) # previous N data points
self.__x2 = np.zeros(N) # square of each of the previous N data points
self.__mean = 0
self.__var = 0
self.__cnt = 0
def get_mean(self):
return self.__mean
def get_var(self):
return self.__var
def get_std(self):
return np.sqrt(self.__var)
def get_data(self):
return self.__x
def update(self, point):
"""
Adds a new datapoint and updates the statistics
"""
outlier = False
if self.__cnt > self.__N:
if point > self.__mean + 3*np.sqrt(self.__var):
point = self.__mean + 3*np.sqrt(self.__var)
outlier = True
elif point < self.__mean - 3*np.sqrt(self.__var):
point = self.__mean - 3*np.sqrt(self.__var)
outlier = True
else:
self.__cnt += 1
prev_point = self.__x[self.__index]
try:
self.__x[self.__index] = point
self.__x2[self.__index] = point ** 2
except:
warn("Invalid datapoint, skipping.")
return False
self.__mean = self.__mean - prev_point/self.__N + point/self.__N
self.__var = sum(self.__x2) / self.__N - (self.__mean ** 2) # could be faster
self.__index = (self.__index + 1) % self.__N
return outlier
if __name__ == "__main__":
S = OutlierDetector(100)
Npts = 10000
t = array(range(Npts))
data = np.sin(np.linspace(0, 30, Npts)) + np.random.normal(0, .3, Npts)
# add a step
data[4000:] = data[4000:] + 4
outliers = array([0]*Npts)
max_lim = np.zeros(Npts)
min_lim = np.zeros(Npts)
m = np.zeros(Npts)
for i, point in enumerate(data):
outliers[i] = S.update(point)
m[i] = S.get_mean()
max_lim[i] = S.get_mean() + 3*S.get_std()
min_lim[i] = S.get_mean() - 3*S.get_std()
plot(t, data)
plot(t[outliers==1], data[outliers==1], 'r.')
plot(t, max_lim, 'g')
plot(t, min_lim, 'g')
plot(t, m, 'k')
grid()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment