Last active
December 12, 2017 19:32
-
-
Save skade/25f9b0c87b1d5bd39d9fc6ffe0d1840a to your computer and use it in GitHub Desktop.
Passing data between a reactor and a thread pool, back and forth. Playground link: https://play.rust-lang.org/?gist=25f9b0c87b1d5bd39d9fc6ffe0d1840a
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate tokio_core; | |
extern crate futures_cpupool; | |
extern crate futures; | |
use futures::future::lazy; | |
use std::sync::Arc; | |
/// Payload that is handed from stage to stage.
///
/// It deliberately derives neither `Copy` nor `Clone`: every hop between the
/// reactor and the thread pool therefore has to *move* the value, which makes
/// the ownership hand-off visible to (and checked by) the compiler.
#[derive(Debug)]
struct Data {
    inner: i32,
}
// This isn't very interesting and that's kind of the point. | |
// What this code does is take a value and pass it through | |
// multiple concurrent systems. It passes the value on when | |
// a suspension point is reached. | |
// | |
// This goes: | |
// 1. Run a future on a tokio core | |
// 2. When it finishes, pass the value to a thread on a thread pool | |
// 3. By the end of the thread, pass the value back to a reactor | |
// 4. Once the future finishes, put it on the pool again | |
// | |
// How is that useful? Imagine having the following scenario: | |
// 1. You accept an HTTP request using async IO | |
// 2. You start handling the request in a thread | |
// 3. You start a couple of backend calls using an async client
//    and join on all of them
// 4. You continue running | |
// | |
// Often, you'd block the request handling thread at point 3, waiting | |
// for the async operations to finish. | |
// | |
// Using this technique, you can release the request handling thread | |
// while waiting for the backends to respond and then hop back on one | |
// when things are done. | |
// | |
// Obviously, depending on your scenario, this might or might not make | |
// sense. | |
fn main() { | |
let mut core = tokio_core::reactor::Core::new().unwrap(); | |
let pool = Arc::new(futures_cpupool::CpuPool::new(4)); | |
let data = Data { inner: 42 }; | |
let remote = core.remote(); | |
let on_reactor = lazy(move || { | |
println!("on_reactor: {:?}", data); | |
let remote_clone = remote.clone(); | |
let pool_handle = pool.clone(); | |
let on_pool = lazy(move || { | |
println!("on_pool: {:?}", data); | |
let another_remote = remote_clone.clone(); | |
let on_reactor_again = lazy(move || { | |
println!("on_reactor_again: {:?}", data); | |
let back_to_the_pool = lazy(move || { | |
println!("back_to_the_pool: {:?}", data); | |
let result: Result<(),()> = Ok(()); | |
result | |
}); | |
let pool_future = pool_handle.spawn(back_to_the_pool); | |
another_remote.spawn(|_| { pool_future }); | |
let result: Result<(),()> = Ok(()); | |
result | |
}); | |
remote_clone.spawn(|_| { on_reactor_again }); | |
let result: Result<(),()> = Ok(()); | |
result | |
}); | |
let pool_future = pool.spawn(on_pool); | |
remote.spawn(|_| { pool_future }); | |
let result: Result<(),()> = Ok(()); | |
result | |
}); | |
core.run(on_reactor).unwrap(); | |
// This avoids exiting before we are done. | |
core.turn(None); | |
core.turn(None); | |
core.turn(None); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment