Created
March 7, 2024 12:14
-
-
Save sscdotopen/a0aabb3ef6a42b4261fa3feb40bc87e3 to your computer and use it in GitHub Desktop.
Implementation of the incremental view maintenance example from our lecture in Differential Dataflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate timely; | |
extern crate differential_dataflow; | |
use timely::dataflow::operators::probe::Handle; | |
use differential_dataflow::input::Input; | |
use differential_dataflow::operators::*; | |
fn main() { | |
let _ = timely::execute_from_args(std::env::args(), move |worker| { | |
let mut probe = Handle::new(); | |
let (mut customers_input, mut orders_input) = worker.dataflow(|scope| { | |
let (customers_input, customers) = scope.new_collection(); | |
let (orders_input, orders) = scope.new_collection(); | |
/* | |
CREATE VIEW HighPriceOrdersPerCustomer AS | |
SELECT Customers.Name, COUNT(*) AS NumOrders | |
FROM Customers | |
JOIN Orders ON Customers.Name = Orders.Name | |
WHERE Orders.Price > 250 | |
GROUP BY Customers.Name | |
*/ | |
let high_priced_orders_per_customer = | |
orders | |
.filter(|(_name, (_category, price))| *price > 250) | |
.join_map(&customers, |name: &String, _, _| (*name).to_string()) | |
.inspect(|(record, time, change)| { | |
eprintln!( | |
"\t Customer: {:?}, time: {:?}, change in order count: {:?}", | |
record, | |
time, | |
change | |
) | |
}); | |
high_priced_orders_per_customer.probe_with(&mut probe); | |
(customers_input, orders_input) | |
}); | |
let initial_customers = [ | |
("Bob".to_string(), ("99 High St.".to_string(), 415000)), | |
("Aliya".to_string(), ("125 Baker St.".to_string(), 415202)), | |
("Ji".to_string(), ("76 Square St.".to_string(), 415123)), | |
]; | |
let initial_orders = [ | |
("Bob".to_string(), ("Clothing".to_string(), 1200)), | |
("Bob".to_string(), ("Clothing".to_string(), 500)), | |
("Aliya".to_string(), ("Furniture".to_string(), 300)), | |
]; | |
customers_input.advance_to(0); | |
orders_input.advance_to(0); | |
for customer in initial_customers { | |
customers_input.insert(customer); | |
} | |
for order in initial_orders { | |
orders_input.insert(order); | |
} | |
customers_input.close(); | |
orders_input.advance_to(1); | |
orders_input.flush(); | |
println!("\n\t -- time 0 -> 1 --------------------"); | |
worker.step_while(|| probe.less_than(orders_input.time())); | |
let canceled_orders = [ | |
("Bob".to_string(), ("Clothing".to_string(), 500)), | |
]; | |
let new_orders = [ | |
("Bob".to_string(), ("Clothing".to_string(), 100)), | |
("Ji".to_string(), ("Furniture".to_string(), 1000)), | |
("Aliya".to_string(), ("Clothing".to_string(), 50)), | |
]; | |
for order in canceled_orders { | |
orders_input.remove(order); | |
} | |
for order in new_orders { | |
orders_input.insert(order); | |
} | |
orders_input.advance_to(2); | |
orders_input.flush(); | |
println!("\n\t -- time 1 -> 2 --------------------"); | |
worker.step_while(|| probe.less_than(orders_input.time())); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Running this program produces the following output: