Skip to content

Instantly share code, notes, and snippets.

@WorldException
Created May 12, 2017 08:05
Show Gist options
  • Save WorldException/8bfe0ae098af96b1c2f9f08f59ab3461 to your computer and use it in GitHub Desktop.
Save WorldException/8bfe0ae098af96b1c2f9f08f59ab3461 to your computer and use it in GitHub Desktop.
#coding:utf8
from __future__ import unicode_literals
from celery import current_app, task
from functools import wraps
import logging
import simplejson
import time
mylog = logging.getLogger(__name__)
def active_tasks():
"""
Текущие запущенные задачи
:return:
"""
i = current_app.control.inspect()
workers = i.active()
if workers:
for worker, tasks in workers.items():
for run_task in tasks:
yield worker, run_task
def skip_if_running(f):
u"""
не запускает задачу с такими же параметрами если она уже в обработке
обязательно обрамлять так
@task(bind=True)
@run_utils.skip_if_running
def evalute_by_id(id, name=None):
pass
https://celery.readthedocs.org/en/latest/userguide/tasks.html#context
http://docs.celeryproject.org/en/latest/userguide/workers.html#inspecting-workers
"""
task_name = '%s.%s' % (f.__module__, f.__name__)
mylog.info('task single instance only %s' % task_name)
@wraps(f)
def fun(self, *args, **kwargs):
uargs = str(args)
ukwargs = str(kwargs)
for worker, run_task in active_tasks():
if task_name == run_task['name'] and uargs == run_task['args'] and ukwargs == run_task['kwargs'] and self.request.id != run_task['id']:
ret_str = 'task %s (%s, %s) is started on %s, skip current %s' % (task_name, uargs, ukwargs, worker, self.request.id)
mylog.warning(ret_str)
return 'skip'
#else:
# mylog.warning('Нет активных воркеров, такого не должно быть i.active()=%s, task: %s' % (unicode(workers), task_name))
return f(*args, **kwargs)
return fun
def task_one_instance(f=None, **kwargs):
"""
декоратор для задачи celery запускаемой в одном экземпляре
Не требует дополнительных декораторов и поддерживает дополнительные атрибуты task
@task_one_instance
def task_simple():
pass
@task_one_instance(queue='test')
def task....
:param f:
:param kwargs:
:return:
"""
if not f is None:
# значит без параметров
return task(bind=True)(skip_if_running(f))
else:
return task(bind=True, **kwargs)(skip_if_running(f))
@task_one_instance
def one_instance_test_task(*args, **kwargs):
""" задачка для тестирования """
mylog.warning('one_instance_test_task({0}, {1})'.format(simplejson.dumps(args), simplejson.dumps(kwargs)))
time.sleep(10)
return 'ok'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment