Created
March 12, 2024 19:58
-
-
Save victorusachev/556d95338179f6c40d97dc0560524382 to your computer and use it in GitHub Desktop.
apscheduler.BlockingScheduler with retry
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
from typing import Callable, Any | |
from loguru import logger | |
from integration.utils import retry | |
def run_something(f: Callable, *args: Any, **kwargs: Any) -> None:
    """Run ``f(*args, **kwargs)`` under a retry policy and never raise.

    The call is handed to ``retry`` with a fixed policy: at most 5 attempts,
    an initial delay of 1 second, multiplied by 2 after each failure and
    capped at 60 seconds. If the call still fails (or anything else goes
    wrong here), the error is logged and swallowed, so the caller never sees
    an exception.

    :param f: the callable to execute.
    :param args: positional arguments forwarded to ``f``.
    :param kwargs: keyword arguments forwarded to ``f``.
    """
    try:
        retry(
            # ``retry`` invokes its target with no arguments, so bind them here.
            functools.partial(f, *args, **kwargs),
            exceptions=Exception,
            logger=logger,
            tries=5,
            delay=1,
            max_delay=60,
            backoff=2,
        )
    except Exception as exc:
        # ``exception`` logs at ERROR level like before but also records the
        # traceback of the final failed attempt, which ``error`` discarded.
        logger.exception('An error has occurred: {}', exc)
def run_one() -> None:
    """Run ``do_work`` through ``run_something`` under the job name ``'ONE'``."""
    run_something(do_work, job_name='ONE')
def run_two() -> None:
    """Run ``do_work`` through ``run_something`` under the job name ``'TWO'``."""
    run_something(do_work, job_name='TWO')
def do_work(job_name: str) -> None:
    """Simulate a slow, unreliable task for the job called ``job_name``.

    A random duration (10, 15, 30 or 65 seconds) and a random failure flag
    (true for one of three equally likely choices) are drawn up front. The
    function sleeps for the duration and then either logs completion or
    raises.

    :param job_name: label used in every log line and in the error message.
    :raises Exception: after the sleep, when the failure flag was drawn.
    """
    import random
    import time
    import uuid

    task_id = uuid.uuid4()
    logger.info('{} - {}: start task ...', job_name, task_id)

    seconds = random.choice([10, 15, 30, 65])
    has_error = random.choice([False, False, True])
    # The f-string fills in ``seconds=``/``has_error=`` (the local names are
    # part of the output) and leaves two literal ``{}`` placeholders for the
    # logger to fill with the positional arguments.
    logger.debug(f'{{}} - {{}}: {seconds=}, {has_error=}', job_name, task_id)

    time.sleep(seconds)

    if not has_error:
        logger.info('{} - {}: completed in {}', job_name, task_id, seconds)
        return

    raise Exception(
        f'An error occurred while executing the task {job_name} with ID {task_id}'
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from apscheduler.schedulers.blocking import BlockingScheduler | |
from apscheduler.triggers.cron import CronTrigger | |
from loguru import logger | |
# Scheduler job definitions. ``main`` reads ``func`` and ``cron_expression``
# from each entry and forwards every remaining key to ``scheduler.add_job``.
# All jobs share the same schedule ('*/1 * * * *') and ``max_instances`` of 1;
# only the textual function reference differs.
# NOTE(review): the jobs module visible alongside this file defines
# ``run_one``/``run_two``, not ``job_one``/``job_two`` -- confirm that these
# references resolve in the real ``application.jobs`` module.
TASKS_CONFIGURATION = [
    {
        'func': func_ref,
        'cron_expression': '*/1 * * * *',
        'max_instances': 1,
    }
    for func_ref in (
        'application.jobs:job_one',
        'application.jobs:job_two',
    )
]
def main() -> None:
    """Register every configured task with a blocking scheduler and start it.

    Each entry of ``TASKS_CONFIGURATION`` supplies the job target (``func``)
    and a crontab string (``cron_expression``); all remaining keys are passed
    straight through to ``scheduler.add_job`` as keyword arguments.
    """
    scheduler = BlockingScheduler(job_defaults={'max_instances': 6})

    for task in TASKS_CONFIGURATION:
        # Pop from a per-task copy. A shallow ``list.copy()`` still shares the
        # inner dicts, so popping from them directly strips 'func' and
        # 'cron_expression' out of the module-level configuration and makes a
        # second call to ``main`` fail with ``KeyError``.
        job_options = dict(task)
        func = job_options.pop('func')
        trigger = CronTrigger.from_crontab(job_options.pop('cron_expression'))
        scheduler.add_job(func=func, trigger=trigger, **job_options)

    scheduler.start()


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import random | |
import time | |
from typing import Callable, Type | |
def retry( | |
func: Callable[[], None], | |
exceptions: Type[Exception] | tuple[Type[Exception], ...], | |
tries: int = -1, | |
delay: int = 0, | |
max_delay: int | None = None, | |
backoff: int = 1, | |
jitter: int = 0, | |
logger: logging.Logger | None = None, | |
): | |
""" | |
Executes a function and retries it if it failed. | |
:param func: the function to execute. | |
:param exceptions: an exception or a tuple of exceptions to catch. default: Exception. | |
:param tries: the maximum number of attempts. default: -1 (infinite). | |
:param delay: initial delay between attempts. default: 0. | |
:param max_delay: the maximum value of delay. default: None (no limit). | |
:param backoff: multiplier applied to delay between attempts. default: 1 (no backoff). | |
:param jitter: extra seconds added to delay between attempts. default: 0. | |
fixed if a number, random if a range tuple (min, max) | |
:param logger: logger.warning(fmt, error, delay) will be called on failed attempts. | |
default: retry.logging_logger. if None, logging is disabled. | |
:returns: the result of the f function. | |
""" | |
while tries: | |
try: | |
return func() | |
except exceptions as e: | |
tries -= 1 | |
if not tries: | |
raise | |
if logger is not None: | |
logger.warning(f'{e}, retrying in {delay} seconds...') | |
time.sleep(delay) | |
delay *= backoff | |
if isinstance(jitter, tuple): | |
delay += random.uniform(*jitter) | |
else: | |
delay += jitter | |
if max_delay is not None: | |
delay = min(delay, max_delay) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment