Skip to content

Instantly share code, notes, and snippets.

@kurtbrose
Last active December 18, 2015 16:19
Show Gist options
  • Select an option

  • Save kurtbrose/5811008 to your computer and use it in GitHub Desktop.

Select an option

Save kurtbrose/5811008 to your computer and use it in GitHub Desktop.
Implementation of piece-wise parabolic quantile estimation, algorithm defined in http://www.cs.wustl.edu/~jain/papers/ftp/psqr.pdf
'''
Implementation of piece-wise parabolic quantile estimation, algorithm defined in
http://www.cs.wustl.edu/~jain/papers/ftp/psqr.pdf
'''
from math import copysign
DEFAULT_LEVELS = (0.1, 1, 2, 5, 10, 25, 50, 75, 80, 85, 90, 95, 98, 99, 99.5, 99.8, 99.9, 99.99)
class Measure(object):
def __new__(self, levels=DEFAULT_LEVELS):
if len(levels) > 5:
return LargeMeasure(levels)
return SmallMeasure(levels)
class SmallMeasure(object):
def __init__(self, first5, levels=DEFAULT_LEVELS):
self.min, p1, p2, p3, self.max = sorted(first5)
self.observations = dict([(e, [p1, 2, p2, 3, p3, 4]) for e in levels])
self.num_vals = 5
def get_percentiles(self):
return dict([(p, self.observations[p][2]) for p in self.observations])
def add_val(self, val):
self.num_vals += 1
if val < self.min:
self.min = val
elif val > self.max:
self.max = val
scale = self.num_vals - 1
#TODO: don't re-calculate the same values over and over
for percent, sofar in self.observations.iteritems():
#calculate current quantiles
lo_quantile = percent / 200.0
mid_quantile = percent / 100.0
hi_quantile = (100 + percent) / 200.0
lo_q, lo_n, mid_q, mid_n, hi_q, hi_n = sofar
#update counts of points believed to be to the left of each value
if val < hi_q:
hi_n += 1
if hi_n == self.num_vals:
hi_n -= 1
if val < mid_q:
mid_n += 1
if mid_n == hi_n:
mid_n -= 1
if val < lo_q:
lo_n += 1
if lo_n == mid_n:
lo_n -= 1
#calculate new q values where needed
lo_q, lo_n = _nxt(1, self.min, lo_n, lo_q, mid_n, mid_q, lo_quantile, scale)
#import pdb; pdb.set_trace()
mid_q, mid_n = _nxt(lo_n, lo_q, mid_n, mid_q, hi_n, hi_q, mid_quantile, scale)
#import pdb; pdb.set_trace()
hi_q, hi_n = _nxt(mid_n, mid_q, hi_n, hi_q, self.num_vals, self.max, hi_quantile, scale)
if lo_n == mid_n or mid_n == hi_n:
raise Exception("oops")
s = sofar
s[0], s[1], s[2], s[3], s[4], s[5] = lo_q, lo_n, mid_q, mid_n, hi_q, hi_n
def __repr__(self):
return "Measure("+repr(self.get_percentiles())+")"
class LargeMeasure(object):
def __init__(self, firstN, levels=DEFAULT_LEVELS):
if firstN < len(levels)+2:
raise ValueError("at least N+2 points needed to initialize N percentile measure"
"(got {0} points for {1} percentiles)".format(len(firstN, len(levels))))
firstN = sorted(firstN)
n = len(levels) + 2
firstN, rest = firstN[:n], firstN[n:]
self.min = firstN[0]
self.max = firstN[-1]
vals = [[i+1, firstN[i]] for i in range(1, len(firstN) - 1)]
self.points = zip(levels, vals)
self.num_vals = n
print "initial points", self.points
for e in rest:
self.add_val(e)
def add_val(self, val):
self.num_vals += 1
if val < self.min:
self.min = val
if val > self.max:
self.max = val
scale = self.num_vals - 1
#right-most point is stopping case; handle first
right = self.points[-1][1]
if val <= right[1]:
right[0] += 1
if right[0] == self.num_vals:
right[0] -= 1
#handle the rest of the points
for i in range(len(self.points)-2, 0, -1):
point = self.points[i][1]
if val <= point[1]:
point[0] += 1
if point[0] == self.points[i+1][1][0]:
point[0] -= 1
#left-most point is a special case
left = self.points[0][1]
left[1], left[0] = _nxt(
1, self.min, left[0], left[1], self.points[1][1][0], self.points[1][1][1],
self.points[0][0] / 100.0, scale)
#update estimated locations of percentiles
for i in range(1, len(self.points) - 1):
prev = self.points[i-1][1]
point = self.points[i][1]
nxt = self.points[i+1][1]
point[1], point[0] = _nxt(
prev[0], prev[1], point[0], point[1], nxt[0], nxt[1],
self.points[i][0]/100.0, scale)
#right-most point is a special case
right[1], right[0] = _nxt(
self.points[-2][1][0], self.points[-2][1][1], right[0], right[1],
self.num_vals, self.max, self.points[-1][0] / 100.0, scale)
def get_percentiles(self):
return dict([(e[0], e[1][1]) for e in self.points])
def __repr__(self):
return "Measure("+repr(self.get_percentiles())+")"
def _nxt(left_n, left_q, cur_n, cur_q, right_n, right_q, quantile, scale):
#calculate desired position
d = int(scale * quantile + 1 - cur_n)
if d:
#if cur_n == 5:
# import pdb; pdb.set_trace()
d = copysign(1, d) # clamp d at +/- 1
if left_n < cur_n + d < right_n: # try parabolic eqn
nxt_q = cur_q + (d / (right_n - left_n)) * (
(cur_n - left_n + d) * (right_q - cur_q) / (right_n - cur_n) +
(right_n - cur_n - d) * (cur_q - left_q) / (cur_n - left_n))
if not (left_q < nxt_q < right_q): # fall back on linear eqn
if d == 1:
nxt_q = cur_q + (right_q - cur_q) / (right_n - cur_n)
elif d == -1:
nxt_q = cur_q - (left_q - cur_q) / (left_n - cur_n)
return nxt_q, cur_n + d
return cur_q, cur_n
def test_random():
#test random.random() values; evenly distributed between 0 and 1, so 50th percentils ie 0.5, etc
import random
import time
nsamples = 10000
vals = [random.random() for i in range(nsamples)]
m = LargeMeasure(vals[:len(DEFAULT_LEVELS)+2])
try:
start = time.time()
for e in vals:
m.add_val(e)
duration = time.time() - start
print "processed", nsamples, "measurements in", duration, "sec (", 1000 * duration/nsamples, "ms each)"
except:
import traceback
import pdb
traceback.print_exc()
pdb.post_mortem()
p = m.get_percentiles()
for k, v in p.items():
if (k / 100.0) / v > 1.01:
print "problem", k, "is", v, "should be", k/100
return m
def test_examples():
#run test vectors from paper
observations = [0.02, 0.05, 0.74, 3.39, 0.83, 22.37, 10.15, 15.43, 36.62, 15.92, 34.60,
10.28, 1.47, 0.40, 0.05, 11.39, 0.27, 0.42, 0.09, 11.37]
m = SmallMeasure(observations[:5], [50])
def expect(step, p2, p3, p4):
obs = m.observations[50]
obs = [obs[1], obs[3], obs[5]]
print step, "error {0:2.0f}, {1:2.0f}, {2:2.0f}".format(obs[0] - p2, obs[1] - p3, obs[2] - p4)
if obs != [p2, p3, p4]:
print "got", obs, "expected", p2, p3, p4
m.add_val(observations[5])
expect(1, 2, 3, 4)
m.add_val(observations[6])
expect(2, 2, 3, 5)
m.add_val(observations[7])
expect(3, 2, 4, 6)
m.add_val(observations[8])
expect(4, 3, 5, 7)
m.add_val(observations[9])
expect(5, 3, 5, 7)
m.add_val(observations[10])
expect(6, 3, 6, 9)
m.add_val(observations[11])
expect(7, 3, 6, 9)
if __name__ == "__main__":
m1 = test_random()
import pprint
pprint.pprint(m1.get_percentiles())
test_examples()
Passing basic tests!
>>> dynstats.test_examples()
1 error 0, 0, 0
2 error 0, 0, 0
3 error 0, 0, 0
4 error 0, 0, 0
5 error 0, 0, 0
6 error 0, 0, -1
got [3.0, 6.0, 8.0] expected 3 6 9
7 error 0, 0, 0
>>> dynstats.test_random()
processed 10000 measurements in 1.96900010109 sec ( 0.196900010109 ms each)
Measure({99.5: 0.9942897566556037, 1: 0.01114820913549699, 2: 0.0223555971355880
4, 99: 0.9900875063806345, 5: 0.05477256055670621, 0.1: 0.0014984698499020425, 1
0: 0.1047030220884278, 75: 0.7510633333982301, 99.8: 0.9975681417900762, 99.9: 0
.9988589605380689, 80: 0.8052640092276312, 50: 0.5055690519392191, 99.99: 0.9995
006126963584, 85: 0.8546116729481641, 25: 0.24844780616272752, 98: 0.98134565467
42141, 90: 0.9024448244974481, 95: 0.9498598024886282})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment