Skip to content

Instantly share code, notes, and snippets.

@andreif
Created April 22, 2018 20:48
Show Gist options
  • Select an option

  • Save andreif/d4c32d02eeaf3b861ed23b0f2f54a937 to your computer and use it in GitHub Desktop.

Select an option

Save andreif/d4c32d02eeaf3b861ed23b0f2f54a937 to your computer and use it in GitHub Desktop.
# coding=utf-8
import json
from urlparse import urljoin
import celery.states
from django import http
from django.conf import settings
from django.conf.urls import url
from django.contrib import admin
from django.http import Http404
from django.shortcuts import render
from django.utils.translation import ugettext_lazy as _
from utils.admin import choice_filter, admin_link
from . import api, enums, models
class JobLogInline(admin.TabularInline):
    """Read-only tabular inline showing JobLog rows on the Job change page."""

    model = models.JobLog
    # Same list serves as displayed and read-only fields.
    fields = readonly_fields = ['created', 'status', 'message', 'user',
                                'attempt', 'task_id', 'path']
    can_delete = False
    extra = 0
    ordering = ('-created',)

    def has_add_permission(self, request):
        # Log entries are written by the job machinery, never by hand.
        return False
class JobAdmin(admin.ModelAdmin):
    """Read-only admin for Job rows with cancel/abort/retry/unlock actions
    and a task-progress view used by both an HTML page and AJAX polling."""

    list_display = ['id', '_name', 'status', 'started_at', 'current',
                    'progress', 'finished_at', 'attempt', 'user', 'created']
    fieldsets = [(None, {'fields': [
        'name', '_link', 'path', 'status', 'started_at', 'finished_at',
        'current', 'progress', 'description', 'created', 'modified', 'user',
        'lock', 'attempt', 'task_id', 'params',
    ]})]
    search_fields = ['name']
    list_filter = [
        'status',
        # Filter by the task's short name (last dotted component).
        choice_filter('name', get_value=lambda x: x.split('.')[-1]),
    ]
    inlines = [JobLogInline]
    ordering = ('-created',)
    date_hierarchy = 'created'
    actions = ['cancel_job', 'abort_job', 'retry_job', 'unlock_job']

    def has_add_permission(self, request):
        # Jobs are created only through the job API, never via the admin.
        return False

    def has_delete_permission(self, request, obj=None):
        return False

    def get_readonly_fields(self, request, obj=None):
        # Make every declared field read-only by flattening the fieldsets.
        # NOTE(review): `declared_fieldsets` was removed in Django 1.9; this
        # code targets an older Django -- confirm before upgrading.
        if self.declared_fieldsets:
            return sum((list(conf['fields'])
                        for fs, conf in self.declared_fieldsets), [])
        return super(JobAdmin, self).get_readonly_fields(
            request=request, obj=obj)

    def get_actions(self, request):
        # Bulk delete is disabled; jobs must be cancelled/aborted instead.
        actions = super(JobAdmin, self).get_actions(request)
        actions.pop('delete_selected', None)
        return actions

    def get_urls(self):
        # Extra admin view polled by the progress page / AJAX client.
        return [
            url(r'^task/(.+)/progress/$',
                self.admin_site.admin_view(self.task_progress),
                name='task_progress'),
        ] + super(JobAdmin, self).get_urls()

    def abort_job(self, request, queryset):
        # Only STARTED jobs can be asked to abort (moves them to ABORTING).
        for job_id in queryset.filter(status=enums.JobStatus.STARTED)\
                .values_list('id', flat=True):
            api.aborting_job(job_id=job_id, user_id=request.user.id)
    abort_job.short_description = _('Abort')

    def unlock_job(self, request, queryset):
        # Clear stale locks on jobs that are no longer running.
        queryset.filter(status__in=[
            enums.JobStatus.CANCELED,
            enums.JobStatus.ABORTED,
            enums.JobStatus.FAILED,
            enums.JobStatus.PENDING,
        ]).update(lock=None)
    unlock_job.short_description = _('Unlock')

    def cancel_job(self, request, queryset):
        # Only PENDING jobs (not yet started) can be cancelled.
        for job_id in queryset.filter(status=enums.JobStatus.PENDING)\
                .values_list('id', flat=True):
            api.cancel_job(job_id=job_id, user_id=request.user.id)
    cancel_job.short_description = _('Cancel job')

    def retry_job(self, request, queryset):
        # Re-dispatch jobs that stopped without success.
        for job_id in queryset.filter(status__in=[
            enums.JobStatus.CANCELED,
            enums.JobStatus.ABORTED,
            enums.JobStatus.FAILED,
            enums.JobStatus.PENDING,
        ]).values_list('id', flat=True):
            api.retry_job(job_id=job_id, user_id=request.user.id)
    retry_job.short_description = _('Retry')

    def _link(self, obj):
        # Render the job's result URL: site-relative paths are joined onto
        # BASE_HOST, absolute URLs pass through. Only successful jobs get a
        # clickable link; otherwise the raw URL text is shown. Returns None
        # (blank cell) when the job has no URL.
        if obj.url:
            if obj.url.startswith('/'):
                base = 'https://{}'.format(settings.BASE_HOST)
                url = urljoin(base, obj.url)
            else:
                url = obj.url
            if obj.status == enums.JobStatus.SUCCESS:
                return '<a href="%s">%s</a>' % (url, obj.url)
            else:
                return obj.url
    _link.allow_tags = True
    _link.short_description = _('link')

    def _name(self, obj):
        # Short task name, prefixed with a padlock glyph when locked.
        name = obj.short_name
        if obj.lock:
            name = u'🔒&nbsp;' + name
        return name
    _name.allow_tags = True
    _name.short_description = _('name')

    def task_progress(self, request, task_id):
        """Progress view for a task: JSON for AJAX polling, HTML otherwise."""
        from proj.celery import app
        name = request.GET.get('name')
        _job = api.get_job_by_task_id(task_id=task_id)
        if not name:
            if not _job:
                raise Http404
            name = _job.name
        task = app.tasks.get(name)
        if not task:
            raise Http404
        result = task.AsyncResult(task_id)
        # Celery may report SUCCESS before the Job row is updated; keep
        # showing STARTED until the job itself finishes.
        # NOTE(review): this assigns to result.state as if it were a plain
        # attribute; on stock celery AsyncResult.state is a read-only
        # property -- confirm this works with the celery version in use.
        if result.state == celery.states.SUCCESS and \
                _job and _job.status == enums.JobStatus.STARTED:
            result.state = celery.states.STARTED
        status = enums.JobStatus.get(_job.status).name if _job else None
        is_failed = result.state == celery.states.FAILURE or (
            _job and _job.status == enums.JobStatus.FAILED)
        is_success = result.state == celery.states.SUCCESS and (
            not _job or _job.status == enums.JobStatus.SUCCESS)
        is_pending = result.state == celery.states.PENDING or (
            _job and _job.status == enums.JobStatus.PENDING)
        # Parses as (_job.progress or 100) if _job else 100, so a job whose
        # progress is 0 or None displays as 100.
        progress = int(_job.progress or 100 if _job else 100)
        if is_failed:
            progress = progress or 100
        url = _job.url if is_success and _job and _job.url else None
        if request.is_ajax():
            data = {
                'status': result.state,
                'job_status': status,
                'progress': progress,
                'url': url,
            }
            return http.HttpResponse(json.dumps(data),
                                     content_type='application/json')
        context = {
            'name': name.split('.')[-1],
            'task_id': task_id,
            'status': result.state,
            'job_status': status,
            'is_failed': is_failed,
            'is_success': is_success,
            'is_pending': is_pending,
            'is_active': result.state not in 'SUCCESS FAILURE'.split(),
            'progress': progress,
            'url': url,
        }
        return render(request, 'admin/job/task_progress.html', context)
class JobLogAdmin(admin.ModelAdmin):
    """Read-only admin for JobLog audit entries."""

    list_display = ['id', '_job_id', 'status', 'message', 'attempt',
                    '_user', 'created', 'task_id']
    fieldsets = [(None, {'fields': [
        'id', '_job_id', 'status', 'message', '_user', 'path', 'attempt',
        'task_id', 'created', 'modified',
    ]})]
    search_fields = ['name']
    list_filter = ['status']
    ordering = ('-created',)
    date_hierarchy = 'created'

    def has_add_permission(self, request):
        # Log rows are created by the job machinery, never by hand.
        return False

    def has_delete_permission(self, request, obj=None):
        return False

    def get_readonly_fields(self, request, obj=None):
        # Flatten every fieldset into one read-only field list.
        if self.declared_fieldsets:
            collected = []
            for _label, conf in self.declared_fieldsets:
                collected.extend(conf['fields'])
            return collected
        return super(JobLogAdmin, self).get_readonly_fields(
            request=request, obj=obj)

    def get_actions(self, request):
        # Disable bulk delete for audit rows.
        actions = super(JobLogAdmin, self).get_actions(request)
        if 'delete_selected' in actions:
            del actions['delete_selected']
        return actions

    def _job_id(self, obj):
        # Job id plus a link to the parent job's change page.
        return str(obj.job.id) + ' ' + admin_link(obj.job,
                                                  text=obj.job.short_name)
    _job_id.allow_tags = True
    _job_id.short_description = _('job')

    def _user(self, obj):
        # Link to the user who triggered the change, if any.
        return admin_link(obj.user) if obj.user else None
    _user.allow_tags = True
    _user.short_description = _('user')
class CeleryTaskAdmin(admin.ModelAdmin):
    """Read-only browser over celery task results (celery_taskmeta)."""

    list_display = ['id', 'task_id', 'status', 'date_done']
    fieldsets = [(None, {'fields': [
        'id', 'task_id', 'status', 'date_done',
        # 'traceback',
    ]})]
    search_fields = ['task_id']
    list_filter = ['status']
    ordering = ('-date_done',)
    date_hierarchy = 'date_done'

    def has_add_permission(self, request):
        # Rows are written by the celery result backend only.
        return False

    def has_delete_permission(self, request, obj=None):
        return False

    def get_readonly_fields(self, request, obj=None):
        # Every declared field is read-only.
        if self.declared_fieldsets:
            collected = []
            for _label, conf in self.declared_fieldsets:
                collected.extend(conf['fields'])
            return collected
        return super(CeleryTaskAdmin, self).get_readonly_fields(
            request=request, obj=obj)

    def get_actions(self, request):
        # Disable bulk delete.
        actions = super(CeleryTaskAdmin, self).get_actions(request)
        if 'delete_selected' in actions:
            del actions['delete_selected']
        return actions
class CeleryTaskSetAdmin(admin.ModelAdmin):
    """Read-only browser over celery task-set results (celery_tasksetmeta)."""

    list_display = ['id', 'taskset_id', 'date_done']
    fieldsets = [(None, {'fields': ['id', 'taskset_id', 'date_done']})]
    search_fields = ['taskset_id']
    ordering = ('-date_done',)
    date_hierarchy = 'date_done'

    def has_add_permission(self, request):
        # Rows are written by the celery result backend only.
        return False

    def has_delete_permission(self, request, obj=None):
        return False

    def get_readonly_fields(self, request, obj=None):
        # Every declared field is read-only.
        if self.declared_fieldsets:
            collected = []
            for _label, conf in self.declared_fieldsets:
                collected.extend(conf['fields'])
            return collected
        return super(CeleryTaskSetAdmin, self).get_readonly_fields(
            request=request, obj=obj)

    def get_actions(self, request):
        # Disable bulk delete.
        actions = super(CeleryTaskSetAdmin, self).get_actions(request)
        if 'delete_selected' in actions:
            del actions['delete_selected']
        return actions
# Wire the job and celery result models into the default admin site.
admin.site.register(models.Job, JobAdmin)
admin.site.register(models.JobLog, JobLogAdmin)
admin.site.register(models.CeleryTask, CeleryTaskAdmin)
admin.site.register(models.CeleryTaskSet, CeleryTaskSetAdmin)
import multiprocessing
import socket
import threading
from contextlib import contextmanager
from functools import partial
import logging
import time
from celery import Task
from django.core.exceptions import ImproperlyConfigured
from django.db import transaction
from django.utils import timezone
from django.utils.module_loading import import_string
from .exceptions import JobAborted
from . import enums, models
# Module-level logger used throughout the job API.
__log__ = logging.getLogger(__name__)
def create_job(name, user_id=None, description=None, params=None):
    """Create a new Job row and return it as a named tuple."""
    __log__.debug(u"job.api.create_job [%s, %s, %s, %r]",
                  name, user_id, description, params)
    new_job = models.Job.objects.create(name=name, description=description,
                                        user_id=user_id, params=params)
    return new_job.as_named_tuple()
def _get_job_instance(job_id, for_update=False):
    """Fetch a Job model instance; None (with a logged error) when missing.

    With for_update=True the row is fetched with SELECT ... FOR UPDATE.
    """
    queryset = models.Job.objects
    if for_update:
        queryset = queryset.select_for_update()
    try:
        return queryset.get(id=job_id)
    except models.Job.DoesNotExist:
        __log__.exception(u"Cannot find job %r", job_id)
        return None
def get_job(job_id):
    """Return the job as a named tuple, or None when it does not exist."""
    __log__.debug(u"job.api.get_job [%s]", job_id)
    instance = _get_job_instance(job_id=job_id)
    return instance.as_named_tuple() if instance else None
def get_job_by_task_id(task_id):
    """Look a job up by its celery task id; None when not found."""
    try:
        match = models.Job.objects.get(task_id=task_id)
    except models.Job.DoesNotExist:
        __log__.debug(u"Job not found by task id: %r", task_id)
        return None
    return match.as_named_tuple()
def list_succeeded_jobs(name, limit=5):
    """Return the `limit` most recent SUCCESS jobs for a task/callable/name.

    `name` may be a dotted path string, a celery Task, or a plain callable.
    """
    if isinstance(name, Task):
        name = '%s.%s' % (name.run.__module__, name.__name__)
    elif callable(name):
        name = '%s.%s' % (name.__module__, name.__name__)
    recent = (models.Job.objects
              .filter(name=name, status=enums.JobStatus.SUCCESS)
              .order_by('-created')[:limit])
    return recent.tuples()
@transaction.atomic
def update_job(job_id, user_id=None, message=None, **kwargs):
    """Apply whitelisted field updates to a job under a row lock.

    Maintains started_at/finished_at/progress/attempt bookkeeping and
    writes a JobLog audit row on every status change.
    Returns the updated job as a named tuple, or False when not found.
    """
    __log__.debug(u"job.api.update_job %r",
                  [job_id, user_id, message, kwargs])
    # Row-level lock so concurrent workers serialize their updates.
    job = _get_job_instance(job_id=job_id, for_update=True)
    if not job:
        __log__.warning(u"Job not found: %s", job_id)
        return False
    if user_id:
        job.user_id = user_id
    for key, value in kwargs.items():
        if key not in models.Job.allowed_fields:
            __log__.warning(u"Field not allowed: %r", key)
            continue
        if key == 'status':
            # Chained comparison: status is changing AND new value is STARTED.
            if job.status != value == enums.JobStatus.STARTED:
                job.started_at = timezone.now()
                job.progress = 0
                job.attempt += 1
            # Chained comparison: status is changing AND new value is terminal.
            if job.status != value in [enums.JobStatus.SUCCESS,
                                       enums.JobStatus.FAILED,
                                       enums.JobStatus.ABORTED]:
                job.finished_at = timezone.now()
                if value == enums.JobStatus.SUCCESS:
                    job.progress = 100
            # Record every status change in the audit log.
            if job.status != value:
                models.JobLog.objects.create(
                    job=job, status=value, user_id=user_id, attempt=job.attempt,
                    message=message, path=kwargs.get('path') or job.path,
                    task_id=kwargs.get('task_id', job.task_id))
        setattr(job, key, value)
    job.save()
    return job.as_named_tuple()
# Convenience wrappers: update_job pre-bound to a specific target status.
start_job = partial(update_job, status=enums.JobStatus.STARTED)
cancel_job = partial(update_job, status=enums.JobStatus.CANCELED)
fail_job = partial(update_job, status=enums.JobStatus.FAILED)
aborting_job = partial(update_job, status=enums.JobStatus.ABORTING)
abort_job = partial(update_job, status=enums.JobStatus.ABORTED)
finish_job = partial(update_job, status=enums.JobStatus.SUCCESS)
def is_job_aborting(job_id):
    """True when the job is in ABORTING state; None when the job is missing."""
    job = get_job(job_id)
    return job.status == enums.JobStatus.ABORTING if job else None
def progress_job(job_id, current=0, total=None, start_time=None,
                 last_time=None, interval=5, current_fmt=None):
    """Log and persist job progress, throttled to one report per `interval`.

    Returns (last_time, current) so callers can thread the throttle state
    through repeated calls (see progress_partial).

    Raises JobAborted when the job has been flagged as ABORTING.
    """
    curr_time = time.time()
    if not last_time or curr_time - last_time > interval:
        pct = 0
        # ETA = remaining_items * seconds_per_item. Only computable once at
        # least one item is done; the `and current` guard fixes a
        # ZeroDivisionError on the first call, where current defaults to 0.
        if start_time and total and current:
            eta = (total - current) / current * (curr_time - start_time)
            eta = u", running for %d sec, ETA in %d sec" % (
                curr_time - start_time, eta)
        else:
            eta = ''
        j = u"Job %s" % job_id if job_id else ''
        if total:
            pct = 100 * float(current) / total
            __log__.info(u"%s progress: %r of %r (%d%%%s)", j, current, total, pct, eta)
        else:
            __log__.info(u"%s progress: %r", j, current)
        if job_id:
            if last_time:
                # Persist progress only on follow-up reports, not the first.
                if not current_fmt:
                    current_fmt = '{current}'
                if total:
                    current_fmt += ' of {total}'
                current_formatted = current_fmt.format(
                    current=current, total=total)
                update_job(job_id=job_id, progress=int(pct),
                           current=current_formatted)
            if is_job_aborting(job_id=job_id):
                raise JobAborted(job_id=job_id)
        last_time = curr_time
    return last_time, current
def progress_partial(job_id, total=None, **kwargs):
    """Prime a progress reporter: report once, then bind start_time for ETA."""
    reporter = partial(progress_job, job_id=job_id, total=total, **kwargs)
    last_time, current = reporter()
    reporter = partial(reporter, start_time=last_time)
    return reporter, last_time, current
@transaction.atomic
def lock_job(job_id, lock=None, wait=1):
    """Try to claim the job's advisory lock.

    Returns the lock token on success (including when this token already
    holds the lock), False when the job is missing or locked by someone
    else, and None when the write is lost in the post-sleep re-check.
    """
    __log__.debug(u"job.api.lock_job %r", [job_id, lock, wait])
    if lock is None:
        # Default token identifies host, process and thread of the caller.
        lock = '%s:%s:%s:%s:%s' % (
            socket.gethostname(),
            multiprocessing.current_process().name,
            multiprocessing.current_process().ident,
            threading.current_thread().name,
            threading.current_thread().ident)
    job = _get_job_instance(job_id=job_id, for_update=True)
    if not job:
        return False
    if job.lock:
        if job.lock == lock:
            # Re-entrant case: we already hold this lock.
            __log__.debug(u"job.api.lock_job [%s, %s]: already locked to %s",
                          job_id, lock, job.lock)
            return lock
        else:
            __log__.warning(
                u"job.api.lock_job [%s, %s]: already locked by another lock %r",
                job_id, lock, job.lock)
            return False
    else:
        # TODO: filter and update?
        job.lock = lock
        job.save()
        # Sleep, then re-read to detect a competing writer clobbering the
        # lock. NOTE(review): this runs inside transaction.atomic with the
        # row still locked by select_for_update, so the re-read happens in
        # the same uncommitted transaction -- confirm this check can ever
        # observe another writer before commit.
        time.sleep(wait)
        job = _get_job_instance(job_id=job_id)
        if job.lock == lock:
            __log__.debug(u"job.api.lock_job [%r, %r]: success", job_id, lock)
            return lock
        __log__.debug(
            u"job.api.lock_job [%r, %r]: failed, locked over by %r",
            job_id, lock, job.lock)
@transaction.atomic
def unlock_job(job_id, lock):
    """Release the job's advisory lock if `lock` matches the stored token.

    Returns True on success, False otherwise (mismatched token or missing
    job). Previously a missing job crashed with AttributeError on
    `job.lock`; now it logs a warning and returns False.
    """
    __log__.debug(u"job.api.unlock_job %r", [job_id, lock])
    job = _get_job_instance(job_id=job_id, for_update=True)
    if not job:
        __log__.warning(u"Cannot unlock job %r: job not found", job_id)
        return False
    if job.lock == lock:
        job.lock = None
        job.save()
        return True
    __log__.warning(u"Cannot unlock job %r with lock %r, it locked by %r!",
                    job_id, lock, job.lock)
    return False
@contextmanager
def locked_job(job_id):
    """Context manager: acquire the job lock, yield the token, and always
    release it.

    The unlock now runs in a `finally` block; previously an exception in
    the `with` body skipped unlock_job and left the job locked forever.
    """
    lock = lock_job(job_id=job_id)
    try:
        yield lock
    finally:
        unlock_job(job_id=job_id, lock=lock)
def lock_and_start_job(job_id, **kwargs):
    """Claim the job lock and mark the job STARTED; returns the lock token.

    No-op (returns None) when job_id is falsy.
    """
    if not job_id:
        return None
    lock = lock_job(job_id=job_id)
    assert lock
    assert start_job(job_id=job_id, **kwargs)
    return lock
def finish_and_unlock_job(job_id, lock, **kwargs):
    """Mark the job SUCCESS and release its lock (no-op without job_id)."""
    if not job_id:
        return
    assert finish_job(job_id=job_id, **kwargs)
    assert unlock_job(job_id=job_id, lock=lock)
def abort_and_unlock_job(job_id, lock, exc=None, **kwargs):
    """Mark the job ABORTED and release its lock (no-op without job_id)."""
    if not job_id:
        return
    __log__.debug(u"Job aborted: %r", job_id)
    if exc:
        # Sanity check: the JobAborted exception must belong to this job.
        assert exc.job_id == job_id
    assert abort_job(job_id=job_id, **kwargs)
    assert unlock_job(job_id=job_id, lock=lock)
def fail_and_unlock_job(job_id, lock, exc=None):
    """Mark the job FAILED (recording repr(exc)) and release its lock."""
    if not job_id:
        return
    assert fail_job(job_id=job_id, message=repr(exc))
    assert unlock_job(job_id=job_id, lock=lock)
def get_job_admin_url(job_id):
    """Admin change-page URL for the given job id."""
    return models.Job.get_admin_url(job_id=job_id)
def _get_func_and_path(func):
    """Normalize `func` (dotted path, celery Task, or plain callable) into
    a (callable, dotted_path) pair."""
    if isinstance(func, basestring):
        func_path = func
        func = import_string(dotted_path=func_path)
    elif isinstance(func, Task):
        # Celery tasks expose the real function as .run.
        func_path = '%s.%s' % (func.run.__module__, func.__name__)
    else:
        func_path = '%s.%s' % (func.__module__, func.__name__)
    return func, func_path
def _get_user_id(user_id=None, user=None, request=None):
if request and not user:
user = request.user
if user and not user_id:
user_id = user.id
return user_id
def delay_task(func, user_id=None, user=None, request=None, params=None):
    """Create a Job row for `func` and dispatch it via celery .delay().

    Returns the created job (named tuple) with its task_id filled in.
    """
    func, func_path = _get_func_and_path(func=func)
    user_id = _get_user_id(user_id=user_id, user=user, request=request)
    desc = unicode(params) if params is not None else None
    params = {} if params is None else params
    _job = create_job(name=func_path, description=desc, user_id=user_id,
                      params=params)
    # The task receives its own job id so it can report progress.
    params['job_id'] = _job.id
    async_result = func.delay(**params)
    return update_job(job_id=_job.id, task_id=async_result.id)
@transaction.atomic
def retry_job(job_id, user_id=None):
    """Reset a job to PENDING and re-dispatch its task with the stored
    params. Returns the updated job, or None when the job or its function
    cannot be resolved."""
    job = get_job(job_id=job_id)
    if not job:
        return None
    try:
        func = import_string(job.name)
    except (ImportError, ImproperlyConfigured):
        __log__.exception(u"Cannot import job function: %r", [job_id, job.name])
        return None
    assert update_job(job_id=job_id, status=enums.JobStatus.PENDING,
                      user_id=user_id, progress=None, task_id=None)
    params = dict(job_id=job_id, **job.params)
    async_result = func.delay(**params)
    return update_job(job_id=job_id, task_id=async_result.id)
class JobException(Exception):
    """Base exception for job control flow; carries the offending job id."""

    def __init__(self, job_id, *args):
        super(JobException, self).__init__(*args)
        self.job_id = job_id
class JobAborted(JobException):
    """Raised while reporting progress when the job was flagged ABORTING."""
    pass
from django_enumfield.enum import Enum
class JobStatus(Enum):
    """Job lifecycle states, stored as small ints via django_enumfield."""

    PENDING = 10
    STARTED = 20
    SUCCESS = 30
    FAILED = 40
    CANCELED = 50
    ABORTED = 60
    ABORTING = 70

    # Allowed transitions: each key is a target state mapped to the states
    # it may be entered from. NOTE(review): direction assumed from usage in
    # api.update_job -- confirm against django_enumfield's documentation.
    _transitions = {
        CANCELED: (PENDING,),
        STARTED: (PENDING,),
        ABORTING: (STARTED,),
        ABORTED: (ABORTING,),
        FAILED: (STARTED, ABORTING, PENDING),
        SUCCESS: (STARTED, ABORTING,),
        PENDING: (CANCELED, ABORTED, FAILED),
    }
import celery.states
from django.conf import settings
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django_enumfield.db.fields import EnumField
from django_extensions.db.fields.json import JSONField
from model_utils.models import TimeStampedModel
from monkey_api import ApiModel, ApiForeignKey
from utils.admin import admin_url
from . import enums
class Job(ApiModel, TimeStampedModel):
    """A tracked background job backed by a celery task.

    `created`/`modified` come from TimeStampedModel.
    """

    # Dotted path of the task function (see api._get_func_and_path).
    name = models.CharField(max_length=255)
    # Result URL rendered in the admin once the job succeeds.
    url = models.URLField(null=True)
    path = models.CharField(max_length=255, null=True)
    status = EnumField(enums.JobStatus)
    started_at = models.DateTimeField(null=True)
    finished_at = models.DateTimeField(null=True)
    # Percentage 0-100, maintained by api.update_job / api.progress_job.
    progress = models.FloatField(null=True)
    # Human-readable "N of M" progress string from api.progress_job.
    current = models.CharField(max_length=255, null=True)
    description = models.TextField(null=True)
    user = ApiForeignKey(settings.AUTH_USER_MODEL, null=True)
    # Advisory lock token (host:process:pid:thread:tid), see api.lock_job.
    lock = models.CharField(max_length=255, null=True)
    attempt = models.PositiveSmallIntegerField(default=0)
    task_id = models.CharField(max_length=255, null=True)
    params = JSONField(null=True)
    # Whitelist of fields api.update_job may set through kwargs.
    allowed_fields = ['url', 'path', 'status', 'progress', 'description',
                      'task_id', 'current']

    class Meta:
        verbose_name = _('job')
        verbose_name_plural = _('jobs')

    class Api:
        # Fields exposed by the monkey_api named-tuple representation.
        fields = ('name', 'url', 'path', 'status', 'started_at', 'finished_at',
                  'progress', 'description', 'user_id', 'lock', 'attempt', 'id',
                  'task_id', 'params', 'current', 'created', 'modified')
        extra = ('admin_url', 'short_name')

    def __unicode__(self):
        # Python 2 string representation.
        return 'Job %s: %s' % (self.pk, self.name)

    @classmethod
    def get_admin_url(cls, job_id):
        # Build the admin change URL without needing a model instance.
        return admin_url(instance=(cls._meta.app_label,
                                   cls._meta.model_name, job_id))

    @property
    def admin_url(self):
        return self.get_admin_url(job_id=self.pk)

    @property
    def short_name(self):
        # Last component of the dotted task path.
        return self.name.split('.')[-1]
class JobLog(ApiModel, TimeStampedModel):
    """Audit entry written on every job status change (see api.update_job)."""

    job = ApiForeignKey(to=Job)
    # Status the job transitioned into.
    status = EnumField(enums.JobStatus)
    message = models.TextField(null=True)
    # User who triggered the change, if any.
    user = ApiForeignKey(settings.AUTH_USER_MODEL, null=True)
    # Job attempt number at the time of the change.
    attempt = models.PositiveSmallIntegerField(default=0)
    path = models.CharField(max_length=255, null=True)
    task_id = models.CharField(max_length=255, null=True)

    class Meta:
        verbose_name = _('job log')
        verbose_name_plural = _('job logs')

    class Api:
        # Fields exposed by the monkey_api representation.
        fields = ('id', 'job_id', 'status', 'message', 'user_id', 'attempt',
                  'path', 'created', 'modified')
        extra = ('admin_url',)
class CeleryTask(models.Model):
    """Unmanaged mapping over celery's database result table.

    NOTE(review): presumably read-only from Django's side -- the celery
    result backend owns this table; confirm migrations are excluded.
    """

    # Choices built from every state celery can report.
    status_choices = [(s, s) for s in celery.states.ALL_STATES]

    task_id = models.CharField(max_length=255, unique=True)
    status = models.CharField(max_length=50, choices=status_choices)
    # Serialized task result as written by the result backend.
    result = models.BinaryField(null=True)
    date_done = models.DateTimeField(auto_now=True, null=True)
    traceback = models.TextField(null=True)

    class Meta:
        db_table = 'celery_taskmeta'
class CeleryTaskSet(models.Model):
    """Mapping over celery's task-set (group) result table."""

    taskset_id = models.CharField(max_length=255, unique=True)
    # Serialized group result as written by the result backend.
    result = models.BinaryField(null=True)
    date_done = models.DateTimeField(auto_now_add=True, null=True)

    class Meta:
        db_table = 'celery_tasksetmeta'
# Example beat schedule: dispatch `some_task` through schedule_job_task
# every Sunday at midnight (crontab day_of_week=0).
# NOTE(review): `crontab` (from celery.schedules) is not imported in this
# snippet -- confirm the import exists in the real settings module.
CELERYBEAT_SCHEDULE = {
    'do_stuff': {
        'task': 'schedule_job_task',
        'kwargs': {
            'func': 'some_task',
            'notify': True,
        },
        'schedule': crontab(hour=0, minute=0, day_of_week=0),
    },
}
@app.task
def schedule_job_task(func, **params):
    """Beat-friendly wrapper: create a Job for `func` and dispatch it.

    Returns the new job's id, or the falsy job value itself when creation
    failed. NOTE(review): `app`, `log` and `job` are assumed to come from
    the surrounding module; they are not defined in this snippet.
    """
    log.debug('schedule_job_task %r', [func, params])
    _job = job.api.delay_task(func=func, params=params)
    _id = _job.id if _job else _job
    log.debug('job.id = %r', _id)
    return _id
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment