Fine-Grained Control of Concurrent IO using Python's asyncio.Queue
# Python's asynchronous input-output framework speeds up IO-bound operations
# by allowing your Python program to make progress on multiple IO-bound tasks
# concurrently. In this example, we load many neuroimaging data files and
# then perform a compute-intensive operation on each of them. Parallelizing the
# compute-intensive operation is beyond the scope of this tutorial. However,
# we can still get a big speedup by reading multiple files concurrently. While
# the program waits for the operating system to make progress on reading one
# file, it can work on computational transformation of another file.
#
# Loading a file asynchronously is straightforward. Here, we define a helper
# function to load a neuroimaging data file for us.
import aiofiles
import aiopath
import nibabel
import numpy
import os
async def load_nifti(filename: os.PathLike) -> numpy.typing.ArrayLike:
    """Load a single nifti file into a numpy array.

    Args:
        filename: A path-like object

    Returns:
        A numpy array with extra nifti dimensions removed
    """
    # Debugging statements so we can follow the order of execution.
    print(f"Starting to load {filename}")
    # Asynchronously open the file for reading in binary mode.
    async with aiofiles.open(aiopath.AsyncPath(filename), mode='rb') as f:
        # Nibabel doesn't provide asynchronous methods for parsing the file.
        # Instead, we will asynchronously read the entire file into memory.
        # (Named raw rather than bytes to avoid shadowing the builtin.)
        raw = await f.read()
        # Then parse the file in-memory using nibabel.
        img = nibabel.Nifti2Image.from_bytes(raw)
    # Debugging statement.
    print(f"Loaded {filename}")
    # Return a numpy array with the extra nifti dimensions squeezed out.
    return numpy.squeeze(numpy.asanyarray(img.dataobj))
# Let's also define a function to pretend to perform some computation on the
# file. Since this function isn't IO-bound, there's no need for it to be
# asynchronous.
import time
def process_nifti(mat: numpy.typing.ArrayLike) -> None:
    """Pretend to do some computation on matrix data from a nifti file."""
    time.sleep(0.1)
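# Beware that a synchronous call like process_nifti() blocks the event loop
# while it runs, so no other task can make IO progress during the call. The
# 0.1 second sleep above is tolerable, but for a computation that runs long
# enough to matter, one option is to offload the work to a thread. This is a
# minimal sketch, not used in the examples below; the wrapper name is made up
# for illustration.
import asyncio
async def process_nifti_in_thread(mat: numpy.typing.ArrayLike) -> None:
    """Run process_nifti() in a worker thread, keeping the event loop free."""
    await asyncio.to_thread(process_nifti, mat)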
# Now let's get a list of files. To test this out, put some nifti files in your
# working directory.
import glob
nifti_files = glob.glob('*.dtseries.nii')
# We can read in the nifti files asynchronously. First, we define a helper
# function to do the work. Then we invoke the helper function on the asyncio
# event loop using asyncio.run().
import asyncio
# Define the helper function.
async def example1(nifti_files: list[os.PathLike]):
    # Iterate over the files.
    for file in nifti_files:
        # Load in this file.
        data = await load_nifti(file)
        # Process the file.
        process_nifti(data)
# Run the helper function.
print('Example 1: Asynchronous but sequential IO.')
asyncio.run(example1(nifti_files))
# Look at the printed output -- what happened? Probably something like:
#
# Starting to load file1.nii
# Loaded file1.nii
# Starting to load file2.nii
# Loaded file2.nii
# Starting to load file3.nii
# Loaded file3.nii
# Starting to load file4.nii
# Loaded file4.nii
#
# The files were technically loaded asynchronously, but they were loaded
# sequentially, one at a time. That defeats the point of asynchronous
# programming! We want to save time by loading the files concurrently.
#
# The official Python documentation recommends using asyncio.gather():
# https://docs.python.org/3/library/asyncio-task.html#running-tasks-concurrently
# Let's try that and see how it works.
# Define an asynchronous helper function to load and process a nifti file.
async def load_and_process(nifti_file: os.PathLike):
    data = await load_nifti(nifti_file)
    process_nifti(data)
# And now the asynchronous helper function for the example.
async def example2(nifti_files: list[os.PathLike]):
    # Make a list of coroutine objects or, colloquially, tasks. Note that we
    # do not await the coroutines in the list... yet.
    coros = [load_and_process(file) for file in nifti_files]
    # Use asyncio.gather() to await all the coroutines concurrently.
    await asyncio.gather(*coros)
# Run it on the event loop.
print('Example 2: Asynchronous, massively-concurrent IO.')
asyncio.run(example2(nifti_files))
# Let's dissect the output. You should see something like:
#
# Starting to load file1.nii
# Starting to load file2.nii
# Starting to load file3.nii
# Starting to load file4.nii
# Loaded file1.nii
# Loaded file2.nii
# Loaded file3.nii
# Loaded file4.nii
#
# asyncio.gather() started to run *all* of the coroutines concurrently. That's
# not a big problem with 4 files, but it might be a big problem with 4,000
# files. Your cluster administrator might not appreciate you DDoS'ing the file
# server. Your program might run out of memory and crash trying to process so
# many files at once. You need some control, some way to throttle to just 2 or
# 3 files concurrently.
#
# One way is to combine asyncio.gather() with a semaphore.
# Use a semaphore to limit concurrency.
async def load_and_process_with_limit(nifti_file: os.PathLike, limit: asyncio.Semaphore):
    # Block, allowing other concurrent tasks to make progress, until we can
    # acquire the semaphore.
    async with limit:
        data = await load_nifti(nifti_file)
        process_nifti(data)
# And now the asynchronous helper function for the example.
async def example3(nifti_files: list[os.PathLike]):
    # Declare a semaphore that can be acquired by two concurrent tasks at once.
    limit = asyncio.Semaphore(2)
    # Make a list of coroutine objects.
    coros = [load_and_process_with_limit(file, limit) for file in nifti_files]
    # Use asyncio.gather() to await all the coroutines concurrently.
    await asyncio.gather(*coros)
# Run it on the event loop.
print('Example 3: Asynchronous, concurrent IO with semaphore limit.')
asyncio.run(example3(nifti_files))
# The output should now look something like:
#
# Starting to load file1.nii
# Starting to load file2.nii
# Loaded file1.nii
# Starting to load file3.nii
# Loaded file2.nii
# Starting to load file4.nii
# Loaded file3.nii
# Loaded file4.nii
#
# asyncio.gather() started to run all of the coroutines concurrently, but each
# task waited until it could acquire the semaphore before starting to load its
# file. This effectively limited our program to 2 concurrent file operations at
# any given time.
#
# This solution will be Good Enough™ for many applications. Beware, however,
# that using semaphores to limit nested concurrency (e.g. processing many
# directories, each containing many nifti files) makes it all too easy to write
# a deadlock. We can use asyncio.Queue to give us even more control.
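# To make the deadlock hazard concrete, here is a minimal sketch. The function
# process_directory_deadlock() is hypothetical, never called in this tutorial,
# and exists only to illustrate the trap: each directory task holds the
# semaphore while it awaits per-file subtasks that each need the same
# semaphore. With a limit of 2 and 2 or more directories, every permit is held
# by an outer directory task, so no file subtask can ever start, and the
# program hangs forever.
async def process_directory_deadlock(directory: os.PathLike,
                                     limit: asyncio.Semaphore):
    """Illustrates the nested-semaphore deadlock. Do NOT use this pattern."""
    # Hold one permit for the entire directory...
    async with limit:
        files = glob.glob(os.path.join(directory, '*.dtseries.nii'))
        # ...while awaiting subtasks that each need a permit of their own.
        # If every permit is held by an outer task like this one, these
        # subtasks never acquire the semaphore: deadlock.
        await asyncio.gather(
            *(load_and_process_with_limit(file, limit) for file in files))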
# NOTE
# This example requires Python 3.13 or later due to
# https://github.com/python/cpython/pull/104228
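# As an optional guard (an addition for clarity, not required by the examples),
# we can check the interpreter version up front instead of failing later with
# an AttributeError on asyncio.QueueShutDown:
import sys
if sys.version_info < (3, 13):
    raise RuntimeError(
        'Examples 4 and 5 require Python 3.13+ for asyncio.Queue.shutdown().')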
# Here is an example using queues. Although it is much longer, most of it is
# boilerplate.
async def example4(nifti_files: list[os.PathLike]):
    # Define our limit... no semaphores involved here.
    limit = 2
    # Instantiate a queue that can hold up to our limit of tasks.
    queue = asyncio.Queue(limit)
    # Define a worker task to process items in the queue.
    async def worker(queue: asyncio.Queue):
        # Process items from the queue until the queue is shut down.
        while True:
            try:
                # Get the next item to process from the queue.
                # Blocks until an item is available or the queue is shut down.
                nifti_file = await queue.get()
            except asyncio.QueueShutDown:
                # The queue has been shut down. Break out of the loop.
                break
            try:
                data = await load_nifti(nifti_file)
                process_nifti(data)
            except:
                # Something bad happened. Shut down the queue so that calls to
                # queue.put() will not deadlock.
                queue.shutdown(immediate=True)
                # Propagate the exception.
                raise
            finally:
                # Indicate that this task is done, even if there was an
                # exception. Otherwise queue.join() may deadlock waiting for
                # tasks to finish.
                queue.task_done()
    # Spin up a pool of concurrently-running worker tasks.
    # Each task runs in the background.
    tasks = set()
    for _ in range(limit):
        tasks.add(asyncio.create_task(worker(queue)))
    # Iterate over nifti files and put each one in the queue for processing.
    for file in nifti_files:
        try:
            # This await will block until there is room in the queue,
            # effectively limiting the number of concurrently running tasks to
            # the capacity of the queue.
            #
            # The await will also return immediately if the queue is already
            # shut down, or if the queue is shut down while we are awaiting the
            # call to put().
            await queue.put(file)
        except asyncio.QueueShutDown:
            # The queue shut down prematurely.
            # This was most likely due to an exception in a worker task.
            # Try to propagate the root cause of the exception.
            for task in tasks:
                # The task that is done early is the task with the exception.
                if task.done():
                    # Calling result() raises the exception.
                    _ = task.result()
    # Wait for tasks in the queue to finish.
    await queue.join()
    # Shut down the queue, triggering the worker tasks to break out of their
    # infinite loops and finish.
    queue.shutdown()
    # Clean up the worker tasks.
    for task in tasks:
        # Wait for each worker task to finish.
        await task
        # Check for and propagate exceptions in the task's result.
        _ = task.result()
print('Example 4: Asynchronous, concurrent IO with queue.')
asyncio.run(example4(nifti_files))
# We should get the same output as with the semaphore example, processing at
# most 2 files at a time.
#
# Here's one final example layering on a little more sophistication. We add a
# progress bar using the excellent tqdm library. Instead of just pretending to
# process the nifti files, we'll concatenate them into one big matrix to show
# how to get results out of the worker tasks.
import tqdm
import tqdm.asyncio
async def load_nifti_quiet(filename: os.PathLike) -> numpy.typing.ArrayLike:
    """Load a single nifti file into a numpy array, without debugging output."""
    async with aiofiles.open(aiopath.AsyncPath(filename), mode='rb') as f:
        raw = await f.read()
        img = nibabel.Nifti2Image.from_bytes(raw)
    return numpy.squeeze(numpy.asanyarray(img.dataobj))
async def example5(nifti_files: list[os.PathLike]) -> numpy.typing.ArrayLike:
    # Initialize an empty data matrix to build up.
    mat = numpy.empty(0)
    # Define our limit... no semaphores involved here.
    limit = 2
    # Instantiate a queue that can hold up to our limit of tasks.
    queue = asyncio.Queue(limit)
    # Define a worker task to process items in the queue.
    async def worker(queue: asyncio.Queue):
        # Bind the variable we are going to mutate from the enclosing scope.
        # Mutating it without a lock is safe here because asyncio tasks run on
        # a single thread and only yield to each other at await points.
        nonlocal mat
        # Process items from the queue until the queue is shut down.
        while True:
            try:
                # Get the next item to process from the queue.
                # Blocks until an item is available or the queue is shut down.
                nifti_file = await queue.get()
            except asyncio.QueueShutDown:
                # The queue has been shut down. Break out of the loop.
                break
            try:
                # Load the nifti file.
                data = await load_nifti_quiet(nifti_file)
                # Make it a 2d matrix.
                if len(data.shape) < 2:
                    data = data[:, numpy.newaxis]
                # Append to the existing data.
                mat = numpy.concatenate([mat, data], axis=0) if mat.size else data
                # Slow things down a little so we can enjoy the progress bar.
                await asyncio.sleep(1)
            except:
                # Something bad happened. Shut down the queue so that calls to
                # queue.put() will not deadlock.
                queue.shutdown(immediate=True)
                # Propagate the exception.
                raise
            finally:
                # Indicate that this task is done, even if there was an
                # exception. Otherwise queue.join() may deadlock waiting for
                # tasks to finish.
                queue.task_done()
    # Spin up a pool of concurrently-running worker tasks.
    # Each task runs in the background.
    tasks = set()
    for _ in range(limit):
        tasks.add(asyncio.create_task(worker(queue)))
    # Prepare a progress bar.
    with tqdm.asyncio.tqdm(
        total=len(nifti_files),
        desc="Processing imaging data",
        unit="files",
    ) as pbar:
        # Iterate over nifti files and put each one in the queue for
        # processing.
        for file in nifti_files:
            try:
                # This await will block until there is room in the queue,
                # effectively limiting the number of concurrently running tasks
                # to the capacity of the queue.
                #
                # The await will also return immediately if the queue is
                # already shut down, or if the queue is shut down while we are
                # awaiting the call to put().
                await queue.put(file)
            except asyncio.QueueShutDown:
                # The queue shut down prematurely.
                # This was most likely due to an exception in a worker task.
                # Try to propagate the root cause of the exception.
                for task in tasks:
                    # The task that is done early is the task with the
                    # exception.
                    if task.done():
                        # Calling result() raises the exception.
                        _ = task.result()
            # Update the progress bar by one unit.
            pbar.update(1)
    # Wait for tasks in the queue to finish.
    await queue.join()
    # Shut down the queue, triggering the worker tasks to break out of their
    # infinite loops and finish.
    queue.shutdown()
    # Clean up the worker tasks.
    for task in tasks:
        # Wait for each worker task to finish.
        await task
        # Check for and propagate exceptions in the task's result.
        _ = task.result()
    # All done. Return the data matrix.
    return mat
print('Example 5: Asynchronous, concurrent IO with queue and progress bar.')
mat = asyncio.run(example5(nifti_files))
print(f"Got data matrix of shape {mat.shape}")