Created
November 11, 2019 14:02
-
-
Save adamruzicka/32d13bfecbc64f8338a2bcd3056e96bf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
class ResponseQueue(asyncio.Queue):
    """An ``asyncio.Queue`` that can be consumed with ``async for``.

    The producer marks the end of the stream by setting ``done`` to True and
    then putting a single ``None`` on the queue. The ``None`` only serves to
    wake up a consumer that is already blocked in ``get()``; it is not handed
    to the consumer. Iteration stops once ``done`` is set and every item
    queued before the marker has been delivered.
    """

    def __init__(self, *args, **kwargs):
        # Set to True by the producer once no further items will be queued.
        self.done = False
        super().__init__(*args, **kwargs)

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next queued item, or stop when the stream is finished.

        Raises:
            StopAsyncIteration: when ``done`` is set and the queue is drained.
        """
        print(f"response queue __anext__ {self.done}")
        # Only stop up front when nothing is left to hand out: items queued
        # before ``done`` was flipped must still reach a slow consumer.
        if self.done and self.empty():
            raise StopAsyncIteration
        retval = await self.get()
        print(f"response queue __anext__ {self.done} returned {retval}")
        # A trailing None retrieved after ``done`` was set is the producer's
        # wake-up marker, not a payload, so end the iteration instead of
        # yielding it.
        if retval is None and self.done and self.empty():
            raise StopAsyncIteration
        return retval
async def request(queue, duration, repeat):
    """Run ``repeat`` iterations concurrently, then mark ``queue`` finished.

    Args:
        queue: Queue the iterations put their results on; its ``done``
            attribute is set to True once all of them have completed.
        duration: Passed through to every ``iteration`` call.
        repeat: Number of iterations to start (indices ``0..repeat-1``).
    """
    pending = (iteration(n, queue, duration) for n in range(repeat))
    await asyncio.gather(*pending)
    print("SETTING QUEUE TO DONE")
    queue.done = True
    # A consumer may already be waiting inside ``queue.get()``, where it
    # cannot notice the flag change. Pushing one more value wakes it up so
    # the wait does not hang forever.
    await queue.put(None)
async def iteration(idx, queue, duration):
    """Sleep for ``duration`` seconds, then enqueue a message tagged ``idx``.

    Args:
        idx: Value embedded in the enqueued message.
        queue: Queue that receives the message.
        duration: Delay handed to ``asyncio.sleep`` before enqueueing.
    """
    await asyncio.sleep(duration)
    message = f"iteration {idx}"
    await queue.put(message)
def execute(message, config={}):
    """Schedule a ``request`` run described by ``message`` and return its queue.

    Args:
        message: Object whose ``raw_payload`` attribute holds a JSON document.
            The optional keys ``"duration"`` (default 30) and ``"repeat"``
            (default 1) are forwarded to ``request``.
        config: Accepted but not used by this function.

    Returns:
        The ``ResponseQueue`` that the scheduled task will put results on.
    """
    loop = asyncio.get_event_loop()
    # Do not pass ``loop=`` to the queue: asyncio.Queue dropped that parameter
    # in Python 3.10 and raises TypeError if it is supplied. Older versions
    # fall back to the current event loop, which is the same loop used here.
    queue = ResponseQueue()
    payload = json.loads(message.raw_payload)
    duration = payload.get("duration", 30)
    repeat = payload.get("repeat", 1)
    loop.create_task(request(queue, duration, repeat))
    return queue
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment