Last active
August 10, 2017 20:31
-
-
Save mynameisfiber/28f1f492d913641fcddbecfbcd367a7a to your computer and use it in GitHub Desktop.
Numpy optimized multiprocessing queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Numpy-optimized multiprocessing queue object. | |
$ python numpyqueue.py | |
Array shape: 2 | |
mp.queue: 0.4327036259928718 | |
numpyqueue: 0.53742205198796 | |
numpyqueue2: 0.5157967879931675 | |
Array shape: 128 | |
mp.queue: 1.7091998109972337 | |
numpyqueue: 0.9855174750118749 | |
numpyqueue2: 0.9052893449988915 | |
Array shape: 256 | |
mp.queue: 11.928916561999358 | |
numpyqueue: 1.832176352996612 | |
numpyqueue2: 1.8839863180037355 | |
""" | |
import sharedmem | |
import numpy as np | |
import multiprocessing | |
from multiprocessing.queues import Empty | |
from contextlib import contextmanager | |
class NumpyQueue(object):
    """
    Fixed-capacity multiprocessing queue specialised for numpy arrays.

    Arrays are copied into one of ``maxsize`` slots of a buffer allocated
    once with ``sharedmem.empty``; only the slot index and the optional
    metadata are sent through the underlying ``multiprocessing.Queue``.

    Parameters
    ----------
    shape : sequence of int
        Shape of every array stored in the queue.
    maxsize : int
        Number of slots in the buffer.
    sentinal : object
        Marker value that, once ``put``, tells consumers the stream has
        ended. It is compared by identity (``is``).
    dtype : numpy dtype specifier
        Element type of the buffer.
    """

    def __init__(self, shape, maxsize=128, sentinal=None, dtype='float'):
        # The index queue is one entry smaller than the ring, so the slot
        # about to be written is never one whose index is still waiting in
        # the queue.
        self.queue = multiprocessing.Queue(maxsize=maxsize-1)
        # One lock per slot; held while a slot is written or being read.
        self.locks = [multiprocessing.Lock() for _ in range(maxsize)]
        self.data = sharedmem.empty([maxsize, *shape], dtype=dtype)
        # Index of the next slot to write, shared between processes.
        self.idx = multiprocessing.Value('i', 0)
        self.SENTINAL = sentinal
        self.maxsize = maxsize
        self.shape = shape

    def __len__(self):
        """Return the capacity (number of slots), not the current fill."""
        return self.maxsize

    def put(self, item, metadata=None):
        """
        Add a numpy array onto the queue. item must have the same shape as
        NumpyQueue.shape. Optionally, you can provide metadata to associate
        with this item.

        Putting the sentinel value forwards it (with the metadata) straight
        to the index queue without touching the buffer.
        """
        if item is self.SENTINAL:
            self.queue.put((self.SENTINAL, metadata))
        else:
            # Hold the write-index lock for the whole operation so that
            # concurrent producers claim distinct slots and enqueue their
            # indices in slot order.
            with self.idx:
                idx = self.idx.value
                with self.locks[idx]:
                    self.data[idx] = item
                    # NOTE(review): this blocks while holding the slot lock
                    # when the index queue is full.
                    self.queue.put((idx, metadata))
                    self.idx.value = (self.idx.value + 1) % self.maxsize

    def __iter__(self):
        """
        Iterate through the items in the queue. This will return tuples
        containing the numpy arrays and their associated metadata.

        Each array is yielded while its slot lock is held; the lock is
        released when the consumer asks for the next item. Iteration ends
        when the sentinel is received.
        """
        while True:
            try:
                with self.get(metadata=True) as (item, meta):
                    yield item, meta
            except Empty:
                # A generator must finish with ``return``: raising
                # StopIteration inside one is converted to RuntimeError
                # on Python 3.7+ (PEP 479).
                return

    @contextmanager
    def get(self, metadata=False):
        """
        Get the next queued numpy array. Optionally also return the metadata
        associated with the item.

        Used as a context manager: blocks until an index is available, then
        yields ``self.data[idx]`` (or ``(self.data[idx], meta)`` when
        ``metadata`` is true) with that slot's lock held for the duration
        of the ``with`` block. The yielded array is taken directly from the
        buffer, so copy it if it is needed after the block exits.

        Raises
        ------
        Empty
            If the dequeued entry is the sentinel.
        """
        idx, meta = self.queue.get()
        if idx is self.SENTINAL:
            raise Empty
        else:
            with self.locks[idx]:
                if metadata:
                    yield self.data[idx], meta
                else:
                    yield self.data[idx]
class TestProcessNumpyQueue(multiprocessing.Process):
    """
    Benchmark producer: pushes 10000 constant-valued arrays, each tagged
    with its counter as metadata, into the given queue object and then
    pushes the queue's sentinel.
    """

    def __init__(self, slm):
        super().__init__()
        # Target queue; must expose ``shape``, ``put`` and ``SENTINAL``.
        self.slm = slm

    def run(self):
        """Produce the arrays followed by the end-of-stream sentinel."""
        target = self.slm
        for counter in range(10000):
            # Fresh float64 array whose every element equals the counter.
            payload = np.full(target.shape, counter, dtype=np.float64)
            target.put(payload, metadata=counter)
        target.put(target.SENTINAL)
class TestProcessQueue(multiprocessing.Process):
    """
    Baseline benchmark producer: pushes 10000 ``(array, counter)`` tuples
    into a plain queue and then a ``(None, None)`` end marker.
    """

    def __init__(self, q, shape):
        super().__init__()
        self.shape = shape
        self.q = q

    def run(self):
        """Produce the tuples followed by the ``(None, None)`` marker."""
        sink = self.q
        for counter in range(10000):
            # Fresh float64 array whose every element equals the counter.
            payload = np.full(self.shape, counter, dtype=np.float64)
            sink.put((payload, counter))
        sink.put((None, None))
def test_queue(size):
    """
    Baseline benchmark: stream ``size`` x ``size`` arrays from a
    TestProcessQueue producer through a plain multiprocessing.Queue and
    check both the per-item metadata and the running total.
    """
    channel = multiprocessing.Queue(maxsize=1024)
    producer = TestProcessQueue(channel, (size, size))
    producer.start()
    total = 0
    sample, meta = channel.get()
    # The producer signals the end of the stream with a None sample.
    while sample is not None:
        total += sample[0, 0]
        assert sample[0, 0] == meta
        sample, meta = channel.get()
    # 49995000 == sum(range(10000)), the counters the producer sends.
    assert total == 49995000
def test_numpy_queue(size):
    """
    Benchmark the iterator API of NumpyQueue: consume ``size`` x ``size``
    arrays from a TestProcessNumpyQueue producer and check both the
    per-item metadata and the running total.
    """
    ring = NumpyQueue((size, size), maxsize=1024)
    producer = TestProcessNumpyQueue(ring)
    producer.start()
    total = 0
    for sample, meta in ring:
        total += sample[0, 0]
        assert sample[0, 0] == meta
    # 49995000 == sum(range(10000)), the counters the producer sends.
    assert total == 49995000
def test_numpy_queue2(size):
    """
    Alternate numpyqueue API

    Same benchmark as the iterator variant, but driving the ``get``
    context manager directly and stopping on ``Empty``.
    """
    ring = NumpyQueue((size, size), maxsize=1024)
    producer = TestProcessNumpyQueue(ring)
    producer.start()
    total = 0
    draining = True
    while draining:
        try:
            with ring.get(metadata=True) as (sample, meta):
                total += sample[0, 0]
                assert sample[0, 0] == meta
        except Empty:
            # Raised by ``get`` once the sentinel has been dequeued.
            draining = False
    # 49995000 == sum(range(10000)), the counters the producer sends.
    assert total == 49995000
def run_experiment(fxn, size, N):
    """
    Time ``N`` consecutive calls of ``fxn(size)``.

    Parameters
    ----------
    fxn : callable
        Function taking a single ``size`` argument.
    size : object
        Argument forwarded unchanged to ``fxn``.
    N : int
        Number of times ``fxn(size)`` is executed.

    Returns
    -------
    float
        Total elapsed time in seconds, as reported by ``timeit.timeit``.
    """
    import timeit

    # Pass a callable instead of source text built from ``fxn.__name__``:
    # this works for any callable (lambdas, closures, methods), not only
    # functions reachable by name in this module's globals, and it does not
    # depend on ``str(size)`` being a valid Python literal.
    return timeit.timeit(stmt=lambda: fxn(size), number=N)
if __name__ == "__main__":
    # Each benchmark is timed once per array size; the label is printed
    # together with the elapsed seconds returned by run_experiment.
    N = 1
    benchmarks = (
        ("mp.queue:", test_queue),
        ("numpyqueue:", test_numpy_queue),
        ("numpyqueue2:", test_numpy_queue2),
    )
    for size in (2, 128, 256):
        print("Array shape:", size)
        for label, benchmark in benchmarks:
            print(label, run_experiment(benchmark, size, N))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment