Last active
June 16, 2023 00:18
-
-
Save Integralist/dd65c727ea3aabe3333d7995eb9665c9 to your computer and use it in GitHub Desktop.
[Rust Tokio Spawn and Retry] #rust #async #retry
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::any::type_name; | |
use tokio_retry::strategy::{jitter, FixedInterval}; | |
use tokio_retry::Retry; | |
#[derive(Debug)] | |
struct ImageId { | |
id: Option<String>, | |
} | |
async fn action() -> Result<ImageId, ()> { | |
println!("action(): doing stuff async"); | |
Ok(ImageId { | |
id: Some(String::from("some id")), | |
}) | |
} | |
async fn something() -> Result<u64, ()> { | |
Ok(999) | |
} | |
async fn something_with_an_arg(n: u8) -> Result<u8, ()> { | |
Ok(n) | |
} | |
async fn something_with_an_arg_that_errors() -> Result<u8, ()> { | |
Err(()) | |
} | |
fn type_of<T>(_: &T) { | |
println!("type_of(): {}", type_name::<T>()) | |
} | |
#[tokio::main] | |
async fn main() -> Result<(), ()> { | |
let handle = tokio::spawn(async move { action().await }); | |
type_of(&handle); | |
// Calling await returns a Result containing whatever was returned from the `async` closure | |
// executed from inside the `spawn` method. In this case another Result which was returned by | |
// the `action()` async function. That nested Result contains the ImageId struct. | |
let task = handle.await; | |
println!("task: {:?}", task); // Ok(Ok(ImageId { id: Some("some id") })) | |
println!("task unwrapped: {:?}", task.unwrap().unwrap().id.unwrap()); // "some id" | |
let retry_strategy = FixedInterval::from_millis(1000) | |
.map(jitter) // add jitter to delays | |
.take(10); // limit to 10 retries | |
let result = Retry::spawn(retry_strategy.clone(), action).await?; | |
println!("retry result.id from action(): {:#?}", result.id); | |
// Demonstrating how to pass arguments to an asynchronous function by way of defining an async | |
// block first and calling the async function from within it (as the Retry::spawn method | |
// doesn't allow us to pass arguments to the specified 'action'). | |
// | |
// NOTE: You can't use the ? operator within an async block as a closure can't bubble up the | |
// error type. Although, that said I didn't try specifying a specific return type using -> so | |
// maybe it would work. For the purposes of this example it doesn't matter. | |
let result = Retry::spawn(retry_strategy.clone(), || async { | |
println!("doing stuff async in a closure"); | |
something().await | |
}) | |
.await?; | |
println!("retry result from closure: {}", result); | |
// We have to clone this to avoid 'move' semantic issues that we otherwise would encounter if | |
// we just tried to call retry_strategy.clone() as the argument inside the tokio::spawn's async | |
// block. This happens because we move retry_strategy into the first async block, and then when | |
// we do another spawn later we'd then try and move it when it has already been moved. I didn't | |
// realise that it would be moved because I was calling .clone() and so expected the cloned | |
// instance to be moved, but it kinda makes sense that the variable itself is moved. | |
// | |
// Maybe we could avoid this with a reference, but I'm trying to keep the example as close to a | |
// real project I'm trying to include Retry::spawn into. | |
let clone_strategy = retry_strategy.clone(); | |
// Demonstrating that the async block can return any type of data, and also that we're able to | |
// do a retry operation within a tokio spawned task. | |
let handle = tokio::spawn(async move { | |
( | |
"some random key", | |
Retry::spawn(clone_strategy, action).await, | |
) | |
}); | |
let task = handle.await; | |
println!("task: {:?}", task); // Ok(Ok(ImageId { id: Some("some id") })) | |
// Demonstrating the same as above, but additionally the use of a closure to allow us to pass | |
// arguments to the Retry::spawn 'action'. | |
// | |
// NOTE: If the argument value 123 passed to something_with_an_arg was a complex type (e.g. a | |
// type that doesn't implement Copy), then this would cause an error related to the closure | |
// implementing FnOnce and not the required FnMut (that Retry::spawn expects). See here | |
// https://stackoverflow.com/a/30232500/14849316 explanation of FnOnce, FnMut and Fn. But in | |
// essence we might be moving a type from the closure's environment into the | |
// something_with_an_arg and that would mean it's FnOnce. The solution would be to change the | |
// signature for something_with_an_arg to accept a reference so we pass a reference as the | |
// argument type and thus the closure becomes FnMut because it doesn't move any variables. | |
let clone_strategy = retry_strategy.clone(); | |
let handle = tokio::spawn(async move { | |
( | |
"some random key", | |
Retry::spawn(clone_strategy, || async { | |
something_with_an_arg(123).await | |
}) | |
.await, | |
) | |
}); | |
let task = handle.await; | |
println!("task: {:?}", task); // Ok(Ok(ImageId { id: Some("some id") })) | |
// NOTE: retry_strategy has now 'moved' into (i.e. been consumed by) the async block so it | |
// can't be used again after this point in the code (see above `clone_strategy` variables). | |
let handle = tokio::spawn(async move { | |
( | |
"some random key", | |
Retry::spawn(retry_strategy.clone(), || async { | |
println!("trying a function that errors"); | |
something_with_an_arg_that_errors().await | |
}) | |
.await, | |
) | |
}); | |
let task = handle.await; | |
println!("task: {:?}", task); // Ok(Ok(ImageId { id: Some("some id") })) | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment