Last active
September 12, 2016 19:43
-
-
Save mattbierbaum/e9424854cfb256091139e9301079b7ed to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard library
import os
import uuid

# Third-party
from celery import Celery, Task, signals
from celery.utils.log import get_task_logger

# Task logger used for every message emitted by this module.
logger = get_task_logger(__name__)

# Celery application object for this module.
app = Celery('simple_celery')
class Worker(object):
    """Stateful demo object whose methods are invoked through Celery tasks.

    Each instance carries a random UUID and a call counter. Every method
    call logs both, together with the current process id.
    """

    def __init__(self):
        # Announce construction so each new instance is visible in the log.
        logger.fatal("INIT")
        self.uuid = uuid.uuid4()
        self.logger = logger
        self.calls = 0

    def run(self, n):
        """Count and log the call, then return ``2 * n``."""
        self.calls += 1
        template = 'PID: {}, UUID: {}, RUN: {}, CALLS: {}'
        self.logger.fatal(
            template.format(os.getpid(), self.uuid, n, self.calls)
        )
        return 2 * n

    def echo(self, text):
        """Count and log the call, then return ``text`` unchanged."""
        self.calls += 1
        template = 'PID: {}, UUID: {}, ECHO: {}, CALLS: {}'
        self.logger.fatal(
            template.format(os.getpid(), self.uuid, text, self.calls)
        )
        return text
class AbstractTask(Task):
    """Abstract base task that lazily creates one shared ``Worker``.

    The instance is cached on ``AbstractTask`` itself rather than on the
    concrete task, so every task built on this base reads the same object
    once it has been created.
    """

    abstract = True

    # Cached worker object; stays None until ``obj`` is first read.
    _obj = None

    def new(self):
        """Create the object that ``obj`` caches."""
        return Worker()

    @property
    def obj(self):
        """Return the cached worker, creating it on first access."""
        cached = AbstractTask._obj
        if cached is None:
            cached = AbstractTask._obj = self.new()
        return cached
# Names of the methods that get exposed as Celery tasks.
registry = ['run', 'echo']

# Options passed to every ``app.task`` call; ``bind`` hands the task
# instance to the wrapped function as its first argument.
task_kwargs = dict(bind=True)
def load_tasks():
    """Register one Celery task per name in ``registry``.

    Each task is named ``simple_celery.<name>``, uses ``AbstractTask`` as
    its base and is built from a ``taskcall`` wrapper around a placeholder
    function.

    Returns:
        list: The registered task objects, in ``registry`` order.
    """
    def stub(*args, **kwargs):
        # Placeholder only: ``taskcall`` needs something to decorate.
        pass

    registered = []
    for method_name in registry:
        register = app.task(
            base=AbstractTask,
            name='simple_celery.{}'.format(method_name),
            **task_kwargs
        )
        dispatch = taskcall(method_name)
        registered.append(register(dispatch(stub)))
    return registered
def taskcall(obj_func):
    """Build a decorator that routes a task call to ``self.obj.<obj_func>``.

    Args:
        obj_func: Name of the method to look up on the task's ``obj``.

    Returns:
        A decorator. The function it receives is only a placeholder and is
        never called. The wrapper it returns expects the task instance as
        its first argument and forwards all remaining arguments to the
        looked-up method.

    Raises:
        Exception: Any exception raised by the method is logged through
            ``self.obj.logger`` and then re-raised unchanged.
    """
    def task_wrapper(func):
        def _wrap(self, *args, **kwargs):
            # `self` is supplied by the `bind` task argument.
            try:
                return getattr(self.obj, obj_func)(*args, **kwargs)
            except (SystemExit, KeyboardInterrupt) as exc:
                # Hand the interruption to the task's retry machinery.
                self.retry(exc=exc)
            except Exception as exc:
                # Let logging do the formatting (lazy arguments).
                self.obj.logger.exception(
                    '%r had an exception\n%r', obj_func, exc
                )
                # Bare raise keeps the original traceback intact.
                raise
        return _wrap
    return task_wrapper
# Register the tasks as soon as this module is imported.
_tasks = load_tasks()

# Demo traffic: these messages are also sent at import time, two per task.
for _name, _kwargs in (
    ('simple_celery.run', {'n': 10}),
    ('simple_celery.run', {'n': 10}),
    ('simple_celery.echo', {'text': 'break, please'}),
    ('simple_celery.echo', {'text': 'break, please'}),
):
    app.send_task(_name, kwargs=_kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Run with:
    celery -A simple_celery worker --concurrency=1