Skip to content

Instantly share code, notes, and snippets.

@mgalardini
Last active February 5, 2016 11:30
Show Gist options
  • Save mgalardini/7c74f6cf3831ca05d90e to your computer and use it in GitHub Desktop.
Save mgalardini/7c74f6cf3831ca05d90e to your computer and use it in GitHub Desktop.
Python multiprocessing module evolution
#!/usr/bin/env python2
'''
skeleton for parallel python 2 scripts
Marco Galardini 2015
GPL v3.0
'''
import sys
from multiprocessing.queues import Queue
import multiprocessing
import time
class Worker(object):
    '''
    Class Worker
    A callable task identified by a number
    '''
    def __init__(self, number):
        # Identifier interpolated into the message printed by __call__
        self.number = number

    def __call__(self):
        '''
        Print this worker's number, sleep for five seconds, return True
        '''
        message = 'worker number %d' % self.number
        print(message)
        # Stand-in for real work
        time.sleep(5)
        return True
class Consumer(multiprocessing.Process):
    '''
    Class Consumer
    A process that pulls callables from a task queue, calls them and
    puts each return value on a result queue, until it receives None

    task_queue: queue of zero-argument callables; None is the poison pill
    result_queue: queue receiving the return value of each task
    Either queue defaults to a fresh multiprocessing.Queue() per instance
    '''
    def __init__(self,
                 task_queue=None,
                 result_queue=None):
        multiprocessing.Process.__init__(self)
        # Build default queues here rather than in the signature: a default
        # expression is evaluated only once, when the class is defined,
        # which would allocate queues at import time and share them
        # between every Consumer created without arguments
        if task_queue is None:
            task_queue = multiprocessing.Queue()
        if result_queue is None:
            result_queue = multiprocessing.Queue()
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        '''
        Process tasks until the poison pill (None) is received
        '''
        while True:
            # Blocks until a task (or the poison pill) is available
            next_task = self.task_queue.get()
            time.sleep(0.01)
            if next_task is None:
                # Poison pill means we should exit
                break
            self.result_queue.put(next_task())
class MultiProcess(object):
    '''
    Class MultiProcess
    An object that can perform multiprocesses

    Tasks are put on a shared task queue, consumed by ncpus Consumer
    processes, and their return values are collected from a shared
    result queue
    '''
    def __init__(self, ncpus=1):
        '''
        ncpus: number of consumer processes to start (converted with int())
        '''
        self.ncpus = int(ncpus)
        # Parallelization
        # Stays None until init() creates the consumer processes
        self._parallel = None
        # multiprocessing.Queue() works on both Python 2 and Python 3;
        # calling multiprocessing.queues.Queue() directly requires a
        # "ctx" keyword argument on Python 3.4+
        self._paralleltasks = multiprocessing.Queue()
        self._parallelresults = multiprocessing.Queue()

    def init(self):
        '''
        Create and start one Consumer per requested cpu
        '''
        self._parallel = [Consumer(self._paralleltasks, self._parallelresults)
                          for _ in range(self.ncpus)]
        for consumer in self._parallel:
            consumer.start()

    def add_poison(self):
        '''
        Queue one poison pill (None) per consumer so that each one exits
        '''
        for _ in (self._parallel or ()):
            self._paralleltasks.put(None)

    def is_terminated(self):
        '''
        Return True when no consumer is alive (also before init())
        '''
        return not any(consumer.is_alive()
                       for consumer in (self._parallel or ()))

    def kill(self):
        '''
        Terminate every consumer process
        '''
        for consumer in (self._parallel or ()):
            consumer.terminate()

    def _drain_results(self):
        '''
        Remove every result currently available on the result queue
        '''
        # NOTE: the multiprocessing documentation describes Queue.empty()
        # as not reliable; this loop is a best-effort drain
        while not self._parallelresults.empty():
            # Results are discarded: this skeleton does not use them
            self._parallelresults.get()

    def work(self, ntasks=10):
        '''
        Start the consumers, run ntasks Worker tasks and wait for the end

        ntasks: number of Worker objects to queue (default 10)
        Returns True once all consumers have exited
        '''
        self.init()
        for i in range(ntasks):
            self._paralleltasks.put(Worker(i))
        # Poison pill to stop the workers
        self.add_poison()
        while True:
            # Keep the result queue drained while the consumers are running
            self._drain_results()
            if self.is_terminated():
                break
            time.sleep(0.1)
        # Get the last messages
        self._drain_results()
        self.kill()
        return True
if __name__ == '__main__':
    # Guarded so that importing this module (which child processes do
    # under the "spawn" start method) does not start the work again
    # The first command line argument is the number of consumer processes
    MultiProcess(int(sys.argv[1])).work()
#!/usr/bin/env python3
'''
skeleton for parallel python 3 scripts
Marco Galardini 2015
GPL v3.0
'''
from multiprocessing import Pool
import time
import sys
def work(x):
    '''
    Print the worker number x, then sleep for five seconds
    '''
    message = 'worker number %d' % x
    print(message)
    # Stand-in for real work
    time.sleep(5)
if __name__ == '__main__':
    # The first command line argument is the number of pool processes
    n_processes = int(sys.argv[1])
    with Pool(n_processes) as pool:
        # Run work() on the numbers 0..9 across the pool
        pool.map(work, range(10))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment