Skip to content

Instantly share code, notes, and snippets.

@adamruzicka
Created November 11, 2019 14:02
Show Gist options
  • Save adamruzicka/32d13bfecbc64f8338a2bcd3056e96bf to your computer and use it in GitHub Desktop.
Save adamruzicka/32d13bfecbc64f8338a2bcd3056e96bf to your computer and use it in GitHub Desktop.
import asyncio
import json
class ResponseQueue(asyncio.Queue):
    """An ``asyncio.Queue`` that can be consumed with ``async for``.

    The producer signals completion by setting ``done`` to ``True``.
    Iteration ends once ``done`` is set *and* every buffered item has been
    handed to the consumer, so items that were enqueued before the producer
    finished are never dropped.

    A consumer that is already blocked in ``get()`` cannot observe ``done``
    changing; the producer has to enqueue one more item after setting the
    flag to wake it up. That item is yielded like any other.
    """

    def __init__(self, *args, **kwargs):
        # Set the flag before delegating so it exists as soon as the
        # underlying queue is initialised.
        self.done = False
        super().__init__(*args, **kwargs)

    def __aiter__(self):
        return self

    async def __anext__(self):
        print(f"response queue __anext__ {self.done}")
        # Checking ``done`` alone would discard whatever is still buffered
        # when the producer finishes ahead of the consumer; only stop when
        # there is truly nothing left to deliver.
        if self.done and self.empty():
            raise StopAsyncIteration
        retval = await self.get()
        print(f"response queue __anext__ {self.done} returned {retval}")
        return retval
async def request(queue, duration, repeat):
    """Run ``repeat`` iterations concurrently, then mark ``queue`` as done.

    Each iteration is given its index, the shared ``queue`` and the same
    ``duration``. After all of them have completed, ``queue.done`` is set to
    ``True`` and a single ``None`` is enqueued.
    """
    pending = (iteration(index, queue, duration) for index in range(repeat))
    await asyncio.gather(*pending)
    print("SETTING QUEUE TO DONE")
    queue.done = True
    # A consumer may already be blocked waiting for an item; without one
    # more item it would never wake up to notice that ``done`` is now set.
    await queue.put(None)
async def iteration(idx, queue, duration):
    """Sleep for ``duration`` seconds, then enqueue ``"iteration <idx>"``."""
    await asyncio.sleep(duration)
    label = f"iteration {idx}"
    await queue.put(label)
def execute(message, config=None):
    """Schedule a ``request`` for ``message`` and return its response queue.

    ``message.raw_payload`` is parsed as JSON. Its optional ``"duration"``
    (default 30) and ``"repeat"`` (default 1) entries are forwarded to
    ``request``, which is scheduled as a task on the current event loop.

    Args:
        message: Object exposing a JSON document in ``raw_payload``.
        config: Accepted for interface compatibility; not used here.

    Returns:
        The ``ResponseQueue`` that the scheduled task will fill.
    """
    loop = asyncio.get_event_loop()
    # Do not pass ``loop=`` to the queue: the argument was deprecated in
    # Python 3.8 and removed in 3.10, where supplying it raises TypeError.
    queue = ResponseQueue()
    payload = json.loads(message.raw_payload)
    duration = payload.get("duration", 30)
    repeat = payload.get("repeat", 1)
    loop.create_task(request(queue, duration, repeat))
    return queue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment