-
-
Save mrsarm/28ff763ccc710e6a2875c257cb03edee to your computer and use it in GitHub Desktop.
vector_threads_and_channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Multi-threaded Rust example that iterates over a vector of input data
//! from several threads in a safe way, using the threads and channels APIs
//! together with the `Arc` and `Mutex` types.
//!
//! ## Details
//!
//! Spawn `MAX_NTHREADS` threads. Each thread locks the shared vector only
//! long enough to pop one string from it (the element is taken out of the
//! vector), then processes that string to compute a new string and
//! publishes it into a channel. A receiver in the main thread listens to
//! that channel while the child threads keep sending newly processed
//! elements.
//!
//! Finally, the result is printed from the main thread, after all child
//! threads have finished.
use rand::random;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
/// Number of worker threads spawned by `main`.
const MAX_NTHREADS: usize = 4;
/// Function that pretends to do something useful, getting | |
/// a new string from input data, and taking some time to do it | |
fn transformer(tid: usize, element: &str) -> String { | |
thread::sleep(Duration::from_millis(3 * random::<u8>() as u64)); | |
format!("{}:{}", tid, element) | |
} | |
fn main() { | |
let num_elements = 40; // Elements created inside the input vector for testing | |
// input vector, inside an Arc<Mutex> to be consumed from the threads, | |
// at the end of the execution of all threads, the vector will have | |
// all it's elements consumed (will be empty) | |
let input = Arc::new( | |
Mutex::new( | |
(0..num_elements).step_by(1) | |
.map(|n| n.to_string()).collect::<Vec<String>>() | |
) | |
); | |
let nthreads = MAX_NTHREADS; | |
let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel(); | |
let mut tchildren = Vec::new(); | |
println!("Spawning {} threads ...", nthreads); | |
for tid in 0..nthreads { | |
let input = Arc::clone(&input); | |
let thread_tx = tx.clone(); | |
let child = thread::spawn(move || { | |
println!("Running scoped thread {} ...", tid); | |
loop { | |
let mut v = input.lock().unwrap(); | |
let last = v.pop(); // take one element out from the vec and free | |
drop(v); // the vector lock so other threads can get it | |
if let Some(el) = last { | |
thread_tx.send(transformer(tid, &el)).unwrap(); | |
} else { | |
break; // The vector got empty | |
} | |
} | |
}); | |
tchildren.push(child); | |
} | |
println!("... spawning ended."); | |
// Here, all the messages are collected | |
let mut output: Vec<String> = Vec::with_capacity(num_elements); | |
for _ in 0..num_elements { | |
// The `recv` method picks a message from the channel | |
// `recv` will block the current thread if there are no messages available | |
let out = rx.recv().unwrap(); | |
println!(">> {out}"); | |
output.push(out); | |
} | |
// Wait for the threads to complete any remaining work | |
for child in tchildren { | |
child.join().expect("oops! the child thread panicked"); | |
} | |
dbg!(output); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment