Created
June 23, 2016 04:23
-
-
Save blt/483f6e847571550905cb489f64a52ac4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![cfg_attr(test, feature(plugin))] | |
#![cfg_attr(test, plugin(quickcheck_macros))] | |
#![allow(dead_code)] | |
#![allow(unused_imports)] | |
#[cfg(test)] | |
extern crate quickcheck; | |
/// This is an implementation of the algorithm presented in Cormode, Korn, | |
/// Muthukrishnan, Srivastava's paper "Effective Computation of Biased Quantiles | |
/// over Data Streams". The ambition here is to approximate quantiles on a | |
/// stream of data without having a boatload of information kept in memory. | |
//use std::collections::BinaryHeap; | |
use std::fmt::Debug; | |
struct Quantile { | |
quantile: f64, // These all could probably be f32 | |
// error: f64, | |
u: f64, | |
v: f64, | |
} | |
impl Quantile { | |
fn new(quantile: f64, error: f64) -> Quantile { | |
Quantile { | |
quantile: quantile, | |
// error: error, | |
u: 2.0 * error / (1.0 - quantile), | |
v: 2.0 * error / quantile, | |
} | |
} | |
} | |
#[derive(Debug,Clone)] | |
pub struct Entry<T: Copy> { | |
v: T, | |
g: f64, | |
delta: f64, | |
} | |
enum Cmp { | |
LT, | |
GT, | |
EQ, | |
} | |
fn cmp<T: PartialOrd + Debug>(l: T, r: T) -> Cmp { | |
if l == r { Cmp::EQ } | |
else if l < r { Cmp::LT } | |
else { Cmp::GT } | |
} | |
pub struct CKMS<T: Copy> { | |
n: usize, // number of items, >= self.samples.len() | |
// We follow the 'cursor' method of the above paper. In this method, | |
// incoming items are buffered in a priority queue, called 'buffer' here, | |
// and once insert_threshold items are stored in the buffer it is drained | |
// into the 'samples' collection. Insertion will cause some extranious | |
// points to be held that can be merged. Once compress_threshold threshold | |
// items are buffered the COMPRESS operation merges these extranious points. | |
/// insert_threshold: usize, | |
/// compress_threshold: usize, | |
// We aim for 'targeted' biased quantiles. That is, we have no interest in | |
// QUERY returning quantiles not a part of this structure. This saves a | |
// little memory space, cpu time and gives us better hooks for property | |
// testing. | |
quantiles: Vec<Quantile>, | |
// We store incoming points in a priority queue and every insert_threshold | |
// items drain buffer into samples. This is the 'cursor' method of the | |
// paper. | |
/// buffer: BinaryHeap<T>, | |
// This is the S(n) of the above paper. Entries are stored here and | |
// occasionally merged. The outlined implementation uses a linked list but | |
// we prefer a Vec for reasons of cache locality at the cost of worse | |
// computational complexity. | |
samples: Vec<Entry<T>>, | |
} | |
impl<T: Copy + PartialOrd + Debug> CKMS<T> { // where T: std::cmp::Ord { | |
fn new() -> CKMS<T> { | |
CKMS { | |
n: 0, | |
// insert_threshold: 1000, | |
// compress_threshold: 1250, | |
quantiles: vec![Quantile::new(0.25, 0.01), | |
Quantile::new(0.50, 0.01), | |
Quantile::new(0.75, 0.01), | |
Quantile::new(0.95, 0.01), | |
Quantile::new(0.99, 0.01), | |
Quantile::new(0.999, 0.01), | |
], | |
// buffer: BinaryHeap::<T>::new(), | |
samples: Vec::<Entry<T>>::new(), | |
} | |
} | |
#[cfg(test)] | |
pub fn smpls(&self) -> Vec<T> { | |
self.samples.clone().into_iter().map(|e| e.v).collect() | |
} | |
#[cfg(test)] | |
pub fn min(&self) -> Entry<T> { | |
self.samples[0].clone() | |
} | |
#[cfg(test)] | |
pub fn max(&self) -> Entry<T> { | |
self.samples[self.samples.len()-1].clone() | |
} | |
// I think this may well be wrong | |
pub fn invariant(&self, r: f64, n: f64) -> f64 { | |
let mut min_error = n + 1.0; | |
for q in self.quantiles.iter() { | |
let error; | |
if r <= q.quantile * n { error = q.u * (n - r); } | |
else { error = q.v * r; } | |
if error < min_error { min_error = error; } | |
} | |
min_error | |
} | |
pub fn insert(&mut self, v: T) { | |
let mut r = 0.0; | |
let s = self.samples.len(); | |
if s == 0 { | |
self.samples.insert(0, Entry{v: v, g: 1.0, delta: 0.0}); | |
self.n += 1; | |
return; | |
} | |
let mut idx = 0; | |
for i in 0..s { | |
let ref smpl = self.samples[i]; | |
match cmp(smpl.v, v) { | |
Cmp::LT => { idx += 1 }, | |
Cmp::EQ => break, | |
Cmp::GT => break, | |
} | |
r = r + smpl.g; | |
} | |
let delta; | |
if idx == 0 || idx == s { | |
delta = 0.0; | |
} else { | |
delta = self.invariant(r, self.n as f64).floor() - 1.0; | |
} | |
self.samples.insert(idx, Entry{v: v, g: 1.0, delta: delta}); | |
self.n += 1; | |
} | |
pub fn query(&self, q: f64) -> Option<(f64, T)> { | |
let s = self.samples.len(); | |
let mut r = 0.0; | |
let nphi = q*(self.n as f64); | |
for i in 1..s { | |
let ref prev = self.samples[i-1]; | |
let ref cur = self.samples[i]; | |
r += prev.g; | |
if (r + cur.g + cur.delta) > (nphi + (self.invariant(nphi, self.n as f64) / 2.0)) { | |
return Some((r, prev.v)) | |
} | |
} | |
let v = self.samples[s-1].v; | |
return Some((s as f64, v)); | |
} | |
} | |
#[cfg(test)] | |
mod test { | |
use super::*; | |
use quickcheck::{QuickCheck, TestResult}; | |
use std::f64::consts::E; | |
// prop: forany phi. (phi*n - f(phi*n, n)/2) =< r_i =< (phi*n + f(phi*n, n)/2) | |
#[test] | |
fn query_invariant_test() { | |
fn query_invariant(f: f64, fs: Vec<i64>) -> TestResult { | |
if fs.len() < 1 { | |
return TestResult::discard(); | |
} | |
let phi = ( 1.0/(1.0 + E.powf(f.abs())) )*2.0; | |
let mut ckms = CKMS::<i64>::new(); | |
for f in fs { | |
ckms.insert(f); | |
} | |
match ckms.query(phi) { | |
None => TestResult::passed(), // invariant to check here? n*phi + f > 1? | |
Some((rank, _)) => { | |
let nphi = phi * (ckms.n as f64); | |
let fdiv2 = ckms.invariant(nphi, ckms.n as f64) / 2.0; | |
TestResult::from_bool( ((nphi - fdiv2) <= rank) || (rank <= (nphi + fdiv2))) | |
} | |
} | |
} | |
QuickCheck::new().tests(10000).max_tests(100000).quickcheck(query_invariant as fn(f64, Vec<i64>) -> TestResult); | |
} | |
// prop: v_i-1 < v_i =< v_i+1 | |
#[test] | |
fn asc_samples_test() { | |
fn asc_samples(fs: Vec<i64>) -> TestResult { | |
let mut ckms = CKMS::<i64>::new(); | |
let mut srt_fs = fs.clone(); | |
let fsc = fs.clone(); | |
for f in fs { | |
ckms.insert(f); | |
} | |
srt_fs.sort(); | |
if srt_fs != ckms.smpls() { | |
println!("{:?} => {:?}", fsc.clone(), ckms.smpls()); | |
return TestResult::failed() | |
} | |
TestResult::passed() | |
} | |
QuickCheck::new().tests(10000).max_tests(100000).quickcheck(asc_samples as fn(Vec<i64>) -> TestResult); | |
} | |
// prop: forall i. g_i + delta_i =< f(r_i, n) | |
#[test] | |
fn f_invariant_test() { | |
fn f_invariant(fs: Vec<i64>) -> bool { | |
let mut ckms = CKMS::<i64>::new(); | |
for f in fs { | |
ckms.insert(f); | |
} | |
let mut r = 0.0; | |
for ent in ckms.samples.iter() { | |
r += ent.g; | |
let res = (ent.g + ent.delta) <= ckms.invariant(r, ckms.n as f64); | |
if !res { | |
println!("{:?} <= {:?}", ent.g + ent.delta, ckms.invariant(r, ckms.n as f64)); | |
println!("samples: {:?}", ckms.samples); | |
return false; | |
} | |
} | |
true | |
} | |
QuickCheck::new().tests(10000).max_tests(100000).quickcheck(f_invariant as fn(Vec<i64>) -> bool); | |
} | |
#[test] | |
fn test_basics() { | |
let mut ckms = CKMS::<i32>::new(); | |
for i in 1..11 { ckms.insert(i as i32); } | |
assert_eq!(ckms.query(0.00), Some((1.0, 1))); | |
assert_eq!(ckms.query(0.05), Some((1.0, 1))); | |
assert_eq!(ckms.query(0.10), Some((1.0, 1))); | |
assert_eq!(ckms.query(0.15), Some((1.0, 1))); | |
assert_eq!(ckms.query(0.20), Some((2.0, 2))); | |
assert_eq!(ckms.query(0.25), Some((2.0, 2))); | |
assert_eq!(ckms.query(0.30), Some((3.0, 3))); | |
assert_eq!(ckms.query(0.35), Some((3.0, 3))); | |
assert_eq!(ckms.query(0.40), Some((4.0, 4))); | |
assert_eq!(ckms.query(0.45), Some((4.0, 4))); | |
assert_eq!(ckms.query(0.50), Some((5.0, 5))); | |
assert_eq!(ckms.query(0.55), Some((5.0, 5))); | |
assert_eq!(ckms.query(0.60), Some((6.0, 6))); | |
assert_eq!(ckms.query(0.65), Some((6.0, 6))); | |
assert_eq!(ckms.query(0.70), Some((7.0, 7))); | |
assert_eq!(ckms.query(0.75), Some((7.0, 7))); | |
assert_eq!(ckms.query(0.80), Some((8.0, 8))); | |
assert_eq!(ckms.query(0.85), Some((8.0, 8))); | |
assert_eq!(ckms.query(0.90), Some((9.0, 9))); | |
assert_eq!(ckms.query(0.95), Some((9.0, 9))); | |
assert_eq!(ckms.query(0.99), Some((10.0, 10))); | |
assert_eq!(ckms.query(1.00), Some((10.0, 10))); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment