Last active
March 13, 2023 12:26
-
-
Save anapaulagomes/8ae00bad282aafd1ec08471ac2944015 to your computer and use it in GitHub Desktop.
Example of Celery signals
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery.signals import task_failure, after_task_publish | |
@app.task(bind=True) | |
def my_task(self): | |
print(1/0) # this line will cause an exception | |
@after_task_publish.connect(sender='mymodule.tasks.my_task') | |
def task_sent_handler(sender=None, headers=None, body=None, **kwargs): | |
info = headers if 'task' in headers else body | |
print('after_task_publish for task id {info[id]}'.format( | |
info=info, | |
)) | |
@task_failure.connect(sender=my_task) # FIXME by name it doesn't work | |
def task_failure_handler(sender=None, headers=None, body=None, **kwargs): | |
print('This will be executed after if `my task` fails') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment