Created
May 12, 2017 08:05
-
-
Save WorldException/8bfe0ae098af96b1c2f9f08f59ab3461 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#coding:utf8 | |
from __future__ import unicode_literals | |
from celery import current_app, task | |
from functools import wraps | |
import logging | |
import simplejson | |
import time | |
mylog = logging.getLogger(__name__) | |
def active_tasks(): | |
""" | |
Текущие запущенные задачи | |
:return: | |
""" | |
i = current_app.control.inspect() | |
workers = i.active() | |
if workers: | |
for worker, tasks in workers.items(): | |
for run_task in tasks: | |
yield worker, run_task | |
def skip_if_running(f): | |
u""" | |
не запускает задачу с такими же параметрами если она уже в обработке | |
обязательно обрамлять так | |
@task(bind=True) | |
@run_utils.skip_if_running | |
def evalute_by_id(id, name=None): | |
pass | |
https://celery.readthedocs.org/en/latest/userguide/tasks.html#context | |
http://docs.celeryproject.org/en/latest/userguide/workers.html#inspecting-workers | |
""" | |
task_name = '%s.%s' % (f.__module__, f.__name__) | |
mylog.info('task single instance only %s' % task_name) | |
@wraps(f) | |
def fun(self, *args, **kwargs): | |
uargs = str(args) | |
ukwargs = str(kwargs) | |
for worker, run_task in active_tasks(): | |
if task_name == run_task['name'] and uargs == run_task['args'] and ukwargs == run_task['kwargs'] and self.request.id != run_task['id']: | |
ret_str = 'task %s (%s, %s) is started on %s, skip current %s' % (task_name, uargs, ukwargs, worker, self.request.id) | |
mylog.warning(ret_str) | |
return 'skip' | |
#else: | |
# mylog.warning('Нет активных воркеров, такого не должно быть i.active()=%s, task: %s' % (unicode(workers), task_name)) | |
return f(*args, **kwargs) | |
return fun | |
def task_one_instance(f=None, **kwargs): | |
""" | |
декоратор для задачи celery запускаемой в одном экземпляре | |
Не требует дополнительных декораторов и поддерживает дополнительные атрибуты task | |
@task_one_instance | |
def task_simple(): | |
pass | |
@task_one_instance(queue='test') | |
def task.... | |
:param f: | |
:param kwargs: | |
:return: | |
""" | |
if not f is None: | |
# значит без параметров | |
return task(bind=True)(skip_if_running(f)) | |
else: | |
return task(bind=True, **kwargs)(skip_if_running(f)) | |
@task_one_instance | |
def one_instance_test_task(*args, **kwargs): | |
""" задачка для тестирования """ | |
mylog.warning('one_instance_test_task({0}, {1})'.format(simplejson.dumps(args), simplejson.dumps(kwargs))) | |
time.sleep(10) | |
return 'ok' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment