Created
October 3, 2018 20:29
-
-
Save howardhamilton/11d6ee0a8b0acb29b9ed82338ca1e849 to your computer and use it in GitHub Desktop.
Multiprocessing pattern
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
class Consumer(multiprocessing.Process): | |
"""Consumer processes. Subclassed from Process in multiprocessing.""" | |
def __init__(self, task_queue, result_queue): | |
multiprocessing.Process.__init__(self) | |
self.task_queue = task_queue | |
self.result_queue = result_queue | |
def run(self): | |
proc_name = self.name | |
while True: | |
next_task = self.task_queue.get() | |
if next_task is None: | |
# Poison pill means shutdown | |
print '%s: Exiting' % proc_name | |
self.task_queue.task_done() | |
break | |
print '%s: %s' % (proc_name, next_task) | |
answer = next_task() # carry out task | |
self.task_queue.task_done() # mark task as complete | |
self.result_queue.put(answer) # put answer in result queue | |
return | |
class Task(object): | |
def __call__(self): | |
return foo() | |
def __str__(self): | |
return "Processing {}".format(self.file_name) | |
def foo(): | |
return | |
if __name__ == "__main__": | |
# Establish communication queues | |
tasks = multiprocessing.JoinableQueue() | |
results = multiprocessing.Queue() | |
# Start consumers | |
num_consumers = multiprocessing.cpu_count() * 2 | |
print('Creating {} consumers'.format(num_consumers)) | |
consumers = [Consumer(tasks, results) for i in range(num_consumers)] | |
for w in consumers: | |
w.start() | |
# Enqueue jobs | |
tasks.put(Task()) | |
# Add poison pill for each consumer | |
for i in range(num_consumers): | |
tasks.put(None) | |
# Wait for all tasks to finish | |
tasks.join() | |
# Print results | |
while num_jobs: | |
result = results.get() | |
print(result) | |
num_jobs -= 1 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment