Created
May 4, 2020 07:52
-
-
Save gterzian/ece3cc69549174d85e4317a9b254e560 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Run the "processor" component on its own thread, in parallel with the
// other components.
//
// The processor receives `SourceMsg`s on `work_receiver`, hands each unit of
// work to a small thread pool, and forwards `ProcessorMsg`s on
// `result_sender`. It sends `ProcessorMsg::Stopped` and exits only once the
// source has stopped AND every unit handed to the pool has reported back.
let _ = thread::spawn(move || {
    // The processor has two worker threads at its disposal.
    let workers = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .unwrap();

    // Workers use this channel to tell the processor's own thread that a
    // unit of work has been completed.
    let (done_sender, done_receiver) = unbounded();

    // How many units of work were handed to the pool and have not yet been
    // reported as completed.
    let mut in_flight = 0;

    // Becomes true once the source has announced that it stopped producing.
    let mut stop_requested = false;

    loop {
        // Keep handling whichever channel has a message ready, until one of
        // the exit conditions below breaks out of the loop.
        select! {
            recv(work_receiver) -> incoming => {
                match incoming {
                    Ok(SourceMsg::Stopped) => {
                        // Remember the request to exit; it is honoured here
                        // only if the pool is already idle, otherwise the
                        // completion arm below finishes the shutdown.
                        stop_requested = true;
                        if in_flight == 0 {
                            let _ = result_sender.send(ProcessorMsg::Stopped);
                            break;
                        }
                    }
                    Ok(SourceMsg::Work(num)) => {
                        // Author's observation: the channel length printed
                        // here does not really grow, raising the question of
                        // where pending work piles up.
                        // NOTE(review): presumably in the pool's own job
                        // queue, since `spawn` is called without waiting for
                        // the job to run — confirm.
                        println!("Queue: {:?}", work_receiver.len());

                        // Each job needs its own handles, because the closure
                        // takes ownership of what it captures.
                        let to_consumer = result_sender.clone();
                        let to_processor = done_sender.clone();

                        in_flight += 1;
                        workers.spawn(move || {
                            // Simulate some "work", deliver the result to the
                            // "consumer", then report completion.
                            thread::sleep(Duration::from_millis(3));
                            let _ = to_consumer.send(ProcessorMsg::Result(num));
                            let _ = to_processor.send(());
                        });
                    }
                    _ => {
                        // Author's note: this will not happen thanks to
                        // `_work_sender_clone` (declared outside this block).
                        panic!("Error receiving a SourceMsg.");
                    }
                }
            },
            recv(done_receiver) -> _ => {
                // A completion with nothing in flight means the bookkeeping
                // is broken.
                assert!(in_flight != 0, "Received an unexpected pool result.");

                // One unit of work has been completed.
                in_flight -= 1;

                // The source has stopped and the pool has drained: tell the
                // consumer and exit.
                if stop_requested && in_flight == 0 {
                    let _ = result_sender.send(ProcessorMsg::Stopped);
                    break;
                }
            },
        }
    }
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment