Skip to content

Instantly share code, notes, and snippets.

@blt
Created June 23, 2016 04:23
Show Gist options
  • Save blt/483f6e847571550905cb489f64a52ac4 to your computer and use it in GitHub Desktop.
Save blt/483f6e847571550905cb489f64a52ac4 to your computer and use it in GitHub Desktop.
#![cfg_attr(test, feature(plugin))]
#![cfg_attr(test, plugin(quickcheck_macros))]
#![allow(dead_code)]
#![allow(unused_imports)]
#[cfg(test)]
extern crate quickcheck;
/// This is an implementation of the algorithm presented in Cormode, Korn,
/// Muthukrishnan, Srivastava's paper "Effective Computation of Biased Quantiles
/// over Data Streams". The ambition here is to approximate quantiles on a
/// stream of data without having a boatload of information kept in memory.
//use std::collections::BinaryHeap;
use std::fmt::Debug;
struct Quantile {
quantile: f64, // These all could probably be f32
// error: f64,
u: f64,
v: f64,
}
impl Quantile {
fn new(quantile: f64, error: f64) -> Quantile {
Quantile {
quantile: quantile,
// error: error,
u: 2.0 * error / (1.0 - quantile),
v: 2.0 * error / quantile,
}
}
}
#[derive(Debug,Clone)]
pub struct Entry<T: Copy> {
v: T,
g: f64,
delta: f64,
}
enum Cmp {
LT,
GT,
EQ,
}
fn cmp<T: PartialOrd + Debug>(l: T, r: T) -> Cmp {
if l == r { Cmp::EQ }
else if l < r { Cmp::LT }
else { Cmp::GT }
}
pub struct CKMS<T: Copy> {
n: usize, // number of items, >= self.samples.len()
// We follow the 'cursor' method of the above paper. In this method,
// incoming items are buffered in a priority queue, called 'buffer' here,
// and once insert_threshold items are stored in the buffer it is drained
// into the 'samples' collection. Insertion will cause some extranious
// points to be held that can be merged. Once compress_threshold threshold
// items are buffered the COMPRESS operation merges these extranious points.
/// insert_threshold: usize,
/// compress_threshold: usize,
// We aim for 'targeted' biased quantiles. That is, we have no interest in
// QUERY returning quantiles not a part of this structure. This saves a
// little memory space, cpu time and gives us better hooks for property
// testing.
quantiles: Vec<Quantile>,
// We store incoming points in a priority queue and every insert_threshold
// items drain buffer into samples. This is the 'cursor' method of the
// paper.
/// buffer: BinaryHeap<T>,
// This is the S(n) of the above paper. Entries are stored here and
// occasionally merged. The outlined implementation uses a linked list but
// we prefer a Vec for reasons of cache locality at the cost of worse
// computational complexity.
samples: Vec<Entry<T>>,
}
impl<T: Copy + PartialOrd + Debug> CKMS<T> { // where T: std::cmp::Ord {
fn new() -> CKMS<T> {
CKMS {
n: 0,
// insert_threshold: 1000,
// compress_threshold: 1250,
quantiles: vec![Quantile::new(0.25, 0.01),
Quantile::new(0.50, 0.01),
Quantile::new(0.75, 0.01),
Quantile::new(0.95, 0.01),
Quantile::new(0.99, 0.01),
Quantile::new(0.999, 0.01),
],
// buffer: BinaryHeap::<T>::new(),
samples: Vec::<Entry<T>>::new(),
}
}
#[cfg(test)]
pub fn smpls(&self) -> Vec<T> {
self.samples.clone().into_iter().map(|e| e.v).collect()
}
#[cfg(test)]
pub fn min(&self) -> Entry<T> {
self.samples[0].clone()
}
#[cfg(test)]
pub fn max(&self) -> Entry<T> {
self.samples[self.samples.len()-1].clone()
}
// I think this may well be wrong
pub fn invariant(&self, r: f64, n: f64) -> f64 {
let mut min_error = n + 1.0;
for q in self.quantiles.iter() {
let error;
if r <= q.quantile * n { error = q.u * (n - r); }
else { error = q.v * r; }
if error < min_error { min_error = error; }
}
min_error
}
pub fn insert(&mut self, v: T) {
let mut r = 0.0;
let s = self.samples.len();
if s == 0 {
self.samples.insert(0, Entry{v: v, g: 1.0, delta: 0.0});
self.n += 1;
return;
}
let mut idx = 0;
for i in 0..s {
let ref smpl = self.samples[i];
match cmp(smpl.v, v) {
Cmp::LT => { idx += 1 },
Cmp::EQ => break,
Cmp::GT => break,
}
r = r + smpl.g;
}
let delta;
if idx == 0 || idx == s {
delta = 0.0;
} else {
delta = self.invariant(r, self.n as f64).floor() - 1.0;
}
self.samples.insert(idx, Entry{v: v, g: 1.0, delta: delta});
self.n += 1;
}
pub fn query(&self, q: f64) -> Option<(f64, T)> {
let s = self.samples.len();
let mut r = 0.0;
let nphi = q*(self.n as f64);
for i in 1..s {
let ref prev = self.samples[i-1];
let ref cur = self.samples[i];
r += prev.g;
if (r + cur.g + cur.delta) > (nphi + (self.invariant(nphi, self.n as f64) / 2.0)) {
return Some((r, prev.v))
}
}
let v = self.samples[s-1].v;
return Some((s as f64, v));
}
}
#[cfg(test)]
mod test {
use super::*;
use quickcheck::{QuickCheck, TestResult};
use std::f64::consts::E;
// prop: forany phi. (phi*n - f(phi*n, n)/2) =< r_i =< (phi*n + f(phi*n, n)/2)
#[test]
fn query_invariant_test() {
fn query_invariant(f: f64, fs: Vec<i64>) -> TestResult {
if fs.len() < 1 {
return TestResult::discard();
}
let phi = ( 1.0/(1.0 + E.powf(f.abs())) )*2.0;
let mut ckms = CKMS::<i64>::new();
for f in fs {
ckms.insert(f);
}
match ckms.query(phi) {
None => TestResult::passed(), // invariant to check here? n*phi + f > 1?
Some((rank, _)) => {
let nphi = phi * (ckms.n as f64);
let fdiv2 = ckms.invariant(nphi, ckms.n as f64) / 2.0;
TestResult::from_bool( ((nphi - fdiv2) <= rank) || (rank <= (nphi + fdiv2)))
}
}
}
QuickCheck::new().tests(10000).max_tests(100000).quickcheck(query_invariant as fn(f64, Vec<i64>) -> TestResult);
}
// prop: v_i-1 < v_i =< v_i+1
#[test]
fn asc_samples_test() {
fn asc_samples(fs: Vec<i64>) -> TestResult {
let mut ckms = CKMS::<i64>::new();
let mut srt_fs = fs.clone();
let fsc = fs.clone();
for f in fs {
ckms.insert(f);
}
srt_fs.sort();
if srt_fs != ckms.smpls() {
println!("{:?} => {:?}", fsc.clone(), ckms.smpls());
return TestResult::failed()
}
TestResult::passed()
}
QuickCheck::new().tests(10000).max_tests(100000).quickcheck(asc_samples as fn(Vec<i64>) -> TestResult);
}
// prop: forall i. g_i + delta_i =< f(r_i, n)
#[test]
fn f_invariant_test() {
fn f_invariant(fs: Vec<i64>) -> bool {
let mut ckms = CKMS::<i64>::new();
for f in fs {
ckms.insert(f);
}
let mut r = 0.0;
for ent in ckms.samples.iter() {
r += ent.g;
let res = (ent.g + ent.delta) <= ckms.invariant(r, ckms.n as f64);
if !res {
println!("{:?} <= {:?}", ent.g + ent.delta, ckms.invariant(r, ckms.n as f64));
println!("samples: {:?}", ckms.samples);
return false;
}
}
true
}
QuickCheck::new().tests(10000).max_tests(100000).quickcheck(f_invariant as fn(Vec<i64>) -> bool);
}
#[test]
fn test_basics() {
let mut ckms = CKMS::<i32>::new();
for i in 1..11 { ckms.insert(i as i32); }
assert_eq!(ckms.query(0.00), Some((1.0, 1)));
assert_eq!(ckms.query(0.05), Some((1.0, 1)));
assert_eq!(ckms.query(0.10), Some((1.0, 1)));
assert_eq!(ckms.query(0.15), Some((1.0, 1)));
assert_eq!(ckms.query(0.20), Some((2.0, 2)));
assert_eq!(ckms.query(0.25), Some((2.0, 2)));
assert_eq!(ckms.query(0.30), Some((3.0, 3)));
assert_eq!(ckms.query(0.35), Some((3.0, 3)));
assert_eq!(ckms.query(0.40), Some((4.0, 4)));
assert_eq!(ckms.query(0.45), Some((4.0, 4)));
assert_eq!(ckms.query(0.50), Some((5.0, 5)));
assert_eq!(ckms.query(0.55), Some((5.0, 5)));
assert_eq!(ckms.query(0.60), Some((6.0, 6)));
assert_eq!(ckms.query(0.65), Some((6.0, 6)));
assert_eq!(ckms.query(0.70), Some((7.0, 7)));
assert_eq!(ckms.query(0.75), Some((7.0, 7)));
assert_eq!(ckms.query(0.80), Some((8.0, 8)));
assert_eq!(ckms.query(0.85), Some((8.0, 8)));
assert_eq!(ckms.query(0.90), Some((9.0, 9)));
assert_eq!(ckms.query(0.95), Some((9.0, 9)));
assert_eq!(ckms.query(0.99), Some((10.0, 10)));
assert_eq!(ckms.query(1.00), Some((10.0, 10)));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment