Last active
April 16, 2019 06:52
-
-
Save comnik/a6982534a0ec85fe831a5137236dd039 to your computer and use it in GitHub Desktop.
The shipping puzzle (https://kevinlynagh.com/notes/shipping-puzzle/) in Differential Dataflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[macro_use] | |
extern crate abomonation_derive; | |
extern crate abomonation; | |
extern crate timely; | |
extern crate differential_dataflow; | |
use std::fs::File; | |
use std::io::{BufRead,BufReader}; | |
use timely::dataflow::*; | |
use timely::dataflow::operators::probe::Handle; | |
use differential_dataflow::Collection; | |
use differential_dataflow::input::Input; | |
use differential_dataflow::lattice::Lattice; | |
use differential_dataflow::operators::*; | |
use differential_dataflow::operators::arrange::ArrangeByKey; | |
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Debug, Abomonation)] | |
enum Day { M, T, W, R, F, } | |
type Leg = (u64, Day, String, String); | |
type Route = (Vec<u64>,String,bool); | |
type Meetpoint = (Day,String); | |
fn next_day(day: &Day) -> Day { | |
match &day { | |
Day::M => Day::T, | |
Day::T => Day::W, | |
Day::W => Day::R, | |
Day::R => Day::F, | |
Day::F => Day::F, | |
} | |
} | |
fn initialize_route(day: &Day, (id,start,end): &(u64,String,String)) -> (Meetpoint,Route) { | |
((day.clone(),start.to_string()),(vec![*id],end.to_string(),true)) | |
} | |
fn main() { | |
timely::execute_from_args(std::env::args(), move |worker| { | |
let timer = ::std::time::Instant::now(); | |
let mut probe = Handle::new(); | |
let mut legs_in = worker.dataflow::<u64, _, _>(|scope| { | |
let (legs_in, raw_legs) = scope.new_collection::<Leg,isize>(); | |
let start_on = raw_legs | |
.map(|(id,day,start,end)| (day, (id,start,end))) | |
.arrange_by_key(); | |
let routes = start_on | |
.filter(|day,_leg| day == &Day::M) | |
.as_collection(|day, (id,_start,end)| ((next_day(&day),end.to_string()),(vec![*id],end.to_string(),false))) | |
.concat(&start_on.filter(|day,_leg| day == &Day::T).as_collection(initialize_route)) | |
.pair_up() | |
.concat(&start_on.filter(|day,_leg| day == &Day::W).as_collection(initialize_route)) | |
.pair_up() | |
.concat(&start_on.filter(|day,_leg| day == &Day::R).as_collection(initialize_route)) | |
.pair_up() | |
.concat(&start_on.filter(|day,_leg| day == &Day::F).as_collection(initialize_route)) | |
.pair_up(); | |
routes | |
.map(|_| ()) | |
.consolidate() | |
.inspect(|x| { println!("found {:?} routes", x); }) | |
.probe_with(&mut probe); | |
legs_in | |
}); | |
let input = BufReader::new(File::open("/Users/niko/data/shipping-puzzle/legs.txt").unwrap()); | |
let index = worker.index(); | |
let num_workers = worker.peers(); | |
let mut line_count = 0; | |
for line in input.lines() { | |
if line_count % num_workers == index { | |
let l = line.unwrap(); | |
let vals: Vec<&str> = l.split(" ").collect(); | |
let id: u64 = vals[0].parse().unwrap(); | |
let day: Day = match vals[3] { | |
"M" => Day::M, | |
"T" => Day::T, | |
"W" => Day::W, | |
"R" => Day::R, | |
"F" => Day::F, | |
_ => panic!("Invalid day found: {}", vals[3]), | |
}; | |
legs_in.insert((id, day, vals[1].to_string(), vals[2].to_string())); | |
} | |
line_count = line_count + 1; | |
} | |
println!("{:?}\tloaded", timer.elapsed()); | |
legs_in.advance_to(1); | |
legs_in.flush(); | |
worker.step_while(|| probe.less_than(legs_in.time())); | |
println!("{:?}\tstable", timer.elapsed()); | |
legs_in.insert((20000, Day::W, "LOS_ANGELES".to_string(), "CHARLOTTE".to_string())); | |
legs_in.advance_to(2); | |
legs_in.flush(); | |
worker.step_while(|| probe.less_than(legs_in.time())); | |
println!("{:?}\tstable", timer.elapsed()); | |
}).unwrap(); | |
} | |
trait PairUp<G: Scope> { fn pair_up(&self) -> Collection<G, (Meetpoint,Route), isize>; } | |
impl<G> PairUp<G> for Collection<G, (Meetpoint,Route), isize> | |
where G: Scope, G::Timestamp: Lattice, | |
{ | |
fn pair_up(&self) -> Collection<G, (Meetpoint,Route)> { | |
self | |
.group(|(day,place), input, output| { | |
let (mut arrivals, mut departures): (Vec<Route>,Vec<Route>) = input.iter().cloned() | |
.map(|(x,_diff)| x.clone()) | |
.partition(|(_,_,is_departing)| *is_departing == false); | |
{ | |
let pairs = arrivals.iter().zip(departures.iter()); | |
for ((arriving,_,_),(departing,to,_)) in pairs { | |
let mut route = arriving.clone(); | |
route.extend(departing); | |
output.push((((next_day(day),to.clone()),(route,to.clone(),false)), 1)) | |
} | |
} | |
if arrivals.len() > departures.len() { | |
for (route,to,_) in arrivals.drain(departures.len()..) { | |
output.push((((day.clone(),place.clone()),(route,to,false)), 1)) | |
} | |
} else if departures.len() > arrivals.len() { | |
for (route,to,_) in departures.drain(arrivals.len()..) { | |
output.push((((next_day(day),to.clone()),(route,to,false)), 1)) | |
} | |
} | |
}) | |
.map(|(_meetpoint,route)| route) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment