|
from collections import deque |
|
from concurrent.futures import ProcessPoolExecutor as _ProcessPoolExecutor |
|
from multiprocessing.shared_memory import SharedMemory |
|
|
|
import numpy as np |
|
|
|
|
|
class ProcessPoolExecutor(_ProcessPoolExecutor): |
|
"""Subclass with a lazy consuming imap method.""" |
|
|
|
def imap(self, fn, *iterables, timeout=None, queued_tasks_per_worker=1): |
|
"""Ordered imap that lazily consumes iterables ref https://gist.github.com/ddelange/c98b05437f80e4b16bf4fc20fde9c999.""" |
|
futures, maxlen = deque(), self._max_workers * (queued_tasks_per_worker + 1) |
|
popleft, append, submit = futures.popleft, futures.append, self.submit |
|
|
|
def get(): |
|
"""Block until the next task is done and return the result.""" |
|
return popleft().result(timeout) |
|
|
|
for args in zip(*iterables, strict=True): |
|
append(submit(fn, *args)) |
|
if len(futures) == maxlen: |
|
yield get() |
|
|
|
while futures: |
|
yield get() |
|
|
|
|
|
class SharedNumpyArray:
    """Wrap a numpy array so that it can be shared quickly among processes, avoiding unnecessary copying and (de)serializing ref https://github.com/e-dorigatti/e-dorigatti.github.io/blob/e171ec093a64dbc864eb0e5d261a1916157066ef/_posts/2020-06-19-multiprocessing-large-objects.md?plain=1#L170.

    Usage:
    >>> iterable = (SharedNumpyArray(arr) for arr in arrs)
    >>> def fn(arr_shared: SharedNumpyArray) -> SharedNumpyArray:
    ...     with arr_shared as arr:  # np.ndarray
    ...         return SharedNumpyArray(arr.transpose())  # do something
    >>> for arr_shared in executor.imap(fn, iterable):
    ...     with arr_shared as arr:  # np.ndarray
    ...         print(arr)
    """

    def __init__(self, array: np.ndarray):
        """Copy ``array`` into a freshly allocated shared memory block.

        The original ``array`` is not retained; only its bytes, dtype and
        shape survive, so the wrapper stays cheap to pickle across processes.
        """
        # create the shared memory location of the same size of the array
        self._shared = SharedMemory(create=True, size=array.nbytes)

        # save data type and shape, necessary to read the data correctly
        self._dtype, self._shape = array.dtype, array.shape

        # create a new numpy array that uses the shared memory we created.
        # at first, it is filled with zeros
        res: np.ndarray = np.ndarray(
            self._shape,
            dtype=self._dtype,
            buffer=self._shared.buf,
        )

        # copy data from the array to the shared memory. numpy will
        # take care of copying everything in the correct format
        res[:] = array[:]

    def read(self) -> np.ndarray:
        """Read the array from the shared memory without unnecessary copying."""
        # simply create an array of the correct shape and type,
        # using the shared memory location we created earlier
        return np.ndarray(self._shape, self._dtype, buffer=self._shared.buf)

    def copy(self) -> np.ndarray:
        """Return a new copy of the array stored in shared memory."""
        # fix: the original called self.read_array(), which does not exist
        # (the accessor is named read()), so copy() always raised AttributeError
        return np.copy(self.read())

    def unlink(self) -> None:
        """Release the allocated memory. Call when finished using the data, or when the data was copied somewhere else."""
        self._shared.close()
        self._shared.unlink()

    def __enter__(self) -> np.ndarray:
        """Support with-statement: returns a zero-copy view of the array."""
        return self.read()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Support with-statement: releases the shared memory on exit."""
        # NOTE: the view returned by __enter__ must not be used after exit,
        # since the backing buffer is freed here
        self.unlink()