Skip to content

Instantly share code, notes, and snippets.

@dmontagu
Created December 17, 2019 10:00
Show Gist options
  • Save dmontagu/888a03196b1b8a2d00a0fb8313825865 to your computer and use it in GitHub Desktop.
Save dmontagu/888a03196b1b8a2d00a0fb8313825865 to your computer and use it in GitHub Desktop.
Task repeating
import asyncio
import logging
from asyncio import ensure_future
from functools import wraps
from traceback import format_exception
from typing import Any, Callable, Coroutine, Optional, Union
from starlette.concurrency import run_in_threadpool
# A plain (synchronous) callable that takes no arguments and returns nothing.
NoArgsNoReturnFuncT = Callable[[], None]

# An ``async def`` callable that takes no arguments; awaiting its coroutine
# yields nothing.
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]

# Either flavour of zero-argument callable accepted by the decorator.
_SyncOrAsyncFuncT = Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]

# A decorator that accepts either flavour and always hands back the async one.
NoArgsNoReturnDecorator = Callable[[_SyncOrAsyncFuncT], NoArgsNoReturnAsyncFuncT]
# Strong references to the background loops started by ``repeat_every``.
# asyncio itself only keeps weak references to tasks, so a fire-and-forget
# task that nothing else refers to can be garbage-collected before it is done.
_background_tasks = set()


def repeat_every(
    *,
    seconds: float,
    wait_first: bool = False,
    logger: Optional[logging.Logger] = None,
    raise_exceptions: bool = False,
    max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
    """Return a decorator that turns a zero-argument function into a repeating task.

    The decorated function becomes an ``async`` callable. Awaiting it schedules
    a background loop with ``ensure_future`` and returns immediately; the loop
    then calls the original function over and over, sleeping ``seconds``
    after every attempt. Each call of the decorated function starts a new,
    independent loop with its own repetition counter.

    Coroutine functions are awaited directly; plain functions are executed
    through ``run_in_threadpool``.

    Args:
        seconds: Delay, in seconds, awaited after every attempt (and once
            before the first attempt when ``wait_first`` is true).
        wait_first: If true, sleep for ``seconds`` before the first call.
        logger: If given, the formatted traceback of any exception raised by
            the function is logged through ``logger.error``.
        raise_exceptions: If true, an exception raised by the function is
            re-raised inside the background task, which ends the loop. The
            caller of the decorated function never sees it, because the loop
            is not awaited. If false, the exception is swallowed (after
            optional logging) and the loop carries on.
        max_repetitions: Number of *successful* calls after which the loop
            stops; failed calls are not counted. ``None`` means no limit.

    Returns:
        A decorator producing the ``async`` wrapper described above.
    """

    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        # Decided once, at decoration time, rather than on every iteration.
        is_coroutine = asyncio.iscoroutinefunction(func)

        @wraps(func)
        async def wrapped() -> None:
            repetitions = 0

            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        if is_coroutine:
                            await func()  # type: ignore
                        else:
                            await run_in_threadpool(func)
                        # Only reached when the call did not raise.
                        repetitions += 1
                    except Exception as exc:
                        if logger is not None:
                            formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__))
                            logger.error(formatted_exception)
                        if raise_exceptions:
                            # Bare ``raise`` re-raises the active exception as-is.
                            raise
                    await asyncio.sleep(seconds)

            # Keep the task alive until it finishes, then let it go.
            task = ensure_future(loop())
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)

        return wrapped

    return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment