Last active
January 8, 2025 19:00
-
-
Save sscdotopen/0a90b727fa2101edac5de6af7ca69c01 to your computer and use it in GitHub Desktop.
Efficient maintenance of the mean of a bag of numbers with differential dataflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate timely; | |
extern crate differential_dataflow; | |
use serde::{Serialize, Deserialize}; | |
use timely::dataflow::operators::probe::Handle; | |
use differential_dataflow::input::Input; | |
use differential_dataflow::operators::*; | |
use differential_dataflow::difference::{Abelian, IsZero, Monoid, Semigroup}; | |
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] | |
struct MeanAggregate { | |
sum: i64, | |
count: isize | |
} | |
impl MeanAggregate { | |
fn new(value: i64, multiplicity: isize) -> Self { | |
Self { sum: value * multiplicity as i64, count: multiplicity, } | |
} | |
fn get(&self) -> i64 { | |
self.sum / self.count as i64 | |
} | |
} | |
impl IsZero for MeanAggregate { | |
fn is_zero(&self) -> bool { | |
self.count == 0 | |
} | |
} | |
impl Semigroup for MeanAggregate { | |
fn plus_equals(&mut self, other: &Self) { | |
self.sum += other.sum; | |
self.count += other.count; | |
} | |
} | |
impl Monoid for MeanAggregate { | |
fn zero() -> Self { | |
Self { sum: 0, count: 0, } | |
} | |
} | |
impl Abelian for MeanAggregate { | |
fn negate(&mut self) { | |
self.sum *= -1; | |
self.count *= -1; | |
} | |
} | |
fn main() { | |
let _ = timely::execute_from_args(std::env::args(), move |worker| { | |
let mut probe = Handle::new(); | |
let mut column_values_input = worker.dataflow(|scope| { | |
let (column_values_input, column_values) = scope.new_collection(); | |
let mean_per_column = column_values | |
.threshold(|(_column_id, value), multiplicity| { | |
MeanAggregate::new(*value, *multiplicity) | |
}) | |
.map(|(column_id, _value)| column_id) | |
.count() | |
.map(|(column, mean_aggregate)| (column, mean_aggregate.get())) | |
.consolidate() | |
.inspect(|(record, time, change)| { | |
eprintln!("Mean: {:?}, time: {:?}, change: {:?}", record, time, change) | |
}); | |
mean_per_column.probe_with(&mut probe); | |
column_values_input | |
}); | |
const COLUMN_ID: usize = 1; | |
let initial_values = [2, 4, 2, 4]; | |
column_values_input.advance_to(0); | |
for value in initial_values { | |
column_values_input.insert((COLUMN_ID, value)); | |
} | |
column_values_input.advance_to(1); | |
column_values_input.flush(); | |
println!("\n-- time 0 -> 1 --------------------"); | |
worker.step_while(|| probe.less_than(column_values_input.time())); | |
column_values_input.remove((COLUMN_ID, 4)); | |
column_values_input.remove((COLUMN_ID, 4)); | |
column_values_input.insert((COLUMN_ID, 5)); | |
column_values_input.advance_to(2); | |
column_values_input.flush(); | |
// No change in mean expected | |
println!("\n-- time 1 -> 2 --------------------"); | |
worker.step_while(|| probe.less_than(column_values_input.time())); | |
column_values_input.insert((COLUMN_ID, 7)); | |
column_values_input.advance_to(3); | |
column_values_input.flush(); | |
// Change in mean expected | |
println!("\n-- time 1 -> 3 --------------------"); | |
worker.step_while(|| probe.less_than(column_values_input.time())); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment