Skip to content

Instantly share code, notes, and snippets.

@davidrios
Last active August 29, 2015 14:26
Show Gist options
  • Save davidrios/011d044b5e7510f085dd to your computer and use it in GitHub Desktop.
Save davidrios/011d044b5e7510f085dd to your computer and use it in GitHub Desktop.
import asyncio
import signal
from random import random
# Shared shutdown flag. The list starts empty (falsy); the SIGINT handler and
# the failure paths of the producer/consumer coroutines append to it, and the
# worker loops stop as soon as they see it is non-empty (truthy).
_stop_all = []
# Error-injection switch (name spelled as in the rest of the file). main()
# appends to it when any command-line argument is given; while non-empty, the
# producer and consumers randomly raise to simulate failures.
simulate_erros = []
@asyncio.coroutine
def consumer(queue):
    """Drain ``queue`` until the shared stop flag is raised.

    The queue is polled without blocking so the stop flag is re-checked at
    least once per second. Each item taken costs one second of simulated
    work and is then printed. When error simulation is enabled, an item
    fails with probability 0.005 before the work is done.

    Any exception leaving the loop raises the stop flag first, so the other
    coroutines wind down too, and is then re-raised unchanged.
    """
    try:
        while not _stop_all:
            try:
                job = queue.get_nowait()
            except asyncio.QueueEmpty:
                # Nothing to do yet: back off, then re-check the stop flag.
                yield from asyncio.sleep(1)
            else:
                if simulate_erros and random() < 0.005:
                    raise Exception('consumer noooooooo')
                yield from asyncio.sleep(1)  # doing work
                print('consumer done: ', job)
    except BaseException:
        # Signal everyone else to stop before propagating the failure.
        _stop_all.append(1)
        raise
@asyncio.coroutine
def producer(queue):
    """Produce the integers 0..199 into ``queue``, one every 0.2 seconds.

    A full queue is retried once per second; the coroutine returns early as
    soon as the shared stop flag is seen while trying to enqueue. After each
    item whose value is a multiple of 7 (0 included) has been enqueued,
    ``subproducer`` is run to emit an extra batch. When error simulation is
    enabled, each item fails with probability 0.01 before being enqueued.

    Any exception raises the stop flag before being re-raised.
    """
    try:
        for number in range(200):
            yield from asyncio.sleep(.2)  # producing item
            if simulate_erros and random() < 0.01:
                raise Exception('producer noooooo')

            enqueued = False
            while not enqueued:
                if _stop_all:
                    return
                try:
                    queue.put_nowait(number)
                except asyncio.QueueFull:
                    # Queue is at capacity: wait for consumers to catch up.
                    yield from asyncio.sleep(1)
                else:
                    enqueued = True

            if number % 7 == 0:
                yield from subproducer(queue, number)
    except BaseException:
        # Signal everyone else to stop before propagating the failure.
        _stop_all.append(1)
        raise
@asyncio.coroutine
def subproducer(queue, current):
    """Produce ten consecutive integers starting at ``200 * current``.

    One item is produced every 0.2 seconds. A full queue is retried once per
    second, and the coroutine returns early as soon as the shared stop flag
    is seen while trying to enqueue.
    """
    base = 200 * current
    for offset in range(10):
        value = base + offset
        yield from asyncio.sleep(.2)  # producing item

        enqueued = False
        while not enqueued:
            if _stop_all:
                return
            try:
                queue.put_nowait(value)
            except asyncio.QueueFull:
                # Queue is at capacity: wait for consumers to catch up.
                yield from asyncio.sleep(1)
            else:
                enqueued = True
@asyncio.coroutine
def start(loop, workers):
    """Run one producer and ``workers`` consumers over a shared bounded queue.

    Args:
        loop: The event loop on which the tasks are scheduled and on which
            the SIGINT handler is installed.
        workers: Number of consumer tasks to start.

    The queue holds at most 20 items. A SIGINT handler is installed that
    raises the shared stop flag so every coroutine can finish its current
    step and exit.

    NOTE(review): consumers only leave their loop once the stop flag is set,
    so after the producer finishes normally this coroutine keeps waiting
    until an interrupt or a failure sets the flag -- confirm this is the
    intended shutdown behaviour.
    """
    queue = asyncio.Queue(maxsize=20)

    # ``asyncio.async`` is a deprecated alias of ``ensure_future`` and cannot
    # even be parsed on Python 3.7+, where ``async`` is a reserved keyword.
    producer_task = asyncio.ensure_future(producer(queue), loop=loop)
    consumer_tasks = [
        asyncio.ensure_future(consumer(queue), loop=loop)
        for _ in range(workers)
    ]

    def rs(*args):
        print('received interrupt, finalizing...')
        _stop_all.append(1)

    loop.add_signal_handler(signal.SIGINT, rs)
    yield from asyncio.wait([producer_task] + consumer_tasks)
def main():
    """Script entry point: run the demo with 24 consumers until it finishes.

    Passing any command-line argument turns on random error simulation.
    The event loop is always closed on the way out.
    """
    import sys

    error_injection_requested = len(sys.argv) > 1
    if error_injection_requested:
        simulate_erros.append(1)

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(start(event_loop, 24))
    finally:
        event_loop.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment