Created
August 6, 2020 16:10
-
-
Save jesperdj/9ecf6ade50b15d27efd8ff0ceb2205a0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crossbeam_channel; | |
use crossbeam_utils::thread; | |
/// Multi-threaded processing using Crossbeam. | |
/// | |
/// This function takes a generator which is an `Iterator` over values of type `V`, a processor function that processes values of type `V` into results of | |
/// type `R`, and an aggregator function which accepts results of type `R`. | |
/// | |
/// It starts one generator thread and a number of processor threads. The generator thread gets values from the generator and sends them to an input channel. | |
/// The processor threads receive from the input channel, call the processor function to compute a result for each value and send the results to an output | |
/// channel. The main thread receives from the output channel and aggregates the results using the aggregator function. | |
fn process<V, R, G, P, A>(mut generate: G, process: &P, aggregate: &mut A) -> std::thread::Result<()> | |
where | |
V: Sync + Send, | |
R: Sync + Send, | |
G: Iterator<Item=V> + Sync + Send, | |
P: Fn(V) -> R + Sync + Send, | |
A: FnMut(R), | |
{ | |
const PROCESSOR_COUNT: usize = 4; | |
const INPUT_CHANNEL_CAPACITY: usize = 1000; | |
const OUTPUT_CHANNEL_CAPACITY: usize = 1000; | |
let (input_snd, input_rcv) = crossbeam_channel::bounded(INPUT_CHANNEL_CAPACITY); | |
let (output_snd, output_rcv) = crossbeam_channel::bounded(OUTPUT_CHANNEL_CAPACITY); | |
thread::scope(|scope| { | |
{ | |
let generator_snd = input_snd.clone(); | |
scope.spawn(move |_| { | |
println!("generator started"); | |
while let Some(value) = generate.next() { | |
generator_snd.send(value).unwrap(); | |
} | |
println!("generator finished"); | |
}); | |
} | |
for id in 1..=PROCESSOR_COUNT { | |
let processor_rcv = input_rcv.clone(); | |
let processor_snd = output_snd.clone(); | |
scope.spawn(move |_| { | |
println!("thread {} started", id); | |
for value in processor_rcv { | |
let result = process(value); | |
processor_snd.send(result).unwrap(); | |
} | |
println!("thread {} finished", id); | |
}); | |
} | |
drop(input_snd); | |
drop(input_rcv); | |
drop(output_snd); | |
println!("aggregator started"); | |
for result in output_rcv { | |
aggregate(result); | |
} | |
println!("aggregator finished"); | |
}) | |
} | |
fn main() { | |
let xs = vec![1, 5, 34, -2, 6, 10]; | |
let prc = |x| x * 2 + 1; | |
let mut total = 0; | |
let mut agg = |x| total += x; | |
process(xs.iter(), &prc, &mut agg).unwrap(); | |
println!("total = {}", total); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment