@dkirkby · Last active February 12, 2021 17:34
Demo of multiprocessing with shared numpy arrays
# Test parallel processing with shared memory.
import time
import multiprocessing
import multiprocessing.shared_memory  # needs python >= 3.8

import numpy as np


class MultiDemo(object):

    def __init__(self, names=('Bobby', 'Mo', 'Mane'), size=4):
        self.names = names
        self.shared = {}
        self.arrays = {}
        self.procs = {}
        self.parents = {}
        context = multiprocessing.get_context('spawn')
        # SharedMemory is sized in bytes, so allow room for `size` int64 values.
        nbytes = size * np.dtype(np.int64).itemsize
        for i, name in enumerate(names):
            # Allocate shared memory. The returned object must not go out of scope
            # as long as the memory is being used somewhere.
            self.shared[name] = multiprocessing.shared_memory.SharedMemory(
                name=name, size=nbytes, create=True)
            # Create a numpy array using the shared memory as its buffer.
            self.arrays[name] = np.ndarray(size, dtype=np.int64, buffer=self.shared[name].buf)
            self.arrays[name][:] = i
            # Create the pipe and process.
            self.parents[name], child = context.Pipe()
            self.procs[name] = context.Process(target=MultiDemo.worker, args=(name, size, child))
            # Start the process.
            self.procs[name].start()

    def run(self):
        for action in ('ones', 'zeros', 'arange'):
            time.sleep(0.5)
            print(f'sending action="{action}" to all workers...')
            for i, name in enumerate(self.names):
                # Set our copy of the array.
                self.arrays[name][0] = i
                # Ask the worker to apply the action to the shared array.
                self.parents[name].send(action)
                worker_array = self.parents[name].recv()
                # Check that we agree on the array contents.
                print(self.arrays[name], worker_array)
        print('quitting...')
        for name in self.names:
            self.parents[name].send('quit')

    @staticmethod
    def dowork(array, action):
        if action == 'zeros':
            array[:] = np.zeros(array.size)
        elif action == 'ones':
            array[:] = np.ones(array.size)
        elif action == 'arange':
            array[:] = np.arange(array.size)

    @staticmethod
    def worker(name, size, pipe):
        # Attach our array to the shared memory associated with this name.
        shm = multiprocessing.shared_memory.SharedMemory(name=name)
        array = np.ndarray(size, dtype=np.int64, buffer=shm.buf)
        while True:
            action = pipe.recv()
            if action == 'quit':
                # Drop the numpy view before detaching from the shared buffer.
                del array
                shm.close()
                pipe.send('bye')
                pipe.close()
                break
            else:
                MultiDemo.dowork(array, action)
                pipe.send(array)

    def shutdown(self):
        for name in self.names:
            # Shut down the worker process.
            print(f'Shutting down {name}...')
            self.procs[name].join()
            # Release the shared memory: drop our view, then close and unlink.
            print(f'Releasing shared mem for {name}...')
            del self.arrays[name]
            self.shared[name].close()
            self.shared[name].unlink()


if __name__ == '__main__':
    demo = MultiDemo()
    try:
        demo.run()
    finally:
        demo.shutdown()
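
For comparison (not part of the original gist), the same create / attach / wrap-in-ndarray pattern can delegate cleanup to multiprocessing.managers.SharedMemoryManager, which unlinks every segment it allocated when it shuts down, so no explicit unlink() is needed. The following is only a minimal sketch, assuming Python >= 3.8; the worker function and variable names are illustrative, not taken from the gist.

# Sketch: shared numpy array with lifecycle handled by SharedMemoryManager.
import multiprocessing
import multiprocessing.shared_memory  # needs python >= 3.8
from multiprocessing.managers import SharedMemoryManager

import numpy as np


def fill_arange(shm_name, size):
    # Attach to the existing segment by name and view it as int64.
    shm = multiprocessing.shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray(size, dtype=np.int64, buffer=shm.buf)
    array[:] = np.arange(size)
    # Drop the numpy view before detaching from the buffer.
    del array
    shm.close()


if __name__ == '__main__':
    size = 4
    with SharedMemoryManager() as smm:
        # The manager allocates the segment and unlinks it when the block exits.
        shm = smm.SharedMemory(size=size * np.dtype(np.int64).itemsize)
        array = np.ndarray(size, dtype=np.int64, buffer=shm.buf)
        array[:] = 0
        context = multiprocessing.get_context('spawn')
        proc = context.Process(target=fill_arange, args=(shm.name, size))
        proc.start()
        proc.join()
        print(array)  # the worker's writes are visible here: [0 1 2 3]
        del array     # drop our view before detaching
        shm.close()   # detach locally; the manager handles unlinking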