Last active
June 11, 2021 15:52
-
-
Save sirrobot01/091a9bd4d8441dc511497e8b2ce367ae to your computer and use it in GitHub Desktop.
Celery Task runner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery.utils.log import get_logger | |
from django.conf import settings | |
logger = get_logger(__name__) | |
class TaskRunner:
    """Dispatch a Celery task asynchronously with a default publish-retry policy.

    The runner wraps ``func.s(*args, **kwargs).apply_async(**options)`` and,
    unless the caller already supplied a retry policy, injects the
    project-wide ``settings.RETRIES_POLICY`` as the ``retry_policy`` option.
    """

    def __init__(self, func_code, func, args=None, kwargs=None, task_kwargs=None):
        """Store the task and everything needed to send it.

        Args:
            func_code: Identifier for the task; kept on the instance only
                (this could be used to save to db).
            func: The task object; ``run`` calls ``func.s(...)`` on it and
                catches ``func.OperationalError``.
            args: Positional arguments for the task signature.
            kwargs: Keyword arguments for the task signature.
            task_kwargs: Options forwarded to ``apply_async`` (for example
                ``countdown``).
        """
        self.func_code = func_code
        self.func = func
        # Fresh containers per instance: mutable defaults in the signature
        # would be shared by every runner created without these arguments.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self.task_kwargs = {} if task_kwargs is None else task_kwargs

    @property
    def _task_kwargs(self):
        """Return the ``apply_async`` options with the retry policy resolved.

        A new dict is built on every access, so neither ``self.task_kwargs``
        nor the dict the caller passed in is modified.
        """
        options = dict(self.task_kwargs)

        # ``retry_policy`` is the option name ``apply_async`` understands.
        # The historical ``retries_policy`` spelling is still accepted from
        # callers and translated; an explicit ``retry_policy`` wins.
        legacy_policy = options.pop('retries_policy', None)
        if legacy_policy and not options.get('retry_policy'):
            options['retry_policy'] = legacy_policy

        if not options.get('retry_policy'):
            # The setting is optional: a missing RETRIES_POLICY means no
            # default policy rather than an AttributeError.
            default_policy = getattr(settings, "RETRIES_POLICY", None)
            if default_policy:
                options['retry_policy'] = default_policy
                options['retry'] = True
        return options

    def run(self):
        """Send the task and return whatever ``apply_async`` returns.

        Returns:
            The ``apply_async`` result, or ``None`` when sending raised
            ``func.OperationalError`` (the error is logged, not re-raised).
        """
        try:
            return self.func.s(*self.args, **self.kwargs).apply_async(**self._task_kwargs)
        except self.func.OperationalError as exc:
            logger.exception('Sending task raised: %r', exc)
            return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add a RETRIES_POLICY dictionary to your settings.py file.
Running the send_email task with this utility:
task = TaskRunner('send_email', send_email, args=['user@example.com'], task_kwargs={'countdown': 3}).run()