Skip to content

Instantly share code, notes, and snippets.

@nlm
Last active November 16, 2016 08:26
Show Gist options
  • Save nlm/81acc8f74b71d01f6b57800b8df1f852 to your computer and use it in GitHub Desktop.
Save nlm/81acc8f74b71d01f6b57800b8df1f852 to your computer and use it in GitHub Desktop.
Demo of asyncio feeder/worker model using queue + interruption handling
import asyncio
import random
# Queue shared by the coroutines below: feeder() puts values on it and
# worker() takes them off.
queue = asyncio.Queue()
async def feeder():
    """Put a random four-digit value on the shared queue once per second.

    The feeder picks a random three-digit id for its log lines, then loops
    forever: it draws a value in [1000, 9999], announces it, enqueues it on
    the module-level ``queue`` and sleeps for one second.

    The loop only ends when the surrounding task is cancelled. Cancellation
    is handled here: a message is printed and the coroutine returns
    normally, so ``CancelledError`` is not propagated to the caller.

    Written as a native coroutine because ``@asyncio.coroutine`` (and
    generator-based coroutines in general) no longer exist in current
    Python releases.
    """
    # Chosen before the ``try`` so the cancellation handler can always
    # refer to it.
    fid = random.randint(100, 999)
    print('feeder {} activated'.format(fid))
    try:
        while True:
            value = random.randint(1000, 9999)
            print('enqueueing {} -> {}'.format(fid, value))
            await queue.put(value)
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print('feeder {} canceled'.format(fid))
async def worker():
    """Take values off the shared queue and print them as they arrive.

    The worker picks a random three-digit id for its log lines, then loops
    forever: it waits for the next value on the module-level ``queue`` and
    prints it.

    The loop only ends when the surrounding task is cancelled. Cancellation
    is handled here: a message is printed and the coroutine returns
    normally, so ``CancelledError`` is not propagated to the caller.

    Declared as a native coroutine: a plain generator function cannot
    delegate to the coroutine returned by ``queue.get()`` on current Python
    releases.
    """
    # Chosen before the ``try`` so the cancellation handler can always
    # refer to it.
    wid = random.randint(100, 999)
    print('worker {} activated'.format(wid))
    try:
        while True:
            value = await queue.get()
            print('dequeueing {} -> {}'.format(wid, value))
            # queue.get() does not suspend while items are available, so
            # hand control back to the loop explicitly to keep one worker
            # from starving the others.
            await asyncio.sleep(0)
    except asyncio.CancelledError:
        print('worker {} canceled'.format(wid))
# Script entry: run one feeder and one worker until Ctrl-C, then cancel them
# and wait for both to finish before closing the loop.

# Prefer the already-current event loop (on older Python versions the
# module-level queue is tied to it). Newer versions raise RuntimeError when
# no loop has been set, so create and install one in that case.
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

# Schedule both coroutines on that loop explicitly rather than relying on a
# global loop lookup.
tasks = [
    asyncio.ensure_future(feeder(), loop=loop),
    asyncio.ensure_future(worker(), loop=loop),
]
task = asyncio.gather(*tasks)

try:
    loop.run_until_complete(task)
except KeyboardInterrupt:
    # Cancelling the gather future cancels every child task.
    task.cancel()
    # Drive the loop until the children have processed the cancellation.
    # A bare run_forever() is not used here: nothing would stop the loop
    # once the tasks finish, so the script could hang after Ctrl-C.
    try:
        loop.run_until_complete(task)
    except asyncio.CancelledError:
        # Expected: the cancelled gather future may report the
        # cancellation even though each coroutine handled it itself.
        pass
finally:
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment