Skip to content

Instantly share code, notes, and snippets.

@gterzian
Created May 4, 2020 07:52
Show Gist options
  • Save gterzian/ece3cc69549174d85e4317a9b254e560 to your computer and use it in GitHub Desktop.
Save gterzian/ece3cc69549174d85e4317a9b254e560 to your computer and use it in GitHub Desktop.
// Run the "processor" component on its own thread, alongside the others.
//
// The processor pulls `SourceMsg`s off `work_receiver`, hands each unit of
// work to a small rayon pool, and forwards results through `result_sender`.
// Once the source has announced `Stopped` and every in-flight job has
// reported back, it sends `ProcessorMsg::Stopped` and the thread ends.
let _ = thread::spawn(move || {
    // The processor has two worker threads at its disposal.
    let workers = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .unwrap();

    // Each worker signals on this channel when it has finished a job,
    // so the processor's own thread can keep its bookkeeping up to date.
    let (done_tx, done_rx) = unbounded();

    // Number of jobs handed to the pool that have not yet reported back.
    let mut in_flight = 0;

    // Set once the source has told us it stopped producing.
    let mut source_stopped = false;

    loop {
        // Handle exactly one message per iteration; the `select!` evaluates
        // to `true` when it is time to shut the processor down.
        let finished = select! {
            recv(work_receiver) -> msg => {
                match msg {
                    Ok(SourceMsg::Work(num)) => {
                        // Observation from the author: the length of this
                        // channel barely grows, which raises the question
                        // of where the backlog of work actually piles up.
                        println!("Queue: {:?}", work_receiver.len());

                        // Per-job handles that are moved into the worker closure.
                        let worker_result_tx = result_sender.clone();
                        let worker_done_tx = done_tx.clone();

                        in_flight += 1;
                        workers.spawn(move || {
                            // Simulate some "work", then publish the result to
                            // the "consumer" and notify the processor thread.
                            thread::sleep(Duration::from_millis(3));
                            let _ = worker_result_tx.send(ProcessorMsg::Result(num));
                            let _ = worker_done_tx.send(());
                        });

                        // New work was just queued, so we cannot be done.
                        false
                    },
                    Ok(SourceMsg::Stopped) => {
                        // Remember the exit request; we may leave right away
                        // only if nothing is still running on the pool.
                        source_stopped = true;
                        in_flight == 0
                    }
                    _ => {
                        // Not expected in practice: per the original note,
                        // `_work_sender_clone` (defined outside this snippet)
                        // prevents this from happening.
                        panic!("Error receiving a SourceMsg.");
                    },
                }
            },
            recv(done_rx) -> _ => {
                // A completion without a matching job means the
                // bookkeeping is broken.
                if in_flight == 0 {
                    panic!("Received an unexpected pool result.");
                }

                // One job has completed.
                in_flight -= 1;

                // Shut down only when the pool is idle *and* the source
                // has already asked us to exit.
                in_flight == 0 && source_stopped
            },
        };

        if finished {
            break;
        }
    }

    // Reached only via the `break` above: tell the consumer we are done.
    let _ = result_sender.send(ProcessorMsg::Stopped);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment