Skip to content

Instantly share code, notes, and snippets.

@lacek
Last active August 23, 2018 02:59
Show Gist options
  • Save lacek/910e3c76bf0817bba8e94bc3ed2fbbf9 to your computer and use it in GitHub Desktop.
Save lacek/910e3c76bf0817bba8e94bc3ed2fbbf9 to your computer and use it in GitHub Desktop.
Pythhon multiprocessing with queue
#!/usr/bin/env python2.7
# single producer-multiple consumer parallelism using a queue
from ctypes import c_bool
import logging
import multiprocessing as mp
import time
TASK_END = 'TASK_END'
NUM_CONSUMERS = 2
NUM_TASKS = 4
# workaround for python2 not having Pool.starmap
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
def producer(input, num_consumers, q):
logger = logging.getLogger('producer')
logger.info('Producer start')
try:
for task in input.split():
q.put(task)
logger.info('Enqueued task: %s', task)
time.sleep(1)
for i in xrange(num_consumers):
q.put(TASK_END)
logger.info('Producer done')
except (KeyboardInterrupt, SystemExit):
logger.error('Producer interrupted')
@unpack_args
def consumer(name, q):
logger = logging.getLogger(name)
logger.info('Consumer start')
count = 0
try:
for task in iter(q.get, TASK_END):
logger.info('Working on task: %s', task)
time.sleep(2)
count += 1
logger.info('Consumer done')
except (KeyboardInterrupt, SystemExit):
logger.error('Consumer interrupted')
return (name, count)
def main():
logging.basicConfig(level=logging.INFO,
format="[%(asctime)s][%(name)s] - %(message)s")
logger = logging.getLogger('main')
man = mp.Manager()
q = man.Queue()
pool = mp.Pool(processes=NUM_CONSUMERS + 1)
tasks = ' '.join(map(str, range(NUM_TASKS)))
pool.apply_async(producer, (tasks, NUM_CONSUMERS, q))
logger.info('Created producer')
results = pool.map_async(consumer,
map(lambda i: ('consumer-%d' % i, q),
range(NUM_CONSUMERS)))
logger.info('Created consumers')
pool.close()
pool.join()
for (name, count) in results.get():
logger.info('%s done %d tasks', name, count)
logger.info('Main done')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment