Last active
June 25, 2024 02:44
-
-
Save lu4nm3/b8bca9431cdcf19d73040ada13387e58 to your computer and use it in GitHub Desktop.
Tokio Async: Concurrent vs Parallel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::StreamExt; | |
use std::error::Error; | |
use tokio; | |
use tokio::macros::support::Pin; | |
use tokio::prelude::*; | |
use tokio::time::{Duration, Instant}; | |
pub fn main() -> Result<(), Box<dyn std::error::Error>> { | |
let mut multi_threaded_runtime = tokio::runtime::Builder::new() | |
.threaded_scheduler() | |
.enable_all() | |
.core_threads(10) | |
.max_threads(10) | |
.thread_name("multi-threaded") | |
.build()?; | |
// multi_threaded_runtime.block_on(concurrent()); | |
multi_threaded_runtime.block_on(concurrentAndParallel()); | |
Ok(()) | |
} | |
// As can be seen by the output below, despite specifying parallelism of 3, we are still bound by | |
// the fact that all of the future generated by this function execute within a single task. | |
async fn concurrent() { | |
let before = Instant::now(); | |
let paths = (0..6).rev(); | |
let fetches = futures::stream::iter(paths.into_iter().map(|path| make_request(path))) | |
.buffer_unordered(3) | |
.map(|r| println!("finished request: {}", r)) | |
.collect::<Vec<_>>(); | |
fetches.await; | |
println!("elapsed time: {:.2?}", before.elapsed()); | |
} | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 5 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 4 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 3 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 2 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 1 | |
// started request | |
// finished request: current thread ThreadId(1) | thread name main | request_duration 0 | |
// elapsed time: 15.01s | |
// Here, we wrap every future within its own task using tokio::spawn. This allows the "requests" to | |
// execute in parallel (depending on how many threads the runtime is configured with; 10 in this | |
// case) using the multiplexing that Tokio does between different tasks and threads. You can see | |
// from the output how 3 threads with ids, 9, 10, and 11, are consistently used to execute all of | |
// the 6 "requests". | |
async fn concurrentAndParallel() { | |
let before = Instant::now(); | |
let paths = (0..6).rev(); | |
let fetches = futures::stream::iter( | |
paths | |
.into_iter() | |
.map(|path| tokio::spawn(make_request(path))), | |
) | |
.buffer_unordered(3) | |
.map(|r| { | |
println!( | |
"finished request: {}", | |
match r { | |
Ok(rr) => rr, | |
Err(_) => String::from("Bad"), | |
} | |
); | |
}) | |
.collect::<Vec<_>>(); | |
fetches.await; | |
println!("elapsed time: {:.2?}", before.elapsed()); | |
} | |
// started request | |
// started request | |
// started request | |
// finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 3 | |
// started request | |
// finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 4 | |
// started request | |
// finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 5 | |
// started request | |
// finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 0 | |
// finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 1 | |
// finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 2 | |
// elapsed time: 5.01s | |
async fn make_request(sleep: u64) -> String { | |
println!("started request"); | |
std::thread::sleep(Duration::from_secs(sleep)); | |
format!( | |
"current thread {:?} | thread name {} | request_duration {:?}", | |
std::thread::current().id(), | |
std::thread::current() | |
.name() | |
.get_or_insert("default_thread_name"), | |
sleep | |
) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Right. I just wanted to use something that blocked to show how you can effectively parallelize work by using multiple tasks (unlike with the concurrent example which uses a single task for everything)