@lu4nm3
Last active June 25, 2024 02:44
Tokio Async: Concurrent vs Parallel
use futures::StreamExt;
use tokio::time::{Duration, Instant};
pub fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut multi_threaded_runtime = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .core_threads(10)
        .max_threads(10)
        .thread_name("multi-threaded")
        .build()?;

    // multi_threaded_runtime.block_on(concurrent());
    multi_threaded_runtime.block_on(concurrentAndParallel());
    Ok(())
}
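// This gist uses the Tokio 0.2 builder API. On Tokio 1.x, `threaded_scheduler` became
// `new_multi_thread`, and `core_threads`/`max_threads` were replaced by `worker_threads`
// (blocking threads are configured separately via `max_blocking_threads`). A sketch of
// the equivalent 1.x configuration:

```rust
// Tokio 1.x equivalent of the runtime built in main() above.
let multi_threaded_runtime = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(10)
    .thread_name("multi-threaded")
    .enable_all()
    .build()?;
```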
// As can be seen from the output below, despite allowing up to 3 requests in flight via
// buffer_unordered(3), we are still bound by the fact that all of the futures generated by
// this function execute within a single task, so the blocking sleeps run one after another.
async fn concurrent() {
    let before = Instant::now();
    let paths = (0..6).rev();

    let fetches = futures::stream::iter(paths.into_iter().map(|path| make_request(path)))
        .buffer_unordered(3)
        .map(|r| println!("finished request: {}", r))
        .collect::<Vec<_>>();
    fetches.await;

    println!("elapsed time: {:.2?}", before.elapsed());
}
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 5
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 4
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 3
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 2
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 1
// started request
// finished request: current thread ThreadId(1) | thread name main | request_duration 0
// elapsed time: 15.01s
// Here, we wrap every future in its own task using tokio::spawn. This allows the "requests" to
// execute in parallel (depending on how many threads the runtime is configured with; 10 in this
// case) using the multiplexing that Tokio does between different tasks and threads. You can see
// from the output how 3 threads with ids 9, 10, and 11 are consistently used to execute all
// 6 "requests".
async fn concurrentAndParallel() {
    let before = Instant::now();
    let paths = (0..6).rev();

    let fetches = futures::stream::iter(
        paths
            .into_iter()
            .map(|path| tokio::spawn(make_request(path))),
    )
    .buffer_unordered(3)
    .map(|r| {
        // tokio::spawn returns a JoinHandle, so each item is a Result that is
        // an Err if the spawned task panicked or was cancelled.
        println!(
            "finished request: {}",
            match r {
                Ok(rr) => rr,
                Err(_) => String::from("Bad"),
            }
        );
    })
    .collect::<Vec<_>>();
    fetches.await;

    println!("elapsed time: {:.2?}", before.elapsed());
}
// started request
// started request
// started request
// finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 3
// started request
// finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 4
// started request
// finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 5
// started request
// finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 0
// finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 1
// finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 2
// elapsed time: 5.01s
async fn make_request(sleep: u64) -> String {
    println!("started request");
    // Deliberately use the blocking std sleep (not tokio::time::sleep) so the
    // "request" ties up whichever thread it runs on.
    std::thread::sleep(Duration::from_secs(sleep));

    format!(
        "current thread {:?} | thread name {} | request_duration {:?}",
        std::thread::current().id(),
        std::thread::current().name().unwrap_or("default_thread_name"),
        sleep
    )
}
@kroeckx
kroeckx commented Aug 25, 2021

The documentation for std::thread::sleep says: "This function is blocking, and should not be used in async functions." I assume that is the reason you need to run it in parallel and that the concurrent version is slower.

@lu4nm3
Author

lu4nm3 commented Aug 25, 2021

Right. I just wanted to use something that blocked in order to show how you can effectively parallelize work by using multiple tasks (unlike the concurrent example, which uses a single task for everything).
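The effect the two examples demonstrate can be reproduced with plain OS threads, which is roughly what `tokio::spawn` on a multi-threaded runtime gives a blocking future: sequential blocking sleeps take the sum of the durations, while one thread per sleep takes about the maximum. A minimal std-only sketch (the helper names are mine, not from the gist):

```rust
use std::thread;
use std::time::{Duration, Instant};

// Run `n` blocking sleeps of `ms` milliseconds one after another on the
// current thread; total time is roughly n * ms.
fn blocking_sequential(n: u64, ms: u64) -> Duration {
    let before = Instant::now();
    for _ in 0..n {
        thread::sleep(Duration::from_millis(ms));
    }
    before.elapsed()
}

// Run the same `n` sleeps on one thread each; total time is roughly ms.
fn blocking_parallel(n: u64, ms: u64) -> Duration {
    let before = Instant::now();
    let handles: Vec<_> = (0..n)
        .map(|_| thread::spawn(move || thread::sleep(Duration::from_millis(ms))))
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    before.elapsed()
}

fn main() {
    let sequential = blocking_sequential(3, 100);
    let parallel = blocking_parallel(3, 100);
    println!("sequential: {:?} | parallel: {:?}", sequential, parallel);
    // Like the 15s vs 5s outputs above: the sequential run pays for every
    // sleep, the parallel run only for the longest one.
    assert!(sequential >= Duration::from_millis(300));
    assert!(parallel < sequential);
}
```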
