Skip to content

Instantly share code, notes, and snippets.

@sscdotopen
Last active January 8, 2025 19:00
Show Gist options
  • Save sscdotopen/0a90b727fa2101edac5de6af7ca69c01 to your computer and use it in GitHub Desktop.
Save sscdotopen/0a90b727fa2101edac5de6af7ca69c01 to your computer and use it in GitHub Desktop.
Efficient maintenance of the mean of a bag of numbers with differential dataflow
extern crate timely;
extern crate differential_dataflow;
use serde::{Serialize, Deserialize};
use timely::dataflow::operators::probe::Handle;
use differential_dataflow::input::Input;
use differential_dataflow::operators::*;
use differential_dataflow::difference::{Abelian, IsZero, Monoid, Semigroup};
/// Difference type used to maintain a mean incrementally.
///
/// Rather than storing a mean directly, it carries a running `sum` together
/// with the `count` of values behind that sum, so that two aggregates can be
/// combined by adding their components.
#[derive(
    Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize,
)]
struct MeanAggregate {
    /// Sum of the aggregated values, each weighted by its multiplicity.
    sum: i64,
    /// Total multiplicity of the values that contributed to `sum`.
    count: isize,
}
impl MeanAggregate {
fn new(value: i64, multiplicity: isize) -> Self {
Self { sum: value * multiplicity as i64, count: multiplicity, }
}
fn get(&self) -> i64 {
self.sum / self.count as i64
}
}
impl IsZero for MeanAggregate {
    /// Reports whether this aggregate is the additive identity, i.e. both the
    /// sum and the count are zero.
    ///
    /// Checking `count` alone is not enough: replacing one value by a
    /// different one leaves the count unchanged but shifts the sum, producing
    /// a difference such as `{ sum: 1, count: 0 }`. Treating that as zero
    /// would allow the update to be discarded and the mean to go stale.
    fn is_zero(&self) -> bool {
        self.sum == 0 && self.count == 0
    }
}
impl Semigroup for MeanAggregate {
    /// Adds `other` into `self` component by component: sums are added to
    /// sums and counts to counts.
    fn plus_equals(&mut self, other: &Self) {
        let Self {
            sum: added_sum,
            count: added_count,
        } = other;
        self.sum += *added_sum;
        self.count += *added_count;
    }
}
impl Monoid for MeanAggregate {
fn zero() -> Self {
Self { sum: 0, count: 0, }
}
}
impl Abelian for MeanAggregate {
    /// Turns the aggregate into its additive inverse in place by flipping the
    /// sign of both the sum and the count.
    fn negate(&mut self) {
        let Self { sum, count } = self;
        *sum *= -1;
        *count *= -1;
    }
}
/// Demo driver: builds a dataflow that maintains the integer mean of the
/// values of each column, then feeds it three rounds of input and prints the
/// changes to the mean that each round produces.
fn main() {
    let outcome = timely::execute_from_args(std::env::args(), move |worker| {
        let mut probe = Handle::new();

        let mut column_values_input = worker.dataflow(|scope| {
            let (column_values_input, column_values) = scope.new_collection();

            let mean_per_column = column_values
                // Swap the plain multiplicity of every `(column, value)` record
                // for a `MeanAggregate` carrying `value * multiplicity` and
                // `multiplicity`.
                .threshold(|(_column_id, value), multiplicity| {
                    MeanAggregate::new(*value, *multiplicity)
                })
                // Drop the value so that all aggregates of a column share a key.
                .map(|(column_id, _value)| column_id)
                // Accumulate the aggregates per column.
                .count()
                .map(|(column, mean_aggregate)| (column, mean_aggregate.get()))
                .consolidate()
                .inspect(|(record, time, change)| {
                    eprintln!("Mean: {:?}, time: {:?}, change: {:?}", record, time, change)
                });

            mean_per_column.probe_with(&mut probe);
            column_values_input
        });

        const COLUMN_ID: usize = 1;
        let initial_values = [2, 4, 2, 4];

        // Round 1: load the initial bag of values.
        column_values_input.advance_to(0);
        for value in initial_values {
            column_values_input.insert((COLUMN_ID, value));
        }
        column_values_input.advance_to(1);
        column_values_input.flush();
        println!("\n-- time 0 -> 1 --------------------");
        worker.step_while(|| probe.less_than(column_values_input.time()));

        // Round 2: {2, 4, 2, 4} becomes {2, 2, 5}; 12 / 4 and 9 / 3 are both 3.
        column_values_input.remove((COLUMN_ID, 4));
        column_values_input.remove((COLUMN_ID, 4));
        column_values_input.insert((COLUMN_ID, 5));
        column_values_input.advance_to(2);
        column_values_input.flush();
        // No change in mean expected
        println!("\n-- time 1 -> 2 --------------------");
        worker.step_while(|| probe.less_than(column_values_input.time()));

        // Round 3: adding 7 gives {2, 2, 5, 7}; 16 / 4 is 4.
        column_values_input.insert((COLUMN_ID, 7));
        column_values_input.advance_to(3);
        column_values_input.flush();
        // Change in mean expected
        println!("\n-- time 2 -> 3 --------------------");
        worker.step_while(|| probe.less_than(column_values_input.time()));
    });

    // Surface start-up failures (e.g. bad command-line arguments) instead of
    // silently discarding them.
    if let Err(error) = outcome {
        eprintln!("timely execution failed: {}", error);
        std::process::exit(1);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment