Last active
March 1, 2024 22:58
-
-
Save mrsarm/0df4c474b6f9c2008ff9e421c5855df5 to your computer and use it in GitHub Desktop.
vector_threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Multi-threaded example in Rust that iterates over a vector of input data
/// in chunks, using threads in a safe way by means of the thread and channel
/// APIs and the `Arc` type.
///
/// ## Details
///
/// Spawn MAX_NTHREADS threads; each thread borrows a slice of the vector
/// data and, for each string inside the slice, calculates a new string and
/// publishes it into a channel. A receiver in the main thread listens to
/// that channel while the child threads keep sending newly processed
/// elements.
///
/// Finally, the result is printed from the main thread, after all child
/// threads have finished.
use rand::random; | |
use std::sync::Arc; | |
use std::sync::mpsc::{Sender, Receiver}; | |
use std::sync::mpsc; | |
use std::time::Duration; | |
use std::thread; | |
/// Number of worker threads spawned by `main`; the input vector is split
/// into this many chunks, one per thread.
static MAX_NTHREADS: usize = 4;
/// Function that pretends to do something useful, getting | |
/// a new string from input data, and taking some time to do it | |
fn transformer(tid: usize, element: &str) -> String { | |
thread::sleep(Duration::from_millis(3 * random::<u8>() as u64)); | |
format!("{}:{}", tid, element) | |
} | |
fn main() { | |
// read-only vector, only with Arc is enough to share it across unknown number of threads | |
let input = Arc::new((0..40).step_by(1).map(|n| n.to_string()).collect::<Vec<String>>()); | |
let nthreads = MAX_NTHREADS; | |
let chunk_size = input.len() / nthreads; | |
let (tx, rx): (Sender<String>, Receiver<String>) = mpsc::channel(); | |
let mut tchildren = Vec::new(); | |
println!("Spawning {} threads ...", nthreads); | |
for tid in 0..nthreads { | |
let input = Arc::clone(&input); | |
let thread_tx = tx.clone(); | |
let child = thread::spawn(move || { | |
println!("Running scoped thread {} ...", tid); | |
let slice_start = tid * chunk_size; | |
let slice_end = slice_start + chunk_size; | |
for e in &input[slice_start..slice_end] { | |
thread_tx.send(transformer(tid, e)).unwrap(); | |
} | |
}); | |
tchildren.push(child); | |
} | |
println!("... spawning ended."); | |
// Here, all the messages are collected | |
let mut output: Vec<String> = Vec::with_capacity(input.len()); | |
for _ in 0..input.len() { | |
// The `recv` method picks a message from the channel | |
// `recv` will block the current thread if there are no messages available | |
let out = rx.recv().unwrap(); | |
println!(">> {out}"); | |
output.push(out); | |
} | |
// Wait for the threads to complete any remaining work | |
for child in tchildren { | |
child.join().expect("oops! the child thread panicked"); | |
} | |
dbg!(output); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A more efficient implementation that takes elements out of the input vector one by one but from multiple threads is here: https://gist.github.com/mrsarm/28ff763ccc710e6a2875c257cb03edee