Created
May 10, 2020 18:46
-
-
Save DannyAziz/46a33b3a0ce3e56a961f7a4464c289ce to your computer and use it in GitHub Desktop.
celerystalk.io - Celery Class Tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Celery

# Celery application used by the tasks in this example. 'NAME' and 'BROKER'
# are placeholders for the application name and the broker URL.
app = Celery('NAME', broker='BROKER')
class BaseTask(app.Task):
    """Base task class that reports every task failure to ``logging_tool``."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Forward the failure details to ``logging_tool.log``.

        This overrides the Celery ``Task.on_failure`` hook and passes all of
        its arguments through unchanged.

        Args:
            exc: The exception raised by the task.
            task_id: Unique id of the failed task.
            args: Positional arguments the task was called with.
            kwargs: Keyword arguments the task was called with.
            einfo: Exception/traceback information supplied by Celery.
        """
        # NOTE(review): `logging_tool` is not defined in this snippet;
        # presumably a placeholder for the project's error reporter.
        logging_tool.log(exc, task_id, args, kwargs, einfo)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE(review): `app` was used by the decorator below but never imported.
# This assumes `base_task` is the module that creates the Celery app next to
# BaseTask -- confirm the module layout.
from base_task import BaseTask, app


@app.task(base=BaseTask)
def add(x, y):
    """Return ``x + y``; registered as a Celery task with BaseTask as its base."""
    return x + y
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Celery

# `task_cls` names the base task class for this app as a 'module:attribute'
# path; 'your.module.path' is a placeholder for the module defining BaseTask.
app = Celery('NAME', task_cls='your.module.path:BaseTask')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Celery

# Celery application for the rate-limited task example; 'NAME' and 'BROKER'
# are placeholders.
app = Celery('NAME', broker='BROKER')
# The decorator must come from the app instance: a bare `@task(...)` would
# reference the name `task` before anything defines it.
@app.task(rate_limit='100/s')
def task():
    """Placeholder task registered with a rate limit of '100/s'."""
    pass
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time

from celery import Celery

# Celery application for the stateful-counter example; 'NAME' and 'BROKER'
# are placeholders.
app = Celery('NAME', broker='BROKER')
class Count:
    """Minimal counter whose position starts at 1."""

    def __init__(self):
        # Current position; advanced by counter().
        self.pos = 1

    def counter(self):
        """Advance the position by one."""
        self.pos += 1
class Counter(app.Task, Count):
    """Class-based task that keeps its counter on the task instance itself."""

    def run(self, wait_time):
        """Sleep for ``wait_time`` seconds, then increment and print ``pos``.

        Args:
            wait_time: Number of seconds to sleep before counting.
        """
        time.sleep(wait_time)
        # `pos` lives on `self`, so it accumulates across calls made on the
        # same task instance.
        self.counter()
        print(self.pos)


# Register a single instance of the class-based task with the app.
CounterTask = app.register_task(Counter())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time

from celery import Celery

# Celery application for the per-call counter example; 'NAME' and 'BROKER'
# are placeholders.
app = Celery('NAME', broker='BROKER')
class Count:
    """Simple counter helper; ``pos`` is initialised to 1."""

    def __init__(self):
        # Position value incremented by counter().
        self.pos = 1

    def counter(self):
        """Increment ``pos`` by one."""
        self.pos += 1
class Counter(app.Task):
    """Class-based task that builds a fresh ``Count`` helper on every call."""

    def run(self, wait_time):
        """Sleep for ``wait_time`` seconds, then count once and print the result.

        Args:
            wait_time: Number of seconds to sleep before counting.
        """
        # A new Count is created per call, so the value printed below is
        # always 2 (initial 1 plus one increment).
        self.count_handler = Count()
        time.sleep(wait_time)
        self.count_handler.counter()
        print(self.count_handler.pos)


# Register a single instance of the class-based task with the app.
CounterTask = app.register_task(Counter())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Celery

# Celery application for the profile-creation example; 'NAME' and 'BROKER'
# are placeholders.
app = Celery('NAME', broker='BROKER')
class ProfileCreate(app.Task):
    """Class-based task that groups the steps run when a profile is created."""

    def run(self, user_id, name, email, avatar):
        """Subscribe ``email`` to the mailing list and upload ``avatar``.

        Args:
            user_id: Accepted but not used by this method.
            name: Accepted but not used by this method.
            email: Passed to ``add_email_to_mailing_list``.
            avatar: Passed to ``upload_avatar_to_s3``.
        """
        self.add_email_to_mailing_list(email)
        self.upload_avatar_to_s3(avatar)

    def add_email_to_mailing_list(self, email):
        """Hand ``email`` to ``mailchimp.add_to_list``."""
        # NOTE(review): `mailchimp` is not defined in this snippet;
        # presumably a client object provided elsewhere.
        mailchimp.add_to_list(email)

    def upload_avatar_to_s3(self, avatar):
        """Hand ``avatar`` to ``s3.upload``."""
        # NOTE(review): `s3` is not defined in this snippet; presumably a
        # client object provided elsewhere.
        s3.upload(avatar)


# Register a single instance of the class-based task with the app.
ProfileCreateTask = app.register_task(ProfileCreate())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# delay() forwards its positional arguments straight to run(), so the four
# values are passed individually rather than wrapped in a single tuple.
# NOTE(review): the e-mail literal looks like an e-mail-obfuscation
# placeholder from the page capture -- restore the intended address.
ProfileCreateTask.delay(42, "Tony Stark", "[email protected]", image_data)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE(review): `app` was used by the registration below but never imported.
# This assumes `base_task` is the module that creates the Celery app next to
# BaseTask -- confirm the module layout.
from base_task import BaseTask, app


class ProfileCreate(BaseTask):
    """Profile-creation task built on the shared ``BaseTask`` base class."""

    def run(self, user_id, first_name, last_name, email, avatar):
        """Subscribe ``email`` to the mailing list and upload ``avatar``.

        Args:
            user_id: Accepted but not used by this method.
            first_name: Accepted but not used by this method.
            last_name: Accepted but not used by this method.
            email: Passed to ``add_email_to_mailing_list``.
            avatar: Passed to ``upload_avatar_to_s3``.
        """
        self.add_email_to_mailing_list(email)
        self.upload_avatar_to_s3(avatar)

    def add_email_to_mailing_list(self, email):
        """Hand ``email`` to ``mailchimp.add_to_list``."""
        # NOTE(review): `mailchimp` is not defined in this snippet;
        # presumably a client object provided elsewhere.
        mailchimp.add_to_list(email)

    def upload_avatar_to_s3(self, avatar):
        """Hand ``avatar`` to ``s3.upload``."""
        # NOTE(review): `s3` is not defined in this snippet; presumably a
        # client object provided elsewhere.
        s3.upload(avatar)


# Register a single instance of the class-based task with the app.
ProfileCreateTask = app.register_task(ProfileCreate())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment