Forked from tomschr/asyncio-producer-consumer-task_done.py
Created
May 30, 2018 06:05
-
-
Save kenwoodjw/f7e12af2aacef65e60e876e5a789e4b0 to your computer and use it in GitHub Desktop.
Producer/Consumer pattern for asyncio (Python >=3.4)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Original source from http://asyncio.readthedocs.io/en/latest/producer_consumer.html
# Rewritten for Python >=3.4
import asyncio | |
import random | |
async def produce(queue, n):
    """Put ``n`` items on ``queue``, one after another.

    Written as a native coroutine (``async def``, Python 3.5+) because the
    ``@asyncio.coroutine`` decorator was removed in Python 3.11.

    Args:
        queue: Object with an awaitable ``put`` method, e.g. ``asyncio.Queue``.
        n: Number of items to produce. Items are the strings
            ``str(0)`` through ``str(n - 1)``.
    """
    for x in range(n):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)
async def consume(queue):
    """Take items off ``queue`` forever, marking each one done.

    The loop has no exit condition, so this coroutine only stops when it is
    cancelled. Written as a native coroutine (``async def``, Python 3.5+)
    because the ``@asyncio.coroutine`` decorator was removed in Python 3.11.

    Args:
        queue: Object with an awaitable ``get`` and a ``task_done`` method,
            e.g. ``asyncio.Queue``.
    """
    while True:
        # wait for an item from the producer
        item = await queue.get()
        # process the item
        print('consuming {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        # Notify the queue that the item has been processed
        queue.task_done()
async def run(n):
    """Run one producer and one consumer until ``n`` items are processed.

    Written as a native coroutine (``async def``, Python 3.5+) because the
    ``@asyncio.coroutine`` decorator was removed in Python 3.11.

    Args:
        n: Number of items the producer should generate.
    """
    queue = asyncio.Queue()
    # schedule the consumer
    consumer = asyncio.ensure_future(consume(queue))
    try:
        # run the producer and wait for completion
        await produce(queue, n)
        # wait until the consumer has processed all items
        await queue.join()
    finally:
        # The consumer loops forever waiting for items, so it must be
        # cancelled -- also when the producer or join() failed.
        consumer.cancel()
        # Let the cancellation actually run before returning, so the task is
        # not left pending when the event loop is closed.
        await asyncio.gather(consumer, return_exceptions=True)
if __name__ == "__main__":
    # Create the loop explicitly: calling asyncio.get_event_loop() when no
    # loop exists is deprecated and raises RuntimeError on Python 3.14+.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run(10))
    finally:
        # Release the loop's resources even if run() raised.
        loop.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Original source from http://asyncio.readthedocs.io/en/latest/producer_consumer.html
# Rewritten for Python >=3.4
import asyncio | |
import random | |
async def produce(queue, n):
    """Put ``n`` items on ``queue``, then a ``None`` end-of-stream marker.

    Written as a native coroutine (``async def``, Python 3.5+) because the
    ``@asyncio.coroutine`` decorator was removed in Python 3.11.

    Args:
        queue: Object with an awaitable ``put`` method, e.g. ``asyncio.Queue``.
        n: Number of items to produce. Items are the strings
            ``str(1)`` through ``str(n)``.
    """
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)
    # indicate the producer is done
    await queue.put(None)
async def consume(queue):
    """Process items from ``queue`` until a ``None`` marker is received.

    Written as a native coroutine (``async def``, Python 3.5+) because the
    ``@asyncio.coroutine`` decorator was removed in Python 3.11.

    Args:
        queue: Object with an awaitable ``get`` method, e.g. ``asyncio.Queue``.
    """
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break
        # process the item
        print('consuming item {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
if __name__ == "__main__":
    # Create the loop explicitly: calling asyncio.get_event_loop() when no
    # loop exists is deprecated and raises RuntimeError on Python 3.14+.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # No ``loop=`` argument: asyncio.Queue dropped that parameter in
        # Python 3.10 and passing it raises TypeError there.
        queue = asyncio.Queue()
        # Wrap the coroutines in tasks bound to this loop so gather() does
        # not have to look up an implicit event loop.
        producer_task = loop.create_task(produce(queue, 10))
        consumer_task = loop.create_task(consume(queue))
        loop.run_until_complete(asyncio.gather(producer_task, consumer_task))
    finally:
        # Release the loop's resources even if one of the tasks raised.
        loop.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# From: https://pymotw.com/2/multiprocessing/communication.html
import multiprocessing | |
import time | |
class Consumer(multiprocessing.Process):
    """Worker process that runs callables taken from a task queue.

    Each task is called with no arguments and its return value is put on the
    result queue. A ``None`` task is a poison pill that ends the worker.
    """

    def __init__(self, task_queue, result_queue):
        """Store the queues this worker reads tasks from and writes results to."""
        super().__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Process tasks until a ``None`` poison pill is received."""
        label = self.name
        while True:
            job = self.task_queue.get()
            if job is not None:
                print('%s: %s' % (label, job))
                outcome = job()
                # Mark the task done before publishing its result.
                self.task_queue.task_done()
                self.result_queue.put(outcome)
                continue
            # Poison pill means shutdown
            print('%s: Exiting' % label)
            # The pill itself counts as a queue item and must be marked done.
            self.task_queue.task_done()
            break
class Task:
    """Callable unit of work that multiplies two stored operands."""

    def __init__(self, a, b):
        """Remember the two operands ``a`` and ``b``."""
        self.a, self.b = a, b

    def __call__(self):
        """Sleep briefly, then return the product formatted as an equation."""
        # pretend to take some time to do the work
        time.sleep(0.1)
        product = self.a * self.b
        return '%s * %s = %s' % (self.a, self.b, product)

    def __str__(self):
        """Return the multiplication this task performs, without the result."""
        operands = (self.a, self.b)
        return '%s * %s' % operands
if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating %d consumers' % num_consumers)
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    start = time.time()
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for _ in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
    end = time.time()

    # Start printing results: exactly one result is expected per job
    for _ in range(num_jobs):
        result = results.get()
        print('Result:', result)

    # Explicitly reap the worker processes. This happens only after the
    # result queue has been drained, so no worker is still flushing data.
    for w in consumers:
        w.join()

    print("Running for %.3fs" % (end - start))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment