Last active
December 15, 2023 01:20
-
-
Save jgarvin/abdec3b42fb6f3532185cd747b308802 to your computer and use it in GitHub Desktop.
Randomize asyncio awaits
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import asyncio | |
import random | |
import logging as log # noqa | |
from typing import TypeVar, Generator | |
import pytest | |
_T = TypeVar("_T") | |
class RandomizedFuture(asyncio.Future[_T]): | |
def __init__(self, *, loop: asyncio.AbstractEventLoop | None = None) -> None: | |
super().__init__(loop=loop or asyncio.get_event_loop()) | |
self._actual_result: _T | None = None | |
self._result_set = False | |
self._result_really_set = False | |
def set_result(self, result: _T) -> None: | |
if self._result_set or self.done(): | |
# Future already has a result set or is cancelled, raise an exception | |
raise asyncio.InvalidStateError("The future already has a result or is cancelled") | |
self._actual_result = result | |
self._result_set = True | |
self._randomly_set_or_delay_result() | |
def _randomly_set_or_delay_result(self) -> None: | |
if not self._result_really_set: | |
if random.choice([True, False]): | |
self._really_set_result() | |
else: | |
# Schedule another attempt with a slight delay, we use | |
# a nonzero delay b/c we are worried the event loop | |
# will push the call on the list it is currently | |
# iterating through and pick it up right away without | |
# actually giving anything else a chance to run. | |
self.get_loop().call_later(random.random()*0.01, self._randomly_set_or_delay_result) | |
def _really_set_result(self) -> None: | |
self._result_really_set = True | |
# We can't assert this b/c sometimes _T itself actually is None. | |
# So instead we disable the typecheck on `super().set_result()` | |
# | |
# assert self._actual_result is not None | |
super().set_result(self._actual_result) # type: ignore | |
def set_exception(self, exception: type | BaseException) -> None: | |
if self._result_set or self.done(): | |
raise asyncio.InvalidStateError("The future already has a result or is cancelled") | |
# Directly set the exception, as we don't need to randomize this | |
self._result_really_set = True | |
super().set_exception(exception) | |
def done(self) -> bool: | |
# Override 'done' to consider if the result is set | |
return super().done() or self._result_set | |
def result(self) -> _T: | |
# Force setting the actual result if it's pending | |
if not super().done() and self._result_set: | |
self._really_set_result() | |
return super().result() | |
def exception(self) -> None | BaseException: | |
# Force setting the actual result if it's pending | |
if not super().done() and self._result_set: | |
self._really_set_result() | |
return super().exception() | |
class RandomizedEventLoop(asyncio.SelectorEventLoop):
    """Selector event loop that perturbs scheduling to expose ordering bugs.

    Before every iteration the pending timer handles are shuffled, and every
    future created through the loop is a ``RandomizedFuture``.
    """

    def _run_once(self) -> None:
        """Randomize the pending timers, then run one loop iteration."""
        # Local import: heapq is not among the module-level imports.
        import heapq

        # not sure why mypy doesn't think _scheduled exists...
        timers = self._scheduled  # type: ignore
        # Shuffle the scheduled callbacks so that timers sharing a deadline
        # come out in a random order ...
        random.shuffle(timers)
        # ... but restore the heap invariant afterwards.  CPython's
        # BaseEventLoop._run_once() treats `_scheduled[0]` as the earliest
        # timer and pops with heapq.heappop(); on a merely shuffled list a
        # far-future timer can land at index 0 and make the loop sleep while
        # nearer timers are already overdue.
        heapq.heapify(timers)
        super()._run_once()  # type: ignore

    def create_future(self) -> "RandomizedFuture[_T]":
        """Return a ``RandomizedFuture`` bound to this loop."""
        # Override to return an instance of RandomizedFuture
        return RandomizedFuture(loop=self)
# Fixture to use the custom event loop, to use in a test do: | |
# | |
# from random_event_loop import event_loop | |
# | |
# pytest automatically recognizes the name "event_loop" being defined | |
# at the top level of the test module | |
@pytest.fixture(scope="function")
def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]:
    """Yield a fresh ``RandomizedEventLoop`` and close it afterwards.

    The fixture is function-scoped, so a new loop is built for every test
    that requests it.
    """
    log.info("Using random loop")
    randomized_loop = RandomizedEventLoop()
    yield randomized_loop
    # Teardown half of the fixture: release the loop's resources.
    randomized_loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment