Skip to content

Instantly share code, notes, and snippets.

@benkay86
Created March 27, 2025 18:28
Show Gist options
  • Save benkay86/8391e9d43664fccf0329c235e0db626e to your computer and use it in GitHub Desktop.
Save benkay86/8391e9d43664fccf0329c235e0db626e to your computer and use it in GitHub Desktop.
Fine-Grained Control of Concurrent IO using Python's asyncio.Queue
# Python's asynchronous input-output framework speeds up IO-bound operations
# by allowing your python program to make progress on multiple IO-bound tasks
# concurrently. In this example, we load many neuroimaging data files and
# then perform a compute-intensive operation on each of them. Parallelizing the
# compute-intensive operation is beyond the scope of this tutorial. However,
# we can still get a big speed up by reading multiple files concurrently. While
# the program waits for the operating system to make progress on reading one
# file, it can work on computational transformation of another file.
#
# Loading a file asynchronously is straightforward. Here, we define a helper
# method to load a neuroimaging data file for us.
import aiofiles
import aiopath
import nibabel
import numpy
import os
async def load_nifti(filename: os.PathLike) -> numpy.typing.ArrayLike:
    """Read one nifti file asynchronously and return its data as an array.

    Args:
        filename: Path-like location of the nifti file to read.

    Returns:
        The image data as a numpy array with length-one dimensions squeezed
        out.
    """
    # Trace output makes the interleaving of concurrent loads visible.
    print(f"Starting to load {filename}")
    # nibabel offers no asynchronous parser, so the whole file is pulled into
    # memory with an asynchronous binary read and parsed from that buffer.
    async with aiofiles.open(aiopath.AsyncPath(filename), mode='rb') as handle:
        raw = await handle.read()
    image = nibabel.Nifti2Image.from_bytes(raw)
    print(f"Loaded {filename}")
    # Materialize the image data, then drop the extra nifti dimensions.
    voxels = numpy.asanyarray(image.dataobj)
    return numpy.squeeze(voxels)
# Let's also define a method to pretend to perform some computation on the file.
# If the method isn't IO-bound then there's no need for it to be asynchronous.
import time
def process_nifti(mat: numpy.typing.ArrayLike) -> None:
    """Stand in for a compute-heavy step by blocking for a tenth of a second.

    Args:
        mat: Matrix data from a nifti file (ignored by this placeholder).
    """
    simulated_seconds = 0.1
    time.sleep(simulated_seconds)
# Now let's get a list of files. To test this out, put some nifti files in your
# working directory.
import glob
# glob returns matches in arbitrary order; sort them so that every run visits
# the files in the same (file-name) order.
nifti_files = sorted(glob.glob('*.dtseries.nii'))
# We can read in the nifti files asynchronously. First, we define a helper
# method to do the work. Then we invoke the helper method in an asynchronous
# reactor using asyncio.run().
import asyncio
# Define the helper method.
async def example1(nifti_files: list[os.PathLike]):
    """Load and process the files strictly one after another.

    Each load is awaited before the next one starts, so no two files are ever
    in flight at the same time.

    Args:
        nifti_files: Paths of the nifti files to load and process.
    """
    for path in nifti_files:
        # Wait for this file to load fully, then process it.
        process_nifti(await load_nifti(path))
# Run the helper coroutine to completion. asyncio.run() starts an event loop,
# drives the coroutine, and closes the loop when the coroutine finishes.
print('Example 1: Asynchronous but sequential IO.')
asyncio.run(example1(nifti_files))
# Look at the printed output -- what happened? Probably something like:
#
# Starting to load file1.nii
# Loaded file1.nii
# Starting to load file2.nii
# Loaded file2.nii
# Starting to load file3.nii
# Loaded file3.nii
# Starting to load file4.nii
# Loaded file4.nii
#
# The files were technically loaded asynchronously, but they were loaded
# sequentially, one at a time. That defeats the point of asynchronous
# programming! We want to save time by loading the files concurrently.
#
# The official python documentation recommends using asyncio.gather()
# https://docs.python.org/3/library/asyncio-task.html#running-tasks-concurrently
# Let's try that and see how it works.
# Define an asynchronous helper method to load and process a nifti file.
async def load_and_process(nifti_file: os.PathLike):
    """Load a single nifti file, then run the processing step on its data.

    Args:
        nifti_file: Path of the nifti file to load and process.
    """
    process_nifti(await load_nifti(nifti_file))
# And now the asynchronous helper method for the example.
async def example2(nifti_files: list[os.PathLike]):
    """Load and process every file concurrently, with no cap on concurrency.

    Args:
        nifti_files: Paths of the nifti files to load and process.
    """
    # One un-awaited coroutine is created per file; asyncio.gather() then
    # awaits all of them concurrently.
    await asyncio.gather(*(load_and_process(path) for path in nifti_files))
# Run it on a fresh event loop created (and afterwards closed) by asyncio.run().
print('Example 2: Asynchronous, massively-concurrent IO.')
asyncio.run(example2(nifti_files))
# Let's dissect the output. You should see something like:
#
# Starting to load file1.nii
# Starting to load file2.nii
# Starting to load file3.nii
# Starting to load file4.nii
# Loaded file1.nii
# Loaded file2.nii
# Loaded file3.nii
# Loaded file4.nii
#
# asyncio.gather() started to run *all* of the futures concurrently. That's not
# a big problem with 4 files, but it might be a big problem with 4,000 files.
# Your cluster administrator might not appreciate you DDoS'ing the file server.
# Your program might run out of memory and crash trying to process so many files
# at once. You need some control, some way to throttle to just 2 or 3 files
# concurrently.
#
# One way is to combine asyncio.gather() with a semaphore.
# Use a semaphore to limit concurrency.
async def load_and_process_with_limit(nifti_file: os.PathLike, limit: asyncio.Semaphore):
    """Load and process one nifti file while holding a slot of a semaphore.

    Args:
        nifti_file: Path of the nifti file to load and process.
        limit: Semaphore shared by all concurrent callers; it bounds how many
            of them may be working on a file at the same time.
    """
    # Suspend here, letting other tasks make progress, until a slot is free.
    await limit.acquire()
    try:
        data = await load_nifti(nifti_file)
        process_nifti(data)
    finally:
        # Always hand the slot back, even if loading or processing failed.
        limit.release()
# And now the asynchronous helper method for the example.
async def example3(nifti_files: list[os.PathLike]):
    """Load and process all files concurrently, at most two at a time.

    Args:
        nifti_files: Paths of the nifti files to load and process.
    """
    # A semaphore that two tasks can hold at once.
    gate = asyncio.Semaphore(2)
    # Every coroutine is handed to gather() up front, but each one has to
    # acquire the semaphore before it starts loading its file.
    await asyncio.gather(
        *(load_and_process_with_limit(path, gate) for path in nifti_files)
    )
# Run it on a fresh event loop created (and afterwards closed) by asyncio.run().
print('Example 3: Asynchronous, concurrent IO with semaphore limit.')
asyncio.run(example3(nifti_files))
# The output should now look something like:
#
# Starting to load file1.nii
# Starting to load file2.nii
# Loaded file1.nii
# Starting to load file3.nii
# Loaded file2.nii
# Starting to load file4.nii
# Loaded file3.nii
# Loaded file4.nii
#
# asyncio.gather() started to run all of the futures concurrently, but each
# task waited until it could acquire the semaphore before starting to load the
# file. This effectively limited our program to 2 concurrent file operations at
# any given time.
#
# This solution will be Good Enough™ for many applications. Beware, however,
# that using semaphores to limit nested concurrency (e.g. processing many
# directories each containing many nifti files) makes it all too easy to write
# a deadlock. We can use asyncio.Queue to give us even more control.
# NOTE
# This example requires python 3.13 or later due to
# https://github.com/python/cpython/pull/104228
# Here is an example using queues. Although it is much longer, most of it is
# boilerplate.
async def example4(nifti_files: list[os.PathLike]):
    """Load and process nifti files using a bounded queue and a worker pool.

    A fixed pool of ``limit`` worker tasks pulls file names from an
    asyncio.Queue, so at most ``limit`` files are being loaded at any time.
    Requires Python 3.13 or later for asyncio.Queue.shutdown().

    Args:
        nifti_files: Paths of the nifti files to load and process.

    Raises:
        BaseException: Whatever a worker raised while loading or processing a
            file is propagated to the caller.
    """
    # Define our limit... no semaphores involved here.
    limit = 2
    # Instantiate a queue that can hold up to our limit of tasks.
    queue = asyncio.Queue(limit)

    # Define a worker task to process items in the queue.
    async def worker():
        # Process items from the queue until the queue is shut down.
        while True:
            try:
                # Get the next item to process from the queue.
                # Blocks until an item is available or the queue is shut down.
                nifti_file = await queue.get()
            except asyncio.QueueShutDown:
                # The queue has been shut down. Break out of the loop.
                break
            try:
                # Work on the item taken from the queue -- not on the
                # producer's loop variable, which has usually moved on to a
                # later file by the time this worker runs.
                data = await load_nifti(nifti_file)
                process_nifti(data)
            except BaseException:
                # Something bad happened (including cancellation). Shut down
                # the queue so that calls to queue.put() will not deadlock.
                queue.shutdown(immediate=True)
                # Propagate the exception.
                raise
            finally:
                # Indicate that this task is done, even if there was an
                # exception. Otherwise queue.join() may deadlock waiting for
                # tasks to finish.
                queue.task_done()

    # Spin up a pool of concurrently-running worker tasks.
    # Each task runs in the background.
    tasks = {asyncio.create_task(worker()) for _ in range(limit)}

    # Iterate over nifti files and put each one in the queue for processing.
    for file in nifti_files:
        try:
            # This await will block until there is room in the queue,
            # effectively limiting the number of concurrently running tasks to
            # the capacity of the queue.
            #
            # The await will also return immediately if the queue is already
            # shut down, or if the queue is shut down while we are awaiting the
            # call to put().
            await queue.put(file)
        except asyncio.QueueShutDown:
            # The queue shut down prematurely.
            # This was most likely due to an exception in a worker task.
            # Try to propagate the root cause of the exception.
            for task in tasks:
                # The task that is done early is the task with the exception.
                if task.done():
                    # Calling result() raises the exception.
                    _ = task.result()
            # No finished task had an exception to raise. The queue accepts no
            # more items, so stop producing and fall through to the cleanup.
            break

    # Wait for tasks in the queue to finish.
    await queue.join()
    # Shut down the queue, triggering the worker tasks to break out of their
    # infinite loops and finish.
    queue.shutdown()
    # Clean up the worker tasks.
    for task in tasks:
        # Wait for each worker task to finish. Awaiting the task also
        # re-raises any exception the worker ended with.
        await task
# Run the queue-based example on a fresh event loop.
print('Example 4: Asynchronous, concurrent IO with queue.')
asyncio.run(example4(nifti_files))
# We should get the same output as with the semaphore example, processing at
# most 2 files at a time.
#
# Here's one final example layering on a little more sophistication. We add a
# progress bar using the excellent tqdm library. Instead of just pretending to
# process the nifti files, we'll concatenate them into one big matrix to show
# how to get results out of the worker tasks.
import tqdm
import tqdm.asyncio
async def load_nifti_quiet(filename: os.PathLike) -> numpy.typing.ArrayLike:
    """Read one nifti file asynchronously into an array, printing nothing.

    Args:
        filename: Path-like location of the nifti file to read.

    Returns:
        The image data as a numpy array with length-one dimensions squeezed
        out.
    """
    # Asynchronous binary read of the whole file; the buffer is then parsed
    # in memory by nibabel.
    async with aiofiles.open(aiopath.AsyncPath(filename), mode='rb') as handle:
        raw = await handle.read()
    image = nibabel.Nifti2Image.from_bytes(raw)
    voxels = numpy.asanyarray(image.dataobj)
    return numpy.squeeze(voxels)
async def example5(nifti_files: list[os.PathLike]) -> numpy.typing.ArrayLike:
    """Load nifti files through a bounded queue and stack them into one matrix.

    A fixed pool of ``limit`` worker tasks pulls file names from an
    asyncio.Queue, loads each file, and appends its data to a shared matrix
    along axis 0. A tqdm progress bar advances as files are handed to the
    queue. Requires Python 3.13 or later for asyncio.Queue.shutdown().

    Args:
        nifti_files: Paths of the nifti files to load.

    Returns:
        The data of all files concatenated along axis 0, in the order in which
        the workers finished loading them (which may differ from the input
        order). An empty array if no files were given.

    Raises:
        BaseException: Whatever a worker raised while loading or appending a
            file is propagated to the caller.
    """
    # Initialize an empty data matrix to build up.
    mat = numpy.empty(0)
    # Define our limit... no semaphores involved here.
    limit = 2
    # Instantiate a queue that can hold up to our limit of tasks.
    queue = asyncio.Queue(limit)

    # Define a worker task to process items in the queue.
    async def worker():
        # Bind variables we are going to mutate from enclosing scope.
        nonlocal mat
        # Process items from the queue until the queue is shut down.
        while True:
            try:
                # Get the next item to process from the queue.
                # Blocks until an item is available or the queue is shut down.
                nifti_file = await queue.get()
            except asyncio.QueueShutDown:
                # The queue has been shut down. Break out of the loop.
                break
            try:
                # Load the file taken from the queue -- not the producer's
                # loop variable, which has usually moved on to a later file by
                # the time this worker runs.
                data = await load_nifti_quiet(nifti_file)
                # Make it a 2d matrix.
                if data.ndim < 2:
                    data = data[:, numpy.newaxis]
                # Append to existing data. There is no await between reading
                # and rebinding mat, so workers cannot interleave here.
                # numpy.concatenate is used because the numpy.concat alias
                # only exists in NumPy 2.0 and later.
                mat = numpy.concatenate([mat, data], axis=0) if mat.size else data
                # Slow things down a little so we can enjoy the progress bar.
                await asyncio.sleep(1)
            except BaseException:
                # Something bad happened (including cancellation). Shut down
                # the queue so that calls to queue.put() will not deadlock.
                queue.shutdown(immediate=True)
                # Propagate the exception.
                raise
            finally:
                # Indicate that this task is done, even if there was an
                # exception. Otherwise queue.join() may deadlock waiting for
                # tasks to finish.
                queue.task_done()

    # Spin up a pool of concurrently-running worker tasks.
    # Each task runs in the background.
    tasks = {asyncio.create_task(worker()) for _ in range(limit)}

    # Prepare a progress bar.
    with tqdm.asyncio.tqdm(
        total=len(nifti_files),
        desc="Processing imaging data",
        unit="files",
    ) as pbar:
        # Iterate over nifti files and put each one in the queue for processing.
        for file in nifti_files:
            try:
                # This await will block until there is room in the queue,
                # effectively limiting the number of concurrently running tasks
                # to the capacity of the queue.
                #
                # The await will also return immediately if the queue is already
                # shut down, or if the queue is shut down while we are awaiting
                # the call to put().
                await queue.put(file)
            except asyncio.QueueShutDown:
                # The queue shut down prematurely.
                # This was most likely due to an exception in a worker task.
                # Try to propagate the root cause of the exception.
                for task in tasks:
                    # The task that is done early is the task with the
                    # exception.
                    if task.done():
                        # Calling result() raises the exception.
                        _ = task.result()
                # No finished task had an exception to raise. The queue
                # accepts no more items, so stop producing.
                break
            # Update the progress bar by one unit.
            pbar.update(1)

    # Wait for tasks in the queue to finish.
    await queue.join()
    # Shut down the queue, triggering the worker tasks to break out of their
    # infinite loops and finish.
    queue.shutdown()
    # Clean up the worker tasks.
    for task in tasks:
        # Wait for each worker task to finish. Awaiting the task also
        # re-raises any exception the worker ended with.
        await task
    # All done. Return the data matrix.
    return mat
# Run the final example and report the shape of the matrix it returns.
print('Example 5: Asynchronous, concurrent IO with queue and progress bar.')
mat = asyncio.run(example5(nifti_files))
print(f"Got data matrix of shape {mat.shape}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment