Skip to content

Instantly share code, notes, and snippets.

@kurtbrose
Created June 21, 2013 07:31
Show Gist options
  • Select an option

  • Save kurtbrose/5829527 to your computer and use it in GitHub Desktop.

Select an option

Save kurtbrose/5829527 to your computer and use it in GitHub Desktop.
putting a bow on the stats collector
'''
Efficient online statistics gathering. Create a Stats object to track
as many statistics as desired.
Implementation of piece-wise parabolic quantile estimation, algorithm defined in
http://www.cs.wustl.edu/~jain/papers/ftp/psqr.pdf
'''
from math import copysign
DEFAULT_PERCENTILES = (0.1, 1, 2, 5, 10, 25, 50, 75, 80, 85, 90, 95, 98, 99, 99.5, 99.8, 99.9, 99.99)
class Stats(object):
'''
Key-value store for statistics.
>>> stats = Stats()
>>> for i in range(1000): stats['foo'] = i/1000.0
>>> stats['foo']
'''
def __init__(self, percentiles=DEFAULT_PERCENTILES):
self.percentiles = percentiles
def __getitem__(self, key):
if key in self.measures:
return self.measures[key].get_percentiles()
return dict([(p, None) for p in self.percentiles])
def __setitem__(self, key, val):
if key in self.measures:
self.measures[key].add_val(val)
self.measures[key] = Measure(self.percentiles, val)
def __repr__(self):
avgs = dict([(k, v.sum/v.num_vals) for k, v in self.measures.items()])
return "Stats("+repr(avgs)+")"
class Measure(object):
'''
May be used to track percentiles and average of a single variable.
'''
def __init__(self, percentiles=DEFAULT_PERCENTILES):
self.firstN = []
self.sum = 0
self.num_vals = 0
self.percentiles = percentiles
def _start(self):
firstN = sorted(self.firstN)
self.min = firstN[0]
self.max = firstN[-1]
vals = [[i+1, firstN[i]] for i in range(1, len(firstN) - 1)]
self.points = zip(self.percentiles, vals)
def add_val(self, val):
self.num_vals += 1
self.sum += val
if len(self.firstN) < len(self.percentiles) + 2:
self.firstN.append(val)
if len(self.firstN) == len(self.percentiles):
self._start()
return
if val < self.min:
self.min = val
if val > self.max:
self.max = val
scale = self.num_vals - 1
#right-most point is stopping case; handle first
right = self.points[-1][1]
if val <= right[1]:
right[0] += 1
if right[0] == self.num_vals:
right[0] -= 1
#handle the rest of the points
for i in range(len(self.points)-2, -1, -1):
point = self.points[i][1]
if val <= point[1]:
point[0] += 1
if point[0] == self.points[i+1][1][0]:
point[0] -= 1
#left-most point is a special case
left = self.points[0][1]
left[1], left[0] = _nxt(
1, self.min, left[0], left[1], self.points[1][1][0], self.points[1][1][1],
self.points[0][0] / 100.0, scale)
#update estimated locations of percentiles
for i in range(1, len(self.points) - 1):
prev = self.points[i-1][1]
point = self.points[i][1]
nxt = self.points[i+1][1]
point[1], point[0] = _nxt(
prev[0], prev[1], point[0], point[1], nxt[0], nxt[1],
self.points[i][0]/100.0, scale)
#right-most point is a special case
right[1], right[0] = _nxt(
self.points[-2][1][0], self.points[-2][1][1], right[0], right[1],
self.num_vals, self.max, self.points[-1][0] / 100.0, scale)
def get_percentiles(self):
data = dict([(e[0], e[1][1]) for e in self.points])
data["mean"] = self.sum / self.num_vals
return data
def __repr__(self):
return "Measure("+repr(self.get_percentiles())+")"
def _nxt(left_n, left_q, cur_n, cur_q, right_n, right_q, quantile, scale):
#calculate desired position
d = int(scale * quantile + 1 - cur_n)
if d:
#if cur_n == 5:
# import pdb; pdb.set_trace()
d = copysign(1, d) # clamp d at +/- 1
if left_n < cur_n + d < right_n: # try parabolic eqn
nxt_q = cur_q + (d / (right_n - left_n)) * (
(cur_n - left_n + d) * (right_q - cur_q) / (right_n - cur_n) +
(right_n - cur_n - d) * (cur_q - left_q) / (cur_n - left_n))
if not (left_q < nxt_q < right_q): # fall back on linear eqn
if d == 1:
nxt_q = cur_q + (right_q - cur_q) / (right_n - cur_n)
elif d == -1:
nxt_q = cur_q - (left_q - cur_q) / (left_n - cur_n)
return nxt_q, cur_n + d
return cur_q, cur_n
def test_random():
#test random.random() values; evenly distributed between 0 and 1, so 50th percentils ie 0.5, etc
import random
import time
nsamples = 10000
vals = [random.random() for i in range(nsamples)]
m = Measure()
try:
start = time.time()
for e in vals:
m.add_val(e)
duration = time.time() - start
print "processed", nsamples, "measurements in", duration, "sec (", 1000 * duration/nsamples, "ms each)"
except:
import traceback
import pdb
traceback.print_exc()
pdb.post_mortem()
p = m.get_percentiles()
for k, v in p.items():
if k == "mean":
continue
if 0.9 > (k / 100.0) / v > 1.1:
print "problem", k, "is", v, "should be", k/100.0
return m
if __name__ == "__main__":
m1 = test_random()
import pprint
pprint.pprint(m1.get_percentiles())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment