Skip to content

Instantly share code, notes, and snippets.

@morenopc
Created April 8, 2014 15:19
Show Gist options
  • Save morenopc/10141260 to your computer and use it in GitHub Desktop.
Save morenopc/10141260 to your computer and use it in GitHub Desktop.
from celery.registry import tasks
from celery.signals import task_sent, task_prerun, task_postrun
def on_task_postrun(**kwargs):
    """Signal receiver that announces a ``task_postrun`` dispatch on stdout.

    Args:
        **kwargs: Whatever keyword arguments the signal dispatcher supplies.
            They are accepted so the receiver is compatible with any signal
            payload, but none of them are used.
    """
    # Single-argument parenthesised form prints the same text on Python 2
    # (print statement) and Python 3 (print function).
    print("on_task_postrun")
def on_task_sent(**kwargs):
    """Signal receiver that announces a ``task_sent`` dispatch on stdout.

    Args:
        **kwargs: Whatever keyword arguments the signal dispatcher supplies.
            They are accepted so the receiver is compatible with any signal
            payload, but none of them are used.
    """
    # Single-argument parenthesised form prints the same text on Python 2
    # (print statement) and Python 3 (print function).
    print("on_task_sent")
# Subscribe both receivers, restricting them to signals whose sender is the
# registered CreateBoardbook task object.
# NOTE(review): Celery's signal documentation describes the task_sent sender
# as the task *name*, not the task object -- confirm that this filter ever
# matches for task_sent on the Celery version in use.
_create_boardbook = tasks['documents.tasks.CreateBoardbook']
task_sent.connect(on_task_sent, sender=_create_boardbook)
task_postrun.connect(on_task_postrun, sender=_create_boardbook)
from celery.task import task
from celery.task.sets import subtask
from accounts.models import BRUser
from meetings.models import Meeting
from documents.tasks import CreateBoardbook
# Signature (subtask) that refers to a task by the name "hello" and carries
# no positional or keyword arguments.
# NOTE(review): the `hello` task defined in this file is presumably registered
# under a module-qualified name -- confirm that the bare name "hello" resolves.
s = subtask("hello", args=(), kwargs={})
@task
def hello():
    """Print the greeting ``Hello``; wrapped by the ``@task`` decorator."""
    # Single-argument parenthesised form prints the same text on Python 2
    # (print statement) and Python 3 (print function).
    print("Hello")
# Ad-hoc driver: instantiate the CreateBoardbook task, load one user and one
# meeting from the database, and dispatch the task with them.
boardbook_task = CreateBoardbook()
# NOTE(review): the e-mail literal below looks like an obfuscation placeholder
# left by the page export, not a real address -- restore the intended value.
user = BRUser.objects.get(email='[email protected]')
# Hard-coded primary key; .get() raises if no such meeting exists.
meeting = Meeting.objects.get(id=14)
# NOTE(review): model instances are passed as task arguments, so they must be
# serialisable by the configured task serializer -- confirm, or pass ids.
# The meaning of the two True flags is defined by CreateBoardbook, not here.
boardbook_task.delay(meeting, user, True, True)
# Builds a subtask of CreateBoardbook from a dict copy of `s`; the result is
# not stored or applied.
# NOTE(review): presumably meant as a callback/link -- confirm intent.
boardbook_task.subtask(dict(s))
from celery.signals import task_sent, task_prerun, task_postrun
@task_sent.connect
def task_sent_handler(sender="documents.tasks.CreateBoardbook", **kwds):
    """Announce ``task_sent`` dispatches for the CreateBoardbook task only.

    Args:
        sender: The signal's sender as supplied by the dispatcher. Either a
            task name or a task object exposing ``.name`` is accepted.
        **kwds: Remaining signal payload; unused.
    """
    # A default value on `sender` does not filter anything: the dispatcher
    # always passes the real sender, so the handler used to fire for every
    # task. Filter explicitly instead, tolerating both sender conventions.
    if getattr(sender, "name", sender) != "documents.tasks.CreateBoardbook":
        return
    # Single-argument parenthesised form prints the same on Python 2 and 3.
    print("Got signal task_sent")
# Not working
# NOTE(review): Celery documents task_postrun as being dispatched after the
# task has executed, i.e. presumably in the worker process. A handler that is
# only registered in the calling process (e.g. a shell session) would never
# fire -- confirm that this module is imported by the worker.
@task_postrun.connect
def task_postrun_handler(sender="documents.tasks.CreateBoardbook", **kwds):
    """Announce ``task_postrun`` dispatches for the CreateBoardbook task only.

    Args:
        sender: The signal's sender as supplied by the dispatcher. Either a
            task name or a task object exposing ``.name`` is accepted.
        **kwds: Remaining signal payload; unused.
    """
    # A default value on `sender` does not filter anything: the dispatcher
    # always passes the real sender. Filter explicitly instead, tolerating
    # both a task-object sender and a task-name sender.
    if getattr(sender, "name", sender) != "documents.tasks.CreateBoardbook":
        return
    # Single-argument parenthesised form prints the same on Python 2 and 3.
    print("Got signal task_postrun")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment