@ddelange
Created March 25, 2024 11:38
Multiprocessing numpy using zero-copy SharedNumpyArray and ProcessPoolExecutor.imap
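
"""Share numpy arrays across processes without copying.

SharedNumpyArray keeps an array's bytes in a multiprocessing.shared_memory
segment so that worker processes can map the same memory instead of
(de)serializing the data, and ProcessPoolExecutor.imap submits work lazily so
that at most ``max_workers * (queued_tasks_per_worker + 1)`` tasks are pending
at once.
"""
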
from collections import deque
from concurrent.futures import ProcessPoolExecutor as _ProcessPoolExecutor
from multiprocessing.shared_memory import SharedMemory

import numpy as np

class ProcessPoolExecutor(_ProcessPoolExecutor):
    """Subclass with a lazy consuming imap method."""

    def imap(self, fn, *iterables, timeout=None, queued_tasks_per_worker=1):
        """Ordered imap that lazily consumes iterables ref https://gist.github.com/ddelange/c98b05437f80e4b16bf4fc20fde9c999."""
        futures, maxlen = deque(), self._max_workers * (queued_tasks_per_worker + 1)
        popleft, append, submit = futures.popleft, futures.append, self.submit

        def get():
            """Block until the next task is done and return the result."""
            return popleft().result(timeout)

        for args in zip(*iterables, strict=True):
            append(submit(fn, *args))
            if len(futures) == maxlen:
                yield get()
        while futures:
            yield get()
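
# A minimal usage sketch of the lazy imap above (the built-in ``pow`` stands
# in for a real worker function): arguments are pulled from the iterables
# only as workers free up, and results are yielded in submission order.
#
#     with ProcessPoolExecutor() as executor:
#         for result in executor.imap(pow, range(8), range(8)):
#             print(result)  # pow(0, 0), pow(1, 1), pow(2, 2), ...
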
class SharedNumpyArray:
    """Wrap a numpy array so that it can be shared quickly among processes, avoiding unnecessary copying and (de)serializing ref https://github.com/e-dorigatti/e-dorigatti.github.io/blob/e171ec093a64dbc864eb0e5d261a1916157066ef/_posts/2020-06-19-multiprocessing-large-objects.md?plain=1#L170.

    Usage:
        >>> iterable = (SharedNumpyArray(arr) for arr in arrs)
        >>> def fn(arr_shared: SharedNumpyArray) -> SharedNumpyArray:
        ...     with arr_shared as arr:  # np.ndarray
        ...         return SharedNumpyArray(arr.transpose())  # do something
        >>> for arr_shared in executor.imap(fn, iterable):
        ...     with arr_shared as arr:  # np.ndarray
        ...         print(arr)
    """

    def __init__(self, array: np.ndarray):
        # create the shared memory location of the same size as the array
        self._shared = SharedMemory(create=True, size=array.nbytes)
        # save data type and shape, necessary to read the data correctly
        self._dtype, self._shape = array.dtype, array.shape
        # create a new numpy array that uses the shared memory we created;
        # at first, it is filled with zeros
        res: np.ndarray = np.ndarray(
            self._shape,
            dtype=self._dtype,
            buffer=self._shared.buf,
        )
        # copy data from the array to the shared memory. numpy will
        # take care of copying everything in the correct format
        res[:] = array[:]

    def read(self) -> np.ndarray:
        """Read the array from the shared memory without unnecessary copying."""
        # simply create an array of the correct shape and type,
        # using the shared memory location we created earlier
        return np.ndarray(self._shape, self._dtype, buffer=self._shared.buf)

    def copy(self):
        """Return a new copy of the array stored in shared memory."""
        return np.copy(self.read())

    def unlink(self):
        """Release the allocated memory. Call when finished using the data, or when the data was copied somewhere else."""
        self._shared.close()
        self._shared.unlink()

    def __enter__(self) -> np.ndarray:
        """Support with-statement."""
        return self.read()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Support with-statement."""
        self.unlink()
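
# A minimal end-to-end sketch; the worker ``transpose`` and the demo arrays
# below are illustrative additions, not part of the gist. The parent wraps
# each array in shared memory, the worker reads it zero-copy (its with-block
# unlinks the input segment on exit) and returns the result through a fresh
# shared segment, which the parent unlinks after reading.
def transpose(arr_shared: SharedNumpyArray) -> SharedNumpyArray:
    """Example worker: transpose the shared array into a new shared segment."""
    with arr_shared as arr:  # np.ndarray view; input segment unlinked on exit
        return SharedNumpyArray(arr.T)


if __name__ == "__main__":
    arrs = (np.full((2, 3), i) for i in range(4))
    shared = (SharedNumpyArray(arr) for arr in arrs)
    with ProcessPoolExecutor() as executor:
        for arr_shared in executor.imap(transpose, shared):
            with arr_shared as arr:  # result segment unlinked on exit
                print(arr.shape, arr[0, 0])  # (3, 2) 0 ... (3, 2) 3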