Skip to content

Instantly share code, notes, and snippets.

@Integralist
Last active June 16, 2023 00:18
Show Gist options
  • Save Integralist/dd65c727ea3aabe3333d7995eb9665c9 to your computer and use it in GitHub Desktop.
Save Integralist/dd65c727ea3aabe3333d7995eb9665c9 to your computer and use it in GitHub Desktop.
[Rust Tokio Spawn and Retry] #rust #async #retry
use std::any::type_name;
use tokio_retry::strategy::{jitter, FixedInterval};
use tokio_retry::Retry;
#[derive(Debug)]
struct ImageId {
id: Option<String>,
}
async fn action() -> Result<ImageId, ()> {
println!("action(): doing stuff async");
Ok(ImageId {
id: Some(String::from("some id")),
})
}
async fn something() -> Result<u64, ()> {
Ok(999)
}
async fn something_with_an_arg(n: u8) -> Result<u8, ()> {
Ok(n)
}
async fn something_with_an_arg_that_errors() -> Result<u8, ()> {
Err(())
}
fn type_of<T>(_: &T) {
println!("type_of(): {}", type_name::<T>())
}
#[tokio::main]
async fn main() -> Result<(), ()> {
let handle = tokio::spawn(async move { action().await });
type_of(&handle);
// Calling await returns a Result containing whatever was returned from the `async` closure
// executed from inside the `spawn` method. In this case another Result which was returned by
// the `action()` async function. That nested Result contains the ImageId struct.
let task = handle.await;
println!("task: {:?}", task); // Ok(Ok(ImageId { id: Some("some id") }))
println!("task unwrapped: {:?}", task.unwrap().unwrap().id.unwrap()); // "some id"
let retry_strategy = FixedInterval::from_millis(1000)
.map(jitter) // add jitter to delays
.take(10); // limit to 10 retries
let result = Retry::spawn(retry_strategy.clone(), action).await?;
println!("retry result.id from action(): {:#?}", result.id);
// Demonstrating how to pass arguments to an asynchronous function by way of defining an async
// block first and calling the async function from within it (as the Retry::spawn method
// doesn't allow us to pass arguments to the specified 'action').
//
// NOTE: You can't use the ? operator within an async block as a closure can't bubble up the
// error type. Although, that said I didn't try specifying a specific return type using -> so
// maybe it would work. For the purposes of this example it doesn't matter.
let result = Retry::spawn(retry_strategy.clone(), || async {
println!("doing stuff async in a closure");
something().await
})
.await?;
println!("retry result from closure: {}", result);
// We have to clone this to avoid 'move' semantic issues that we otherwise would encounter if
// we just tried to call retry_strategy.clone() as the argument inside the tokio::spawn's async
// block. This happens because we move retry_strategy into the first async block, and then when
// we do another spawn later we'd then try and move it when it has already been moved. I didn't
// realise that it would be moved because I was calling .clone() and so expected the cloned
// instance to be moved, but it kinda makes sense that the variable itself is moved.
//
// Maybe we could avoid this with a reference, but I'm trying to keep the example as close to a
// real project I'm trying to include Retry::spawn into.
let clone_strategy = retry_strategy.clone();
// Demonstrating that the async block can return any type of data, and also that we're able to
// do a retry operation within a tokio spawned task.
let handle = tokio::spawn(async move {
(
"some random key",
Retry::spawn(clone_strategy, action).await,
)
});
let task = handle.await;
println!("task: {:?}", task); // Ok(Ok(ImageId { id: Some("some id") }))
// Demonstrating the same as above, but additionally the use of a closure to allow us to pass
// arguments to the Retry::spawn 'action'.
//
// NOTE: If the argument value 123 passed to something_with_an_arg was a complex type (e.g. a
// type that doesn't implement Copy), then this would cause an error related to the closure
// implementing FnOnce and not the required FnMut (that Retry::spawn expects). See here
// https://stackoverflow.com/a/30232500/14849316 explanation of FnOnce, FnMut and Fn. But in
// essence we might be moving a type from the closure's environment into the
// something_with_an_arg and that would mean it's FnOnce. The solution would be to change the
// signature for something_with_an_arg to accept a reference so we pass a reference as the
// argument type and thus the closure becomes FnMut because it doesn't move any variables.
let clone_strategy = retry_strategy.clone();
let handle = tokio::spawn(async move {
(
"some random key",
Retry::spawn(clone_strategy, || async {
something_with_an_arg(123).await
})
.await,
)
});
let task = handle.await;
println!("task: {:?}", task); // Ok(Ok(ImageId { id: Some("some id") }))
// NOTE: retry_strategy has now 'moved' into (i.e. been consumed by) the async block so it
// can't be used again after this point in the code (see above `clone_strategy` variables).
let handle = tokio::spawn(async move {
(
"some random key",
Retry::spawn(retry_strategy.clone(), || async {
println!("trying a function that errors");
something_with_an_arg_that_errors().await
})
.await,
)
});
let task = handle.await;
println!("task: {:?}", task); // Ok(Ok(ImageId { id: Some("some id") }))
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment