Last active
August 29, 2015 14:26
-
-
Save davidrios/011d044b5e7510f085dd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import signal | |
| from random import random | |
# Shared stop flag. Empty means "keep running"; once anything is appended,
# consumer(), producer() and subproducer() all exit at their next check.
# Presumably a list (rather than a bool) so it can be flipped in place
# without `global` declarations.
_stop_all = []
# Error-injection switch (spelled "erros" everywhere in this file, so the
# name must stay as is). main() appends to it when any command-line argument
# is given; when non-empty, consumer() and producer() raise at random.
simulate_erros = []
@asyncio.coroutine
def consumer(queue):
    """Pull items off ``queue`` and "process" them until told to stop.

    The queue is polled with ``get_nowait`` instead of awaiting ``get()`` so
    the shared ``_stop_all`` flag is re-checked at least once per second even
    when no work is available.

    Any exception that escapes (including the simulated failure) sets
    ``_stop_all`` before propagating, so the other coroutines shut down too.
    """
    try:
        while not _stop_all:
            try:
                job = queue.get_nowait()
            except asyncio.QueueEmpty:
                # Nothing queued yet: back off, then re-check the stop flag.
                yield from asyncio.sleep(1)
                continue

            # Short-circuit keeps random() from being called unless error
            # simulation is switched on.
            should_fail = simulate_erros and random() < 0.005
            if should_fail:
                raise Exception('consumer noooooooo')

            yield from asyncio.sleep(1)  # doing work
            print('consumer done: ', job)
    except BaseException:
        # Flag everyone else to stop, then let the error surface on the task.
        _stop_all.append(1)
        raise
@asyncio.coroutine
def producer(queue):
    """Generate the integers 0..199 and push each one onto ``queue``.

    Each item takes 0.2 s to "produce". If the queue is full the put is
    retried once per second; the shared ``_stop_all`` flag is checked before
    every attempt and causes an early, silent return. After every multiple
    of 7 (0 included) has been queued, ``subproducer`` is run to emit a
    follow-up batch.

    Any exception that escapes (including the simulated failure) sets
    ``_stop_all`` before propagating.
    """
    try:
        for number in range(200):
            yield from asyncio.sleep(.2)  # producing item
            if simulate_erros and random() < 0.01:
                raise Exception('producer noooooo')

            enqueued = False
            while not enqueued:
                if _stop_all:
                    return
                try:
                    queue.put_nowait(number)
                except asyncio.QueueFull:
                    # Consumers are behind: wait, then re-check the stop flag.
                    yield from asyncio.sleep(1)
                else:
                    enqueued = True

            if number % 7 != 0:
                continue
            yield from subproducer(queue, number)
    except BaseException:
        # Flag everyone else to stop, then let the error surface on the task.
        _stop_all.append(1)
        raise
@asyncio.coroutine
def subproducer(queue, current):
    """Push ten consecutive integers starting at ``200 * current`` onto ``queue``.

    Each item takes 0.2 s to "produce". A full queue is retried once per
    second; the shared ``_stop_all`` flag is checked before every attempt and
    causes an early return.
    """
    first = 200 * current
    for value in range(first, first + 10):
        yield from asyncio.sleep(.2)  # producing item

        pending = True
        while pending:
            if _stop_all:
                return
            try:
                queue.put_nowait(value)
            except asyncio.QueueFull:
                # Queue is at capacity: wait, then re-check the stop flag.
                yield from asyncio.sleep(1)
            else:
                pending = False
@asyncio.coroutine
def start(loop, workers):
    """Run one producer and ``workers`` consumers over a shared bounded queue.

    Args:
        loop: the event loop the tasks are scheduled on and on which the
            SIGINT handler is installed.
        workers: number of consumer tasks to start.

    The coroutine finishes once the producer has ended, the queue has been
    drained (or a stop was requested via SIGINT or a task failure), and every
    consumer has exited.
    """
    queue = asyncio.Queue(maxsize=20)

    # ``asyncio.async`` cannot be used here: ``async`` is a reserved keyword
    # from Python 3.7 on, so merely mentioning it makes the whole file a
    # SyntaxError. ``ensure_future`` (3.4.4+) is its direct replacement.
    producer_coro = asyncio.ensure_future(producer(queue), loop=loop)
    consumer_coros = [
        asyncio.ensure_future(consumer(queue), loop=loop)
        for _ in range(workers)
    ]

    def rs(*args):
        print('received interrupt, finalizing...')
        _stop_all.append(1)

    loop.add_signal_handler(signal.SIGINT, rs)

    # Consumers only leave their loop when the stop flag is set, so waiting
    # on everything at once would hang forever after a normal run. Instead:
    # wait for the producer, let the consumers drain what is left, then
    # raise the stop flag ourselves.
    yield from asyncio.wait([producer_coro])
    while not _stop_all and not queue.empty():
        yield from asyncio.sleep(.1)
    # A consumer that is mid-item finishes it before re-checking the flag,
    # so no dequeued work is dropped by stopping here.
    _stop_all.append(1)

    # asyncio.wait() rejects an empty collection, hence the guard.
    if consumer_coros:
        yield from asyncio.wait(consumer_coros)
def main():
    """Script entry point: run the producer/consumer demo with 24 workers.

    Passing any command-line argument switches on random error simulation.
    The event loop is always closed, even if the run raises.
    """
    import sys

    extra_args = sys.argv[1:]
    if extra_args:
        simulate_erros.append(1)

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(start(event_loop, 24))
    finally:
        event_loop.close()
# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment