Skip to content

Instantly share code, notes, and snippets.

@tomschr
Last active January 31, 2020 21:38
Show Gist options
  • Save tomschr/39734f0151a14187fd8f4844f66be6ba to your computer and use it in GitHub Desktop.
Save tomschr/39734f0151a14187fd8f4844f66be6ba to your computer and use it in GitHub Desktop.
Producer/Consumer pattern for asyncio (Python >=3.4)
# Original source from http://asyncio.readthedocs.io/en/latest/producer_consumer.html
# Rewritten for Python >=3.4
import asyncio
import random
async def produce(queue, n, *, max_delay=1.0):
    """Put the strings ``'0'`` .. ``str(n - 1)`` on *queue*, one at a time.

    This is a native coroutine: the ``@asyncio.coroutine`` decorator and
    ``yield from``-based coroutines were removed from asyncio in
    Python 3.11, so the generator-based form no longer imports there.

    Args:
        queue: Queue object whose ``put()`` coroutine receives each item
            (an ``asyncio.Queue`` in this script).
        n: Number of items to produce; nothing is produced if ``n <= 0``.
        max_delay: Upper bound, in seconds, of the random pause that
            simulates an I/O operation before each item is queued.  The
            default of ``1.0`` reproduces a plain ``random.random()`` delay.
    """
    for x in range(n):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random() * max_delay)
        # put the item in the queue
        await queue.put(str(x))
async def consume(queue, *, max_delay=1.0):
    """Take items from *queue* and process them until cancelled.

    The loop has no exit condition of its own, so the coroutine only ends
    when its task is cancelled (or an exception escapes).  Every item that
    was fetched is acknowledged with ``queue.task_done()``.

    This is a native coroutine: the ``@asyncio.coroutine`` decorator and
    ``yield from``-based coroutines were removed from asyncio in
    Python 3.11.

    Args:
        queue: Queue object providing ``get()`` and ``task_done()``
            (an ``asyncio.Queue`` in this script).
        max_delay: Upper bound, in seconds, of the random pause that
            simulates processing each item.  The default of ``1.0``
            reproduces a plain ``random.random()`` delay.
    """
    while True:
        # wait for an item from the producer
        item = await queue.get()
        try:
            # process the item
            print('consuming {}...'.format(item))
            # simulate i/o operation using sleep
            await asyncio.sleep(random.random() * max_delay)
        finally:
            # Notify the queue that the item has been handled.  Doing this
            # in ``finally`` keeps queue.join() from blocking forever when
            # processing fails or the task is cancelled mid-item.
            queue.task_done()
async def run(n):
    """Run one producer and one consumer over a shared queue.

    Creates an ``asyncio.Queue``, schedules ``consume`` as a background
    task, awaits ``produce(queue, n)``, waits for the queue to be fully
    processed and finally cancels the consumer.

    This is a native coroutine: the ``@asyncio.coroutine`` decorator and
    ``yield from``-based coroutines were removed from asyncio in
    Python 3.11.

    Args:
        n: Number of items passed on to ``produce``.
    """
    queue = asyncio.Queue()
    # schedule the consumer
    consumer = asyncio.ensure_future(consume(queue))
    try:
        # run the producer and wait for completion
        await produce(queue, n)
        # wait until the consumer has processed all items
        await queue.join()
    finally:
        # The consumer is still waiting for an item (or the producer
        # failed): cancel it so the task is never left pending ...
        consumer.cancel()
        try:
            # ... and wait until the cancellation has been delivered.
            await consumer
        except asyncio.CancelledError:
            pass
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs the coroutine to
    # completion and closes the loop even when the coroutine raises.  It
    # replaces asyncio.get_event_loop(), whose use without a running loop
    # is deprecated, and the manual loop.close() that was skipped on error.
    asyncio.run(run(10))
# Original source from http://asyncio.readthedocs.io/en/latest/producer_consumer.html
# Rewritten for Python >=3.4
import asyncio
import random
async def produce(queue, n, *, max_delay=1.0):
    """Put ``'1'`` .. ``str(n)`` on *queue*, then ``None`` as end marker.

    This is a native coroutine: the ``@asyncio.coroutine`` decorator and
    ``yield from``-based coroutines were removed from asyncio in
    Python 3.11, so the generator-based form no longer imports there.

    Args:
        queue: Queue object whose ``put()`` coroutine receives each item
            (an ``asyncio.Queue`` in this script).
        n: Number of items to produce.  If ``n <= 0`` only the ``None``
            end marker is queued.
        max_delay: Upper bound, in seconds, of the random pause that
            simulates an I/O operation before each item is queued.  The
            default of ``1.0`` reproduces a plain ``random.random()`` delay.
    """
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random() * max_delay)
        # put the item in the queue
        await queue.put(str(x))
    # indicate the producer is done
    await queue.put(None)
async def consume(queue, *, max_delay=1.0):
    """Process items from *queue* until a ``None`` end marker is received.

    Items queued after the ``None`` marker are left untouched.

    This is a native coroutine: the ``@asyncio.coroutine`` decorator and
    ``yield from``-based coroutines were removed from asyncio in
    Python 3.11.

    Args:
        queue: Queue object whose ``get()`` coroutine yields the items
            (an ``asyncio.Queue`` in this script).
        max_delay: Upper bound, in seconds, of the random pause that
            simulates processing each item.  The default of ``1.0``
            reproduces a plain ``random.random()`` delay.
    """
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break
        # process the item
        print('consuming item {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random() * max_delay)
async def main():
    """Run ``produce`` (10 items) and ``consume`` concurrently on one queue.

    The queue is created without the ``loop`` argument: that parameter was
    removed from ``asyncio.Queue`` in Python 3.10 and now raises TypeError.
    Creating the queue inside the coroutine means it is used by the loop
    that is actually running.
    """
    queue = asyncio.Queue()
    await asyncio.gather(produce(queue, 10), consume(queue))


if __name__ == "__main__":
    # asyncio.run() creates the event loop and closes it even on error,
    # replacing the deprecated get_event_loop()/run_until_complete()/close().
    asyncio.run(main())
# From: https://pymotw.com/2/multiprocessing/communication.html
import multiprocessing
import time
class Consumer(multiprocessing.Process):
    """Worker process that executes callables taken from a task queue.

    Each task fetched from ``task_queue`` is called with no arguments and
    its return value is put on ``result_queue``.  A ``None`` task is the
    poison pill that makes the worker stop.
    """

    def __init__(self, task_queue, result_queue):
        """Store the queue the tasks come from and the one results go to."""
        super().__init__()
        self.task_queue, self.result_queue = task_queue, result_queue

    def run(self):
        """Handle tasks until the poison pill (``None``) is received."""
        label = self.name
        inbox = self.task_queue
        outbox = self.result_queue

        job = inbox.get()
        while job is not None:
            print('%s: %s' % (label, job))
            outcome = job()
            # Acknowledge the task first, then publish its result.
            inbox.task_done()
            outbox.put(outcome)
            job = inbox.get()

        # Poison pill means shutdown; it still has to be acknowledged so
        # that a join() on the task queue can complete.
        print('%s: Exiting' % label)
        inbox.task_done()
class Task:
    """Callable job that multiplies two operands and describes the result."""

    def __init__(self, a, b):
        """Remember the two operands ``a`` and ``b``."""
        self.a, self.b = a, b

    def __call__(self):
        """Return ``'<a> * <b> = <product>'`` after a short pause."""
        # pretend to take some time to do the work
        time.sleep(0.1)
        product = self.a * self.b
        return '%s * %s = %s' % (self.a, self.b, product)

    def __str__(self):
        """Return the pending multiplication as ``'<a> * <b>'``."""
        operands = (self.a, self.b)
        return '%s * %s' % operands
if __name__ == '__main__':
    # Communication channels: a joinable queue for the work items and a
    # plain queue for the answers.
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Two worker processes per reported CPU.
    num_consumers = 2 * multiprocessing.cpu_count()
    print('Creating %d consumers' % num_consumers)
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]

    start = time.time()
    for worker in consumers:
        worker.start()

    # Enqueue the jobs: Task(0, 0) .. Task(9, 9).
    num_jobs = 10
    for operand in range(num_jobs):
        tasks.put(Task(operand, operand))

    # One poison pill per consumer so that every worker shuts down.
    for _ in range(num_consumers):
        tasks.put(None)

    # Block until every task (and every pill) has been acknowledged, and
    # take the elapsed time before any result is printed.
    tasks.join()
    elapsed = time.time() - start

    # Exactly one result is expected per job.
    for _ in range(num_jobs):
        print('Result:', results.get())

    print("Running for %.3fs" % elapsed)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment