Last active
February 12, 2021 17:34
-
-
Save dkirkby/6f5e07f6bc950b1c42739b54e8b3d046 to your computer and use it in GitHub Desktop.
Demo of multiprocessing with shared numpy arrays
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Test parallel processing with shared memory.
import multiprocessing
import multiprocessing.shared_memory  # needs python >= 3.8
import time

import numpy as np
class MultiDemo(object):
    """Demo of worker processes operating on numpy arrays held in shared memory.

    For every entry in ``names`` the constructor creates one named
    shared-memory block, one numpy array viewing that block, one pipe and one
    spawned worker process.  The parent sends action strings down the pipe;
    the worker applies the action to its own view of the same memory and
    replies with a pickled copy of the array.

    The block names are system-wide, so a block left behind by a crashed run
    makes the constructor fail with ``FileExistsError`` until it is unlinked.

    Attributes:
        names: worker names, also used as the shared-memory block names.
        shared: name -> SharedMemory handle owned by this (parent) process.
        arrays: name -> numpy view of the shared block (removed by shutdown).
        procs: name -> started worker Process.
        parents: name -> parent end of the pipe to that worker.
    """

    # Worker / shared-memory block names used when none are given.
    DEFAULT_NAMES = ('Bobby', 'Mo', 'Mane')
    # Element type of every shared array, used by both parent and workers.
    DTYPE = np.int64

    def __init__(self, names=None, size=4):
        """Allocate the shared arrays and start one worker per name.

        Args:
            names: iterable of unique names; defaults to ``DEFAULT_NAMES``.
            size: shape of each shared array (an int, or a tuple of ints),
                counted in elements, not bytes.

        Raises:
            FileExistsError: a shared-memory block with one of the names
                already exists.  Anything created so far is released first.
        """
        self.names = list(self.DEFAULT_NAMES if names is None else names)
        self.shared = {}
        self.arrays = {}
        self.procs = {}
        self.parents = {}
        # Names of the workers that have already been told to quit.
        self._quit_sent = set()
        context = multiprocessing.get_context('spawn')
        nbytes = self._nbytes(size)
        try:
            for i, name in enumerate(self.names):
                # Allocate shared memory.  SharedMemory takes a size in BYTES,
                # so it must cover size elements of DTYPE.  The handle is
                # stored because it must stay alive while the memory is used.
                shm = multiprocessing.shared_memory.SharedMemory(
                    name=name, size=nbytes, create=True)
                self.shared[name] = shm
                # Create numpy array using the shared memory.
                self.arrays[name] = np.ndarray(size, dtype=self.DTYPE, buffer=shm.buf)
                self.arrays[name][:] = i
                # Create pipe and process.
                self.parents[name], child = context.Pipe()
                proc = context.Process(target=MultiDemo.worker, args=(name, size, child))
                # Start the process; only started processes are recorded so
                # that shutdown() never joins one that failed to start.
                proc.start()
                self.procs[name] = proc
                # The worker now has its own copy of the child end.  Closing
                # ours makes recv() raise EOFError if the worker dies instead
                # of blocking forever.
                child.close()
        except BaseException:
            # Do not leave blocks or running workers behind on a partial setup.
            self.shutdown()
            raise

    @staticmethod
    def _nbytes(size):
        """Return the number of bytes needed for an array of shape ``size``."""
        return int(np.prod(size)) * np.dtype(MultiDemo.DTYPE).itemsize

    def run(self):
        """Send each action to every worker, print both views, then send 'quit'.

        For every action the parent first writes the worker index into
        element 0 of its view, then asks the worker to overwrite the whole
        array.  The parent's view and the copy returned by the worker are
        printed side by side and should agree.

        Raises:
            EOFError: a worker exited before answering.
        """
        for action in ('ones', 'zeros', 'arange'):
            time.sleep(0.5)
            print(f'sending action="{action}" to all workers...')
            for i, name in enumerate(self.names):
                # Set our copy of the array.
                self.arrays[name][0] = i
                # Launch the worker and wait for its reply.
                self.parents[name].send(action)
                worker_array = self.parents[name].recv()
                # Check that we agree on the array contents.
                print(self.arrays[name], worker_array)
        print('quitting...')
        for name in self.names:
            self.parents[name].send('quit')
            self._quit_sent.add(name)

    @staticmethod
    def dowork(array, action):
        """Overwrite ``array`` in place according to ``action``.

        Args:
            array: numpy array to modify.
            action: 'zeros', 'ones' or 'arange' (0, 1, 2, ... in C order).
                Any other value leaves the array untouched.
        """
        if action == 'zeros':
            array[...] = 0
        elif action == 'ones':
            array[...] = 1
        elif action == 'arange':
            array[...] = np.arange(array.size).reshape(array.shape)

    @staticmethod
    def worker(name, size, pipe):
        """Worker process main loop.

        Attaches to the existing shared-memory block ``name``, then applies
        every action received on ``pipe`` to the shared array and sends back a
        pickled copy of it.  On 'quit' it releases the mapping, replies 'bye'
        and returns.  It also returns, without replying, if the parent closes
        its end of the pipe.

        Args:
            name: name of the shared-memory block to attach to.
            size: shape of the shared array, in elements.
            pipe: this worker's end of the pipe to the parent.
        """
        # Attach our array to the shared memory associated with this name.
        shm = multiprocessing.shared_memory.SharedMemory(name=name)
        array = None
        quit_requested = False
        try:
            array = np.ndarray(size, dtype=MultiDemo.DTYPE, buffer=shm.buf)
            while True:
                try:
                    action = pipe.recv()
                except EOFError:
                    # The parent went away without saying 'quit'.
                    break
                if action == 'quit':
                    quit_requested = True
                    break
                MultiDemo.dowork(array, action)
                pipe.send(array)
        finally:
            # Drop the view before unmapping, and unmap even on errors.
            array = None
            shm.close()
        if quit_requested:
            pipe.send('bye')
        pipe.close()

    def shutdown(self):
        """Stop every worker and release every shared-memory block.

        Workers that were not yet told to quit (for example because run()
        raised) are told now, so joining them cannot block forever.  The
        parent's own handles are closed and the blocks unlinked.  The views in
        ``self.arrays`` are removed because they must not be used once the
        memory is unmapped.  Calling this more than once is harmless.
        """
        for name in self.names:
            proc = self.procs.get(name)
            conn = self.parents.get(name)
            if proc is not None:
                # Shutdown process.
                print(f'Shutting down {name}...')
                if conn is not None and name not in self._quit_sent and proc.is_alive():
                    try:
                        conn.send('quit')
                    except OSError:
                        # The worker exited between the check and the send.
                        pass
                    self._quit_sent.add(name)
                proc.join()
            if conn is not None:
                conn.close()
            shm = self.shared.pop(name, None)
            if shm is not None:
                # Release shared memory.
                print(f'Releasing shared mem for {name}...')
                self.arrays.pop(name, None)
                try:
                    shm.close()
                finally:
                    # Unlink even if close() fails so the name is freed.
                    shm.unlink()
if __name__ == '__main__':
    # This guard is required: MultiDemo uses the 'spawn' start method, which
    # imports this module again inside every worker process.
    demo = MultiDemo()
    try:
        demo.run()
    finally:
        # Always join the workers and release the shared memory, even if
        # run() fails.
        demo.shutdown()
# Test parallel processing with shared memory. | |
import time | |
import multiprocessing | |
import multiprocessing.shared_memory # needs python >= 3.8 | |
import numpy as np | |
class MultiDemo(object):
    """Demo of worker processes operating on numpy arrays held in shared memory.

    For every entry in ``names`` the constructor creates one named
    shared-memory block, one numpy array viewing that block, one pipe and one
    spawned worker process.  The parent sends action strings down the pipe;
    the worker applies the action to its own view of the same memory and
    replies with a pickled copy of the array.

    The block names are system-wide, so a block left behind by a crashed run
    makes the constructor fail with ``FileExistsError`` until it is unlinked.

    Attributes:
        names: worker names, also used as the shared-memory block names.
        shared: name -> SharedMemory handle owned by this (parent) process.
        arrays: name -> numpy view of the shared block (removed by shutdown).
        procs: name -> started worker Process.
        parents: name -> parent end of the pipe to that worker.
    """

    # Worker / shared-memory block names used when none are given.
    DEFAULT_NAMES = ('Bobby', 'Mo', 'Mane')
    # Element type of every shared array, used by both parent and workers.
    DTYPE = np.int64

    def __init__(self, names=None, size=4):
        """Allocate the shared arrays and start one worker per name.

        Args:
            names: iterable of unique names; defaults to ``DEFAULT_NAMES``.
            size: shape of each shared array (an int, or a tuple of ints),
                counted in elements, not bytes.

        Raises:
            FileExistsError: a shared-memory block with one of the names
                already exists.  Anything created so far is released first.
        """
        self.names = list(self.DEFAULT_NAMES if names is None else names)
        self.shared = {}
        self.arrays = {}
        self.procs = {}
        self.parents = {}
        # Names of the workers that have already been told to quit.
        self._quit_sent = set()
        context = multiprocessing.get_context('spawn')
        nbytes = self._nbytes(size)
        try:
            for i, name in enumerate(self.names):
                # Allocate shared memory.  SharedMemory takes a size in BYTES,
                # so it must cover size elements of DTYPE.  The handle is
                # stored because it must stay alive while the memory is used.
                shm = multiprocessing.shared_memory.SharedMemory(
                    name=name, size=nbytes, create=True)
                self.shared[name] = shm
                # Create numpy array using the shared memory.
                self.arrays[name] = np.ndarray(size, dtype=self.DTYPE, buffer=shm.buf)
                self.arrays[name][:] = i
                # Create pipe and process.
                self.parents[name], child = context.Pipe()
                proc = context.Process(target=MultiDemo.worker, args=(name, size, child))
                # Start the process; only started processes are recorded so
                # that shutdown() never joins one that failed to start.
                proc.start()
                self.procs[name] = proc
                # The worker now has its own copy of the child end.  Closing
                # ours makes recv() raise EOFError if the worker dies instead
                # of blocking forever.
                child.close()
        except BaseException:
            # Do not leave blocks or running workers behind on a partial setup.
            self.shutdown()
            raise

    @staticmethod
    def _nbytes(size):
        """Return the number of bytes needed for an array of shape ``size``."""
        return int(np.prod(size)) * np.dtype(MultiDemo.DTYPE).itemsize

    def run(self):
        """Send each action to every worker, print both views, then send 'quit'.

        For every action the parent first writes the worker index into
        element 0 of its view, then asks the worker to overwrite the whole
        array.  The parent's view and the copy returned by the worker are
        printed side by side and should agree.

        Raises:
            EOFError: a worker exited before answering.
        """
        for action in ('ones', 'zeros', 'arange'):
            time.sleep(0.5)
            print(f'sending action="{action}" to all workers...')
            for i, name in enumerate(self.names):
                # Set our copy of the array.
                self.arrays[name][0] = i
                # Launch the worker and wait for its reply.
                self.parents[name].send(action)
                worker_array = self.parents[name].recv()
                # Check that we agree on the array contents.
                print(self.arrays[name], worker_array)
        print('quitting...')
        for name in self.names:
            self.parents[name].send('quit')
            self._quit_sent.add(name)

    @staticmethod
    def dowork(array, action):
        """Overwrite ``array`` in place according to ``action``.

        Args:
            array: numpy array to modify.
            action: 'zeros', 'ones' or 'arange' (0, 1, 2, ... in C order).
                Any other value leaves the array untouched.
        """
        if action == 'zeros':
            array[...] = 0
        elif action == 'ones':
            array[...] = 1
        elif action == 'arange':
            array[...] = np.arange(array.size).reshape(array.shape)

    @staticmethod
    def worker(name, size, pipe):
        """Worker process main loop.

        Attaches to the existing shared-memory block ``name``, then applies
        every action received on ``pipe`` to the shared array and sends back a
        pickled copy of it.  On 'quit' it releases the mapping, replies 'bye'
        and returns.  It also returns, without replying, if the parent closes
        its end of the pipe.

        Args:
            name: name of the shared-memory block to attach to.
            size: shape of the shared array, in elements.
            pipe: this worker's end of the pipe to the parent.
        """
        # Attach our array to the shared memory associated with this name.
        shm = multiprocessing.shared_memory.SharedMemory(name=name)
        array = None
        quit_requested = False
        try:
            array = np.ndarray(size, dtype=MultiDemo.DTYPE, buffer=shm.buf)
            while True:
                try:
                    action = pipe.recv()
                except EOFError:
                    # The parent went away without saying 'quit'.
                    break
                if action == 'quit':
                    quit_requested = True
                    break
                MultiDemo.dowork(array, action)
                pipe.send(array)
        finally:
            # Drop the view before unmapping, and unmap even on errors.
            array = None
            shm.close()
        if quit_requested:
            pipe.send('bye')
        pipe.close()

    def shutdown(self):
        """Stop every worker and release every shared-memory block.

        Workers that were not yet told to quit (for example because run()
        raised) are told now, so joining them cannot block forever.  The
        parent's own handles are closed and the blocks unlinked.  The views in
        ``self.arrays`` are removed because they must not be used once the
        memory is unmapped.  Calling this more than once is harmless.
        """
        for name in self.names:
            proc = self.procs.get(name)
            conn = self.parents.get(name)
            if proc is not None:
                # Shutdown process.
                print(f'Shutting down {name}...')
                if conn is not None and name not in self._quit_sent and proc.is_alive():
                    try:
                        conn.send('quit')
                    except OSError:
                        # The worker exited between the check and the send.
                        pass
                    self._quit_sent.add(name)
                proc.join()
            if conn is not None:
                conn.close()
            shm = self.shared.pop(name, None)
            if shm is not None:
                # Release shared memory.
                print(f'Releasing shared mem for {name}...')
                self.arrays.pop(name, None)
                try:
                    shm.close()
                finally:
                    # Unlink even if close() fails so the name is freed.
                    shm.unlink()
if __name__ == '__main__':
    # This guard is required: MultiDemo uses the 'spawn' start method, which
    # imports this module again inside every worker process.
    demo = MultiDemo()
    try:
        demo.run()
    finally:
        # Always join the workers and release the shared memory, even if
        # run() fails.
        demo.shutdown()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment