Common Mistakes in Async Rust

At Qovery, we have started to accumulate our fair share of Async Rust, and to say the least, it is not without caveats. Let’s be honest, Async Rust is hard. It has many more rough edges than sync Rust and requires a different mindset, but it solves well a problem space that is hard to tackle otherwise: it allows safer network concurrency than C++ Boost.Asio. I want to start this post by giving a big thanks to the Tokio team & ecosystem for the amazing work they provide to the community.

So this post is not here to rant about Async Rust or to talk about a fracture in the async ecosystem, but to share common mistakes, to raise awareness and, in turn, help you avoid them.

Forgetting about task cancellation

Let’s start with the root of all evil in async: task cancellation!

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

async fn spawn_tasks() {
    let task_counter = Arc::new(AtomicUsize::new(0));

    for _ in 0..100 {
        let task_counter = task_counter.clone();
        tokio::spawn(my_task(task_counter));
    }

    while task_counter.load(Ordering::Relaxed) > 0 {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}

async fn my_task(task_counter: Arc<AtomicUsize>) {
    task_counter.fetch_add(1, Ordering::Relaxed);

    let _ = do_something().await;

    task_counter.fetch_sub(1, Ordering::Relaxed);
}

Can you spot it? No? And what if we change the line that spawns our task to something like the code below?

tokio::spawn(tokio::time::timeout(Duration::from_millis(10), my_task(task_counter)));

There is a good chance that spawn_tasks will never return.

The issue lies in the fact that we often forget that async tasks in Rust have no guarantee of being run to completion. They can be cancelled, or simply stop being polled, at any await point. So my_task is not correct, as task_counter may never be decremented.

async fn my_task(task_counter: Arc<AtomicUsize>) {
    task_counter.fetch_add(1, Ordering::Relaxed);

    let _ = do_something().await;

    // This instruction may never be run, because the task has been cancelled
    // (or do_something panicked, but that is not relevant for our explanation).
    task_counter.fetch_sub(1, Ordering::Relaxed);
}

The only guarantee we have about a future is that, at some point, it will be dropped. Beyond that, we cannot guarantee anything about its ability to make progress.
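
To make this concrete, here is a minimal sketch showing that cancellation is just a drop: tokio::time::timeout drops the inner future at the deadline, so nothing after its last reached await point ever runs.

async fn demo() {
    let fut = async {
        tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
        // Never reached: the future is dropped mid-await by the timeout.
        println!("completed");
    };

    // timeout polls `fut` for up to 10ms, then drops it.
    let _ = tokio::time::timeout(tokio::time::Duration::from_millis(10), fut).await;
}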

Solution: Hook your code when the task is dropped.

async fn my_task(task_counter: Arc<AtomicUsize>) {
    task_counter.fetch_add(1, Ordering::Relaxed);
    let _guard = scopeguard::guard((), |_| {
        task_counter.fetch_sub(1, Ordering::Relaxed);
    });

    let _ = do_something().await;
}

By using the scopeguard crate, we can register a block of code to be run when _guard is dropped. If you like macros, you can even use the defer! macro for a Go-like syntax:

async fn my_task(task_counter: Arc<AtomicUsize>) {
    task_counter.fetch_add(1, Ordering::Relaxed);
    defer! {
        task_counter.fetch_sub(1, Ordering::Relaxed);
    }

    let _ = do_something().await;
}

Select and task cancellation

This one is the little sister of the previous one, but it hits like an asteroid aimed right at you. It is easy to miss a task cancellation issue during code review, because we are used to reading code from top to bottom and to treating the return points as the only exit flows of a function.

Task cancellation issues are often spotted after refactoring code to use a select!.

async fn my_task(mut should_abort: tokio::sync::oneshot::Sender<()>) {
    loop {
        select! {
            biased;

            _ = should_abort.closed() => {
                println!("Aborting task");
                return;
            }
            _ = process.recv_msg() => {
                println!("Task completed");
            }
        }
    }
}

The code above is valid, but it does extra work that is not necessary. Every time we enter the select! macro, it evaluates all the branches and creates a brand-new future for each of them, and when it exits the select!, those futures are dropped!

It turns out that when we use a select! in a loop, the natural syntax pushes us into a cycle of future creation and destruction, while it should not be necessary.

In this particular case, we don’t need to create a new future at each iteration to know if we should_abort.

Remember that not all futures are safe to cancel. For example, whatever you do, this future is not cancellable: dropping it means that your application will lose the buffered messages.

async fn read_exact_3() -> Vec<Msg> {
    // When cancelled, all buffered messages will be lost
    let mut buffer = Vec::with_capacity(3);

    while buffer.len() < 3 {
        let msg = read_one().await;
        buffer.push(msg);
    }

    buffer
}
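
One common way to make this kind of future safer to cancel, sketched below, is to keep the buffer outside the future and pass it by &mut: a cancelled call then loses at most the message currently in flight, not the ones already buffered.

// A sketch: the buffer survives cancellation because the caller owns it.
async fn read_exact_3(buffer: &mut Vec<Msg>) {
    while buffer.len() < 3 {
        let msg = read_one().await;
        buffer.push(msg);
    }
}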

What we often want instead is to re-use our future across iterations of the loop, to avoid re-creating a new one, which may require expensive initialization and/or drop code, at every iteration.

Solution: Use fuse and pin_mut! to hoist your future above the loop

async fn my_task(mut should_abort: tokio::sync::oneshot::Sender<()>) {
    let should_abort = should_abort.closed().fuse();
    pin_mut!(should_abort);

    loop {
        select! {
            biased;

            _ = should_abort => {
                println!("Aborting task");
                return;
            }

            _ = process.recv_msg() => {
                println!("Processing message");
            }
        }
    }
}

The code is now less straightforward and requires a detour to introduce fuse and pin_mut!, but it does what we want: it re-uses our should_abort future across each iteration of the loop.

fuse is here to avoid polling the future again after it has completed/returned Poll::Ready(..): the Future contract places no requirement on what happens in that case, so it may panic or misbehave. In this scenario it is not strictly necessary, as we return from the function, but adding a fuse is more future-proof, as this kind of code is brittle and can easily break during a refactoring or a requirement change.

As for pin_mut!, let's just say it is necessary, without going too much into the details: select! needs to poll the future by reference, which requires it to be pinned.
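
If you are curious anyway, here is roughly what it does, as a sketch: pin_mut! shadows the variable with a Pin<&mut _> pointing at the stack value, guaranteeing the future will never move again, which is what allows it to be polled by reference.

use futures::pin_mut;

async fn example() {
    let fut = async { 42 };
    pin_mut!(fut);
    // `fut` is now a Pin<&mut impl Future<Output = i32>>,
    // which can be polled by reference (e.g., inside a select! loop).
    let value = fut.await;
    assert_eq!(value, 42);
}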

So now, when you see a loop and a select!, ask yourself these two questions:

  • Is my future safe to cancel?
  • Can I hoist my future outside the loop to avoid creating and destroying it each time?

Not using sync Mutex

This one is fairly common when you start using async in Rust. You tell yourself: ok, I am in the async/await world now, and a mutex is blocking, so I must use an async Mutex.

async fn my_task() {
    let workers = Arc::new(tokio::sync::Mutex::new(HashMap::new()));

    for i in 1..10 {
        let workers = workers.clone();
        tokio::spawn(async move {
            let mut workers = workers.lock().await;
            workers.insert(i, i);
        });
    }
}

Well, it turns out it is not necessary, and it is less performant than using a regular sync mutex, as it requires back-and-forth with the task executor. The only scenario where you should not use a sync Mutex is when you hold the guard across an await point, but we will get to that later.

Solution: Use regular sync Mutex

If the mutex guard is not held across an await, the above code can be rewritten with a regular mutex:

async fn my_task() {
    let workers = Arc::new(std::sync::Mutex::new(HashMap::new()));

    for i in 1..10 {
        let workers = workers.clone();
        tokio::spawn(async move {
            let mut workers = workers.lock().unwrap();
            workers.insert(i, i);
        });
    }
}

The code is still valid, acquiring the lock does not need an await anymore, and it is more performant than the async version.

Sometimes, also, you have an .await inside your lock's critical section, and you think you need an async Mutex because of it. It is often possible to use a sync variant of the operation to avoid this .await and keep a sync Mutex. For example, if you push messages into an unbounded channel, the send can never fail because the channel is full, so the sync send/try_send variant is enough and no .await is needed. A lot of async primitives have equivalents meant to be used from sync code, so look for them.
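
As a sketch of what this looks like, assuming a hypothetical notify_workers helper and a tokio unbounded channel (whose send is synchronous), the whole critical section can stay await-free:

use std::collections::HashMap;
use std::sync::Mutex;

fn notify_workers(
    workers: &Mutex<HashMap<u32, u32>>,
    tx: &tokio::sync::mpsc::UnboundedSender<u32>,
) {
    let workers = workers.lock().unwrap();
    for (&id, _) in workers.iter() {
        // On an unbounded channel, send() never blocks and can never
        // fail because the channel is full; it only fails if the
        // receiver has been dropped.
        let _ = tx.send(id);
    }
} // the guard is dropped here, no await point in sight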

Holding a RAII/guard object across await points

A common pattern in Rust is to return an object that is a thin wrapper adding extra behavior on Drop.

You can see this pattern in action with MutexGuard, and holding such a lock across an await point is a pitfall that will lead you, with assurance, to a deadlock. The issue is so common that Clippy has a special lint to warn you about it for std::sync::Mutex. This pattern is pernicious because those thin wrappers implement Deref, which allows you to use the inner value without having to worry about, or even know about, the wrapper.

While it is great that Clippy reports an error for MutexGuard, the general issue remains valid for all other objects using this pattern.

As an example, let's take a look at a connection pool. The point of a connection pool is to share open connections to a remote service and to restrict their number.

async fn my_task() {
    let mut redis_cnx = redis_cnx_pool.get().await.unwrap();
    let rules_ret = redis::fetch(&mut redis_cnx).await;

    for rule in rules_ret {
        process_rule(rule).await;
    }

    redis::put(&mut redis_cnx, "done").await;
}

The code works fine, but it starves the connection pool without it being obvious at first glance.

redis_cnx is a RAII object, which adds the behavior of returning the connection to the pool when dropped. As we hold this object across await points, we prevent the connection from returning to the pool, even though we don't need it anymore, thus starving and creating contention on the connection pool.

You don't even need the last line, redis::put(&mut redis_cnx, "done"), to create such a scenario. Even without it, the compiler guarantees that redis_cnx lives as long as its scope is reachable.

During a code change, it is easy to miss that redis_cnx is a RAII object and, without knowing it, extend its lifetime.

Again, there is no guarantee regarding futures making steady progress, so you may starve the pool for a potentially unbounded amount of time.

Solution: Drop early

Drop the RAII object early, either by introducing a scope or by dropping it manually.

async fn my_task() {
    let rules_ret = {
        // New scope to avoid holding the connection for too long
        let mut redis_cnx = redis_cnx_pool.get().await.unwrap();
        redis::fetch(&mut redis_cnx).await
    };

    for rule in rules_ret {
        process_rule(rule).await;
    }

    let mut redis_cnx = redis_cnx_pool.get().await.unwrap();
    redis::put(&mut redis_cnx, "done").await;
    drop(redis_cnx);
}

P.S.: You can find this pattern in the most used concurrent hash map, DashMap. Holding a Ref across an await point will lead you to a deadlock... well, at some point.
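
For illustration, a sketch of the DashMap variant of this pitfall (do_something is hypothetical):

async fn bad(map: &dashmap::DashMap<u32, String>) {
    if let Some(entry) = map.get(&1) {
        // `entry` is a Ref holding an internal shard lock; keeping it
        // across the await can deadlock with any other task that tries
        // to write to the same shard in the meantime.
        do_something(&entry).await;
    }
}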

Future progress starvation

This one has many variants and is hard to troubleshoot, but it always comes down to starving a future by preventing it from being polled/executed.

async fn my_task(tcp_listener: tokio::net::TcpListener) {
    loop {
        let incoming_cnx = tcp_listener.accept().await;

        process_client(incoming_cnx).await;
    }
}

In this scenario, tcp_listener.accept() needs to be polled often in order to keep accepting new clients. Under load, this code will break: the OS will reject new connections because we don't accept them quickly enough. All because process_client is starving tcp_listener of execution.

This example seems obvious, but the issue is easy to miss, either because we stash a future away while executing another one, or because, due to the workload, a future takes more time than usual.

async fn my_task() {
    let heartbeat = HeartbeatTicker::new();

    loop {
        select! {
            biased;

            _ = heartbeat.beat() => {
                send_heartbeat().await;
            }
            msg = work.recv() => {
                process_work(msg); // making it async doesn't change the issue
            }
        }
    }
}

In most cases, this code will run fine. But sometimes, you will see in your logs that the application fails to send its heartbeat and gets killed by your watchdog.

After investigating with tokio console, you discover that process_work's execution time depends on the msg, and that in some rare cases, a specific message triggers a bad behavior making process_work run for several minutes. Thus, the app fails to send its heartbeat.

Some common async functions are also known to create starvation; I am looking at you, buffered.
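
The buffered pitfall deserves a quick sketch (fetch and process are hypothetical): while the loop body awaits process, none of the in-flight buffered futures are polled, so they make no progress and can hit timeouts.

use futures::stream::{self, StreamExt};

async fn consume(urls: Vec<String>) {
    let mut results = stream::iter(urls)
        .map(fetch)     // fetch(url) returns a future
        .buffered(10);  // run up to 10 fetches concurrently

    while let Some(page) = results.next().await {
        // While we are awaiting here, `results` is not polled:
        // the in-flight fetches are starved.
        process(page).await;
    }
}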

But even with sync code, it is possible to create starvation by blocking the async runtime inadvertently.

fn process_work(msg: Msg) {
    // - compress a very big msg
    // - process an image contained in the msg (i.e., crop/resize)
    // - iterate over a map of millions of elements
    // - sync sleep in the function
}

All those examples can work fine day-to-day, but under load, or when meeting an outlier, they will potentially break your assumptions.

Solutions:

There is no one-size-fits-all solution in this case, but it is often fixed by either:

  • Allowing independent progression of tasks, by spawning new tasks
  • Chunking the work to avoid keeping the CPU for too long (see the sketch below)

But it often requires putting extra communication in place (i.e., channels) between the two tasks, and thus more code/boilerplate.
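
The chunking approach can be as simple as yielding back to the scheduler periodically so other futures get a chance to run; here is a sketch with a hypothetical Item type and process_element function:

async fn process_all(items: Vec<Item>) {
    for (i, item) in items.into_iter().enumerate() {
        process_element(item); // sync, CPU-bound work

        // Yield back to the runtime every 1000 elements so that
        // other futures (heartbeat, accept loop, ...) can be polled.
        if i % 1000 == 999 {
            tokio::task::yield_now().await;
        }
    }
}

And for the spawning approach, here are the two earlier examples rewritten to let each piece of work progress independently: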

async fn my_task(tcp_listener: tokio::net::TcpListener) {
    loop {
        let incoming_cnx = tcp_listener.accept().await;

        tokio::spawn(async move {
            process_client(incoming_cnx).await;
        });
    }
}

async fn my_task() {
    let heartbeat = HeartbeatTicker::new();

    loop {
        select! {
            biased;

            _ = heartbeat.beat() => {
                send_heartbeat().await;
            }
            msg = work.recv() => {
                tokio::task::spawn_blocking(move || process_work(msg));
            }
        }
    }
}

Conclusion

All the mentioned issues are pernicious, because the faulty code can pass tests and run fine, until it does not. Neither the compiler nor Clippy is going to help you discover them, and it will often be too late, when the code is already running in production, that you will stumble upon them.

Bonus mistake: Using Async when it is not needed

Async Rust is amazing for the problems it solves, but not every library or application needs to deal with massive concurrency. It is fairly easy to achieve a high degree of concurrency without an async runtime, by simply relying on traditional primitives: threads (or processes). You can launch thousands of threads on modern hardware without causing any issue (see the C10K problem); you will just use more memory.

Most of the time, we default to using async because one of our libraries requires it (i.e., an HTTP server), so we back-propagate the requirement up through the whole architecture of our code. But this is a mistake, because we lose a lot of nice properties by doing so:

  • Debuggability: Async Rust is hard to debug, because there is no usable stack anymore, only tasks with their current state; we lose the context/lineage of their execution. Tokio console is doing a lot to help on that point, but it does not come close to the effectiveness of a meaningful stacktrace/core-dump. This point alone should convince anybody not to use async if it is not needed.
  • Team ownership: Using async (not particular to Rust) requires a change of mindset when programming. As we have seen, it is a new mental model to acquire and it opens the door to a new class of bugs. On-boarding new people used to imperative programming takes more time and makes them less effective if they don't already have that kind of experience (which is common, from experience).
  • More complex code: You will pretty quickly stumble on all the language difficulties of Async Rust, and end up adding more machinery code than logic. Why does starting a task require Send + Sync + 'static? What are those anyway? Hum, Pin<&mut>? Do I really need this unsafe {}? Ah yes, the stack pinning macro does it, so it must be normal. How can I add async to my trait? Do I really need this extra crate? Do I need to implement my own Future? Stream, Fuse, what are those?

Solution: Don't use async, or treat it as a library, not as the core part of your code architecture. For example, our deployer engine's core architecture is sync, but we still sprinkle async in some parts where it is needed (gRPC communications, network calls, ...)

use std::thread;

use once_cell::sync::Lazy;
use tokio::runtime::{Builder, Runtime};

const MAX_BLOCKING_THREADS: usize = 512;

// Start your async runtime externally from your main code
static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    Builder::new_multi_thread()
        .thread_name("tokio")
        .max_blocking_threads(MAX_BLOCKING_THREADS)
        .enable_all()
        .build()
        .unwrap()
});

// You are in an async function because of your lib (i.e., an http server):
// escape with spawn_blocking
async fn my_callback() {
    let _ = TOKIO_RUNTIME.spawn_blocking(my_sync_heaven_func).await;
}

pub fn main() -> anyhow::Result<()> {
    // block_on blocks the main thread until the async task terminates,
    // so you can still use an async library from your sync code.
    TOKIO_RUNTIME.block_on(my_async_lib_task());

    // You require some parallelism? Use the rayon crate or spawn some threads.
    // With thread::scope, you can even borrow local variables, you don't need 'static!
    // https://doc.rust-lang.org/std/thread/fn.scope.html
    let mut a = vec![1, 2, 3];
    let mut x = 0;

    thread::scope(|s| {
        s.spawn(|| {
            println!("hello from the first scoped thread");
            // We can borrow `a` here.
            dbg!(&a);
        });
        s.spawn(|| {
            println!("hello from the second scoped thread");
            // We can even mutably borrow `x` here,
            // because no other threads are using it.
            x += a[0] + a[2];
        });
        println!("hello from the main thread");
    });

    // After the scope, we can modify and access our variables again:
    a.push(4);
    assert_eq!(x, a.len());

    Ok(())
}