Created
April 22, 2018 20:48
-
-
Save andreif/d4c32d02eeaf3b861ed23b0f2f54a937 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # coding=utf-8 | |
| import json | |
| from urlparse import urljoin | |
| import celery.states | |
| from django import http | |
| from django.conf import settings | |
| from django.conf.urls import url | |
| from django.contrib import admin | |
| from django.http import Http404 | |
| from django.shortcuts import render | |
| from django.utils.translation import ugettext_lazy as _ | |
| from utils.admin import choice_filter, admin_link | |
| from . import api, enums, models | |
class JobLogInline(admin.TabularInline):
    """Read-only tabular inline of ``JobLog`` rows, newest first."""

    model = models.JobLog
    # One list object serves both options: every shown column is read-only.
    readonly_fields = fields = [
        'created', 'status', 'message', 'user', 'attempt', 'task_id', 'path',
    ]
    ordering = ('-created',)
    extra = 0
    can_delete = False

    def has_add_permission(self, request):
        """Disallow adding log rows through the admin."""
        return False
class JobAdmin(admin.ModelAdmin):
    """Admin for ``Job``: read-only listing, bulk actions and a progress view.

    Jobs cannot be added or deleted here, and every field named in
    ``fieldsets`` is rendered read-only. The bulk actions delegate to the
    ``api`` module; ``task_progress`` serves an HTML page or a JSON document
    describing the state of a Celery task.
    """

    list_display = ['id', '_name', 'status', 'started_at', 'current',
                    'progress', 'finished_at', 'attempt', 'user', 'created']
    fieldsets = [(None, {'fields': [
        'name', '_link', 'path', 'status', 'started_at', 'finished_at',
        'current', 'progress', 'description', 'created', 'modified', 'user',
        'lock', 'attempt', 'task_id', 'params',
    ]})]
    search_fields = ['name']
    list_filter = [
        'status',
        # Filter on the last dotted component of the job name.
        choice_filter('name', get_value=lambda x: x.split('.')[-1]),
    ]
    inlines = [JobLogInline]
    ordering = ('-created',)
    date_hierarchy = 'created'
    actions = ['cancel_job', 'abort_job', 'retry_job', 'unlock_job']

    def has_add_permission(self, request):
        """Disallow creating jobs through the admin."""
        return False

    def has_delete_permission(self, request, obj=None):
        """Disallow deleting jobs through the admin."""
        return False

    def get_readonly_fields(self, request, obj=None):
        """Return every field named in the declared fieldsets as read-only.

        Falls back to the stock implementation when no fieldsets are declared.
        """
        if self.declared_fieldsets:
            return sum((list(conf['fields'])
                        for _title, conf in self.declared_fieldsets), [])
        return super(JobAdmin, self).get_readonly_fields(
            request=request, obj=obj)

    def get_actions(self, request):
        """Return the available actions without ``delete_selected``."""
        actions = super(JobAdmin, self).get_actions(request)
        actions.pop('delete_selected', None)
        return actions

    def get_urls(self):
        """Prepend the task-progress view to the default admin URLs."""
        return [
            url(r'^task/(.+)/progress/$',
                self.admin_site.admin_view(self.task_progress),
                name='task_progress'),
        ] + super(JobAdmin, self).get_urls()

    def abort_job(self, request, queryset):
        """Mark the selected STARTED jobs as ABORTING on behalf of the user."""
        for job_id in queryset.filter(status=enums.JobStatus.STARTED)\
                .values_list('id', flat=True):
            api.aborting_job(job_id=job_id, user_id=request.user.id)
    abort_job.short_description = _('Abort')

    def unlock_job(self, request, queryset):
        """Clear the lock of selected jobs that are not running or succeeded."""
        queryset.filter(status__in=[
            enums.JobStatus.CANCELED,
            enums.JobStatus.ABORTED,
            enums.JobStatus.FAILED,
            enums.JobStatus.PENDING,
        ]).update(lock=None)
    unlock_job.short_description = _('Unlock')

    def cancel_job(self, request, queryset):
        """Cancel the selected jobs that are still PENDING."""
        for job_id in queryset.filter(status=enums.JobStatus.PENDING)\
                .values_list('id', flat=True):
            api.cancel_job(job_id=job_id, user_id=request.user.id)
    cancel_job.short_description = _('Cancel job')

    def retry_job(self, request, queryset):
        """Re-submit the selected CANCELED, ABORTED, FAILED or PENDING jobs."""
        for job_id in queryset.filter(status__in=[
            enums.JobStatus.CANCELED,
            enums.JobStatus.ABORTED,
            enums.JobStatus.FAILED,
            enums.JobStatus.PENDING,
        ]).values_list('id', flat=True):
            api.retry_job(job_id=job_id, user_id=request.user.id)
    retry_job.short_description = _('Retry')

    def _link(self, obj):
        """Render the job URL, as a clickable link once the job succeeded.

        URLs starting with ``/`` are resolved against
        ``https://<settings.BASE_HOST>``. Returns ``None`` when the job has
        no URL. The output is HTML-escaped because ``allow_tags`` makes the
        admin emit the returned text verbatim.
        """
        # Imported locally so the file's import block does not need to change.
        from django.utils.html import escape, format_html

        if not obj.url:
            return None
        if obj.url.startswith('/'):
            base = 'https://{}'.format(settings.BASE_HOST)
            target = urljoin(base, obj.url)
        else:
            target = obj.url
        if obj.status == enums.JobStatus.SUCCESS:
            # Unicode template: a byte-string template would fail on
            # non-ASCII arguments under Python 2.
            return format_html(u'<a href="{}">{}</a>', target, obj.url)
        return escape(obj.url)
    _link.allow_tags = True
    _link.short_description = _('link')

    def _name(self, obj):
        """Return the job's short name, prefixed with a padlock when locked."""
        name = obj.short_name
        if obj.lock:
            name = u'🔒 ' + name
        return name
    _name.allow_tags = True
    _name.short_description = _('name')

    def task_progress(self, request, task_id):
        """Report the state of a Celery task as JSON (AJAX) or an HTML page.

        The task name comes from the ``name`` query parameter or, failing
        that, from the job recorded for ``task_id``. Raises ``Http404`` when
        neither yields a task registered with the Celery app.
        """
        # Function-scope import, as in the original.
        # NOTE(review): presumably avoids a circular import -- confirm.
        from proj.celery import app

        name = request.GET.get('name')
        _job = api.get_job_by_task_id(task_id=task_id)
        if not name:
            if not _job:
                raise Http404
            name = _job.name
        task = app.tasks.get(name)
        if not task:
            raise Http404
        result = task.AsyncResult(task_id)
        # Celery may already report SUCCESS while the job row still says
        # STARTED; present the task as still running in that case.
        if result.state == celery.states.SUCCESS and \
                _job and _job.status == enums.JobStatus.STARTED:
            result.state = celery.states.STARTED
        status = enums.JobStatus.get(_job.status).name if _job else None
        is_failed = result.state == celery.states.FAILURE or (
            _job and _job.status == enums.JobStatus.FAILED)
        is_success = result.state == celery.states.SUCCESS and (
            not _job or _job.status == enums.JobStatus.SUCCESS)
        is_pending = result.state == celery.states.PENDING or (
            _job and _job.status == enums.JobStatus.PENDING)
        # NOTE(review): a job progress of 0 or None is reported as 100 here,
        # which makes the ``progress or 100`` fallback below nearly dead
        # code. Behaviour kept as-is; confirm whether 0 was intended.
        progress = int((_job.progress or 100) if _job else 100)
        if is_failed:
            progress = progress or 100
        url = _job.url if is_success and _job and _job.url else None
        if request.is_ajax():
            data = {
                'status': result.state,
                'job_status': status,
                'progress': progress,
                'url': url,
            }
            return http.HttpResponse(json.dumps(data),
                                     content_type='application/json')
        context = {
            'name': name.split('.')[-1],
            'task_id': task_id,
            'status': result.state,
            'job_status': status,
            'is_failed': is_failed,
            'is_success': is_success,
            'is_pending': is_pending,
            'is_active': result.state not in 'SUCCESS FAILURE'.split(),
            'progress': progress,
            'url': url,
        }
        return render(request, 'admin/job/task_progress.html', context)
class JobLogAdmin(admin.ModelAdmin):
    """Read-only admin listing for ``JobLog`` rows, newest first."""

    list_display = ['id', '_job_id', 'status', 'message', 'attempt',
                    '_user', 'created', 'task_id']
    fieldsets = [(None, {'fields': [
        'id', '_job_id', 'status', 'message', '_user', 'path', 'attempt',
        'task_id', 'created', 'modified',
    ]})]
    # ``JobLog`` declares no ``name`` field, so search the related job's
    # name instead.
    search_fields = ['job__name']
    list_filter = ['status']
    ordering = ('-created',)
    date_hierarchy = 'created'

    def has_add_permission(self, request):
        """Disallow creating log rows through the admin."""
        return False

    def has_delete_permission(self, request, obj=None):
        """Disallow deleting log rows through the admin."""
        return False

    def get_readonly_fields(self, request, obj=None):
        """Return every field named in the declared fieldsets as read-only.

        Falls back to the stock implementation when no fieldsets are declared.
        """
        if self.declared_fieldsets:
            return sum((list(conf['fields'])
                        for _title, conf in self.declared_fieldsets), [])
        return super(JobLogAdmin, self).get_readonly_fields(
            request=request, obj=obj)

    def get_actions(self, request):
        """Return the available actions without ``delete_selected``."""
        actions = super(JobLogAdmin, self).get_actions(request)
        actions.pop('delete_selected', None)
        return actions

    def _job_id(self, obj):
        """Render the job id followed by an admin link to the job."""
        return str(obj.job.id) + ' ' + admin_link(obj.job, text=obj.job.short_name)
    _job_id.allow_tags = True
    _job_id.short_description = _('job')

    def _user(self, obj):
        """Render an admin link to the user, or ``None`` when there is none."""
        if obj.user:
            return admin_link(obj.user)
    _user.allow_tags = True
    _user.short_description = _('user')
class CeleryTaskAdmin(admin.ModelAdmin):
    """Read-only admin listing for ``CeleryTask`` rows, newest first."""

    list_display = ['id', 'task_id', 'status', 'date_done']
    fieldsets = [(None, {'fields': [
        'id', 'task_id', 'status', 'date_done',
        # 'traceback',
    ]})]
    search_fields = ['task_id']
    list_filter = ['status']
    date_hierarchy = 'date_done'
    ordering = ('-date_done',)

    def has_add_permission(self, request):
        """Disallow creating rows through the admin."""
        return False

    def has_delete_permission(self, request, obj=None):
        """Disallow deleting rows through the admin."""
        return False

    def get_readonly_fields(self, request, obj=None):
        """Make every field named in the declared fieldsets read-only."""
        if not self.declared_fieldsets:
            return super(CeleryTaskAdmin, self).get_readonly_fields(
                request=request, obj=obj)
        readonly = []
        for _title, options in self.declared_fieldsets:
            readonly.extend(options['fields'])
        return readonly

    def get_actions(self, request):
        """Return the available actions without ``delete_selected``."""
        available = super(CeleryTaskAdmin, self).get_actions(request)
        if 'delete_selected' in available:
            del available['delete_selected']
        return available
class CeleryTaskSetAdmin(admin.ModelAdmin):
    """Read-only admin listing for ``CeleryTaskSet`` rows, newest first."""

    list_display = ['id', 'taskset_id', 'date_done']
    fieldsets = [(None, {'fields': ['id', 'taskset_id', 'date_done']})]
    search_fields = ['taskset_id']
    date_hierarchy = 'date_done'
    ordering = ('-date_done',)

    def has_add_permission(self, request):
        """Disallow creating rows through the admin."""
        return False

    def has_delete_permission(self, request, obj=None):
        """Disallow deleting rows through the admin."""
        return False

    def get_readonly_fields(self, request, obj=None):
        """Make every field named in the declared fieldsets read-only."""
        if not self.declared_fieldsets:
            return super(CeleryTaskSetAdmin, self).get_readonly_fields(
                request=request, obj=obj)
        readonly = []
        for _title, options in self.declared_fieldsets:
            readonly.extend(options['fields'])
        return readonly

    def get_actions(self, request):
        """Return the available actions without ``delete_selected``."""
        available = super(CeleryTaskSetAdmin, self).get_actions(request)
        if 'delete_selected' in available:
            del available['delete_selected']
        return available
# Register each model with its ModelAdmin on the default admin site.
for _model, _model_admin in (
        (models.Job, JobAdmin),
        (models.JobLog, JobLogAdmin),
        (models.CeleryTask, CeleryTaskAdmin),
        (models.CeleryTaskSet, CeleryTaskSetAdmin),
):
    admin.site.register(_model, _model_admin)
# Keep the loop variables out of the module namespace.
del _model, _model_admin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import multiprocessing | |
| import socket | |
| import threading | |
| from contextlib import contextmanager | |
| from functools import partial | |
| import logging | |
| import time | |
| from celery import Task | |
| from django.core.exceptions import ImproperlyConfigured | |
| from django.db import transaction | |
| from django.utils import timezone | |
| from django.utils.module_loading import import_string | |
| from .exceptions import JobAborted | |
| from . import enums, models | |
| __log__ = logging.getLogger(__name__) | |
def create_job(name, user_id=None, description=None, params=None):
    """Create a ``Job`` row and return its ``as_named_tuple()`` form.

    :param name: value stored in ``Job.name``.
    :param user_id: optional id of the user the job is attributed to.
    :param description: optional free-text description.
    :param params: optional parameters stored on the job.
    """
    __log__.debug(u"job.api.create_job [%s, %s, %s, %r]",
                  name, user_id, description, params)
    fields = dict(name=name, description=description, user_id=user_id,
                  params=params)
    return models.Job.objects.create(**fields).as_named_tuple()
def _get_job_instance(job_id, for_update=False):
    """Fetch the ``Job`` model instance with the given id.

    :param for_update: when true, read through ``select_for_update()``.
    :returns: the instance, or ``None`` (after logging) when it is missing.
    """
    manager = models.Job.objects
    queryset = manager.select_for_update() if for_update else manager
    try:
        return queryset.get(id=job_id)
    except models.Job.DoesNotExist:
        __log__.exception(u"Cannot find job %r", job_id)
        return None
def get_job(job_id):
    """Return the job as a named tuple, or ``None`` when it cannot be found."""
    __log__.debug(u"job.api.get_job [%s]", job_id)
    instance = _get_job_instance(job_id=job_id)
    return instance.as_named_tuple() if instance else None
def get_job_by_task_id(task_id):
    """Return the job recorded for ``task_id`` as a named tuple.

    Returns ``None`` (with a debug log line) when no job has that task id.
    """
    try:
        match = models.Job.objects.get(task_id=task_id)
    except models.Job.DoesNotExist:
        __log__.debug(u"Job not found by task id: %r", task_id)
        return None
    return match.as_named_tuple()
def list_succeeded_jobs(name, limit=5):
    """List the most recently created jobs named ``name`` that succeeded.

    :param name: dotted job name, a Celery ``Task`` or any callable; tasks
        and callables are converted to ``<module>.<name>``.
    :param limit: maximum number of jobs to return.
    :returns: whatever ``tuples()`` yields for the sliced queryset.
    """
    # Task instances are checked first: they take their module from ``run``.
    if isinstance(name, Task):
        name = '.'.join([name.run.__module__, name.__name__])
    elif callable(name):
        name = '.'.join([name.__module__, name.__name__])
    succeeded = models.Job.objects.filter(
        name=name, status=enums.JobStatus.SUCCESS)
    return succeeded.order_by('-created')[:limit].tuples()
@transaction.atomic
def update_job(job_id, user_id=None, message=None, **kwargs):
    """Apply field updates to a job inside a transaction.

    Only keys listed in ``models.Job.allowed_fields`` are applied; others
    are logged and skipped. A status change also maintains the bookkeeping
    fields and writes a ``JobLog`` row.

    :param job_id: id of the job, read with a row lock.
    :param user_id: when truthy, stored on the job and on the log row.
    :param message: text stored on the log row of a status change.
    :returns: the updated job as a named tuple, or ``False`` when missing.
    """
    __log__.debug(u"job.api.update_job %r",
                  [job_id, user_id, message, kwargs])
    job = _get_job_instance(job_id=job_id, for_update=True)
    if not job:
        __log__.warning(u"Job not found: %s", job_id)
        return False
    if user_id:
        job.user_id = user_id

    finished_statuses = [enums.JobStatus.SUCCESS,
                         enums.JobStatus.FAILED,
                         enums.JobStatus.ABORTED]
    for field, new_value in kwargs.items():
        if field not in models.Job.allowed_fields:
            __log__.warning(u"Field not allowed: %r", field)
            continue
        # ``job.status`` still holds the old value until the setattr below.
        if field == 'status' and job.status != new_value:
            if new_value == enums.JobStatus.STARTED:
                # A new run begins: reset progress and count the attempt.
                job.started_at = timezone.now()
                job.progress = 0
                job.attempt += 1
            if new_value in finished_statuses:
                job.finished_at = timezone.now()
                if new_value == enums.JobStatus.SUCCESS:
                    job.progress = 100
            models.JobLog.objects.create(
                job=job, status=new_value, user_id=user_id,
                attempt=job.attempt, message=message,
                path=kwargs.get('path') or job.path,
                task_id=kwargs.get('task_id', job.task_id))
        setattr(job, field, new_value)
    job.save()
    return job.as_named_tuple()
# Shortcuts that call ``update_job`` with a fixed target status.
start_job = partial(update_job, status=enums.JobStatus.STARTED)
finish_job = partial(update_job, status=enums.JobStatus.SUCCESS)
fail_job = partial(update_job, status=enums.JobStatus.FAILED)
cancel_job = partial(update_job, status=enums.JobStatus.CANCELED)
aborting_job = partial(update_job, status=enums.JobStatus.ABORTING)
abort_job = partial(update_job, status=enums.JobStatus.ABORTED)
def is_job_aborting(job_id):
    """Tell whether the job's status is ABORTING.

    Returns ``None`` when the job cannot be found.
    """
    job = get_job(job_id)
    if not job:
        return None
    return job.status == enums.JobStatus.ABORTING
def progress_job(job_id, current=0, total=None, start_time=None,
                 last_time=None, interval=5, current_fmt=None):
    """Log job progress, throttled to once per ``interval`` seconds.

    When a report is due it is logged; for a real ``job_id`` the job row is
    updated (from the second report on) and an abort request is honoured.

    :param job_id: id of the job; falsy to log without touching the database.
    :param current: number of items processed so far.
    :param total: total number of items, if known.
    :param start_time: ``time.time()`` of the start, enabling the ETA text.
    :param last_time: ``time.time()`` of the previous report, if any.
    :param interval: minimum number of seconds between two reports.
    :param current_fmt: ``str.format`` template for the job's ``current``
        field, given ``current`` and ``total``.
    :returns: ``(last_time, current)`` with ``last_time`` refreshed when a
        report was made.
    :raises JobAborted: when the job is in the ABORTING status.
    """
    curr_time = time.time()
    if last_time and curr_time - last_time <= interval:
        # Too soon since the previous report.
        return last_time, current

    pct = 0
    eta = ''
    if start_time and total:
        elapsed = curr_time - start_time
        if current:
            # float() keeps Python 2 from truncating the ratio to an integer.
            remaining = float(total - current) / current * elapsed
            eta = u", running for %d sec, ETA in %d sec" % (
                elapsed, remaining)
        else:
            # Nothing processed yet: an ETA cannot be extrapolated, and the
            # division above would raise ZeroDivisionError.
            eta = u", running for %d sec" % elapsed
    j = u"Job %s" % job_id if job_id else ''
    if total:
        pct = 100 * float(current) / total
        __log__.info(u"%s progress: %r of %r (%d%%%s)",
                     j, current, total, pct, eta)
    else:
        __log__.info(u"%s progress: %r", j, current)
    if job_id:
        # The first report (no ``last_time``) only logs.
        if last_time:
            if not current_fmt:
                current_fmt = '{current}'
                if total:
                    current_fmt += ' of {total}'
            current_formatted = current_fmt.format(
                current=current, total=total)
            update_job(job_id=job_id, progress=int(pct),
                       current=current_formatted)
        if is_job_aborting(job_id=job_id):
            raise JobAborted(job_id=job_id)
    return curr_time, current
def progress_partial(job_id, total=None, **kwargs):
    """Build a progress reporter bound to a job and make the first report.

    :returns: ``(progress, last_time, current)`` where ``progress`` is
        ``progress_job`` pre-bound to the job, the total, ``kwargs`` and the
        time of the first report as ``start_time``.
    """
    reporter = partial(progress_job, job_id=job_id, total=total, **kwargs)
    started, current = reporter()
    return partial(reporter, start_time=started), started, current
@transaction.atomic
def lock_job(job_id, lock=None, wait=1):
    """Try to take the lock of a job.

    :param job_id: id of the job to lock.
    :param lock: lock token; defaults to a string built from host name,
        process name/ident and thread name/ident.
    :param wait: seconds to sleep before re-reading the job to verify.
    :returns: the lock token on success, ``False`` when the job is missing
        or held under another token, ``None`` when the re-read shows a
        different token.
    """
    __log__.debug(u"job.api.lock_job %r", [job_id, lock, wait])
    if lock is None:
        process = multiprocessing.current_process()
        thread = threading.current_thread()
        lock = '%s:%s:%s:%s:%s' % (
            socket.gethostname(),
            process.name, process.ident,
            thread.name, thread.ident)
    job = _get_job_instance(job_id=job_id, for_update=True)
    if not job:
        return False
    if job.lock:
        if job.lock != lock:
            __log__.warning(
                u"job.api.lock_job [%s, %s]: already locked by another lock %r",
                job_id, lock, job.lock)
            return False
        __log__.debug(u"job.api.lock_job [%s, %s]: already locked to %s",
                      job_id, lock, job.lock)
        return lock
    # TODO: filter and update?
    job.lock = lock
    job.save()
    # Wait, then re-read to check the token is still ours.
    time.sleep(wait)
    job = _get_job_instance(job_id=job_id)
    if job.lock == lock:
        __log__.debug(u"job.api.lock_job [%r, %r]: success", job_id, lock)
        return lock
    __log__.debug(
        u"job.api.lock_job [%r, %r]: failed, locked over by %r",
        job_id, lock, job.lock)
    return None
@transaction.atomic
def unlock_job(job_id, lock):
    """Release the lock of a job when it is held under ``lock``.

    :param job_id: id of the job to unlock.
    :param lock: the token the job is expected to be locked with.
    :returns: ``True`` when the lock was cleared; ``False`` when the job is
        missing or locked with a different token.
    """
    __log__.debug(u"job.api.unlock_job %r", [job_id, lock])
    job = _get_job_instance(job_id=job_id, for_update=True)
    if not job:
        # A missing job used to fail with AttributeError on ``None.lock``.
        return False
    if job.lock != lock:
        __log__.warning(u"Cannot unlock job %r with lock %r, it locked by %r!",
                        job_id, lock, job.lock)
        return False
    job.lock = None
    job.save()
    return True
@contextmanager
def locked_job(job_id):
    """Context manager holding the job lock for the duration of the block.

    Yields the value returned by ``lock_job`` (falsy when locking failed).
    The lock is released even when the block raises, and no unlock is
    attempted when no lock was acquired.
    """
    lock = lock_job(job_id=job_id)
    try:
        yield lock
    finally:
        if lock:
            unlock_job(job_id=job_id, lock=lock)
def lock_and_start_job(job_id, **kwargs):
    """Lock the job and move it to the STARTED status.

    :param job_id: id of the job; a falsy id makes this a no-op.
    :param kwargs: extra arguments forwarded to ``start_job``.
    :returns: the lock token, or ``None`` for a falsy ``job_id``.
    :raises AssertionError: when locking or starting fails.
    """
    if not job_id:
        return None
    # Explicit raises instead of ``assert``: under ``python -O`` an assert
    # statement is removed together with the call inside it.
    lock = lock_job(job_id=job_id)
    if not lock:
        raise AssertionError(u"Cannot lock job %r" % (job_id,))
    if not start_job(job_id=job_id, **kwargs):
        raise AssertionError(u"Cannot start job %r" % (job_id,))
    return lock
def finish_and_unlock_job(job_id, lock, **kwargs):
    """Move the job to the SUCCESS status and release its lock.

    :param job_id: id of the job; a falsy id makes this a no-op.
    :param lock: the token the job was locked with.
    :param kwargs: extra arguments forwarded to ``finish_job``.
    :raises AssertionError: when finishing or unlocking fails.
    """
    if not job_id:
        return
    # Explicit raises instead of ``assert``: under ``python -O`` an assert
    # statement is removed together with the call inside it.
    if not finish_job(job_id=job_id, **kwargs):
        raise AssertionError(u"Cannot finish job %r" % (job_id,))
    if not unlock_job(job_id=job_id, lock=lock):
        raise AssertionError(u"Cannot unlock job %r" % (job_id,))
def abort_and_unlock_job(job_id, lock, exc=None, **kwargs):
    """Move the job to the ABORTED status and release its lock.

    :param job_id: id of the job; a falsy id makes this a no-op.
    :param lock: the token the job was locked with.
    :param exc: optional exception carrying a ``job_id`` attribute that
        must match ``job_id``.
    :param kwargs: extra arguments forwarded to ``abort_job``.
    :raises AssertionError: when the ids differ, or aborting or unlocking
        fails.
    """
    if not job_id:
        return
    __log__.debug(u"Job aborted: %r", job_id)
    # Explicit raises instead of ``assert``: under ``python -O`` an assert
    # statement is removed together with the call inside it.
    if exc and exc.job_id != job_id:
        raise AssertionError(
            u"Exception belongs to job %r, not %r" % (exc.job_id, job_id))
    if not abort_job(job_id=job_id, **kwargs):
        raise AssertionError(u"Cannot abort job %r" % (job_id,))
    if not unlock_job(job_id=job_id, lock=lock):
        raise AssertionError(u"Cannot unlock job %r" % (job_id,))
def fail_and_unlock_job(job_id, lock, exc=None):
    """Move the job to the FAILED status and release its lock.

    :param job_id: id of the job; a falsy id makes this a no-op.
    :param lock: the token the job was locked with.
    :param exc: the failure; its ``repr`` is stored as the log message.
    :raises AssertionError: when failing or unlocking the job fails.
    """
    if not job_id:
        return
    # Explicit raises instead of ``assert``: under ``python -O`` an assert
    # statement is removed together with the call inside it.
    if not fail_job(job_id=job_id, message=repr(exc)):
        raise AssertionError(u"Cannot fail job %r" % (job_id,))
    if not unlock_job(job_id=job_id, lock=lock):
        raise AssertionError(u"Cannot unlock job %r" % (job_id,))
def get_job_admin_url(job_id):
    """Return the admin URL of the job, as built by ``Job.get_admin_url``."""
    job_model = models.Job
    return job_model.get_admin_url(job_id=job_id)
def _get_func_and_path(func):
    """Resolve ``func`` to a ``(callable, dotted path)`` pair.

    A string is imported with ``import_string`` and returned with itself as
    the path. A Celery ``Task`` takes its module from ``run``; anything else
    uses its own ``__module__``.
    """
    if isinstance(func, basestring):
        return import_string(dotted_path=func), func
    if isinstance(func, Task):
        module_name = func.run.__module__
    else:
        module_name = func.__module__
    return func, module_name + '.' + func.__name__
| def _get_user_id(user_id=None, user=None, request=None): | |
| if request and not user: | |
| user = request.user | |
| if user and not user_id: | |
| user_id = user.id | |
| return user_id | |
def delay_task(func, user_id=None, user=None, request=None, params=None):
    """Create a job for ``func`` and queue it with ``func.delay``.

    :param func: dotted path, Celery ``Task`` or callable exposing ``delay``.
    :param user_id: id of the initiating user; otherwise derived from
        ``user`` or ``request``.
    :param params: keyword arguments for the task; ``job_id`` is added to
        the call. The caller's dict is not modified.
    :returns: the job, as returned by ``update_job`` after the task id has
        been recorded.
    """
    func, func_path = _get_func_and_path(func=func)
    user_id = _get_user_id(user_id=user_id, user=user, request=request)
    desc = unicode(params) if params is not None else None
    if params is None:
        params = {}
    _job = create_job(name=func_path, description=desc, user_id=user_id,
                      params=params)
    # Build a fresh dict so the caller's ``params`` is left untouched.
    task_kwargs = dict(params, job_id=_job.id)
    async_result = func.delay(**task_kwargs)
    _job = update_job(job_id=_job.id, task_id=async_result.id)
    return _job
@transaction.atomic
def retry_job(job_id, user_id=None):
    """Reset a job to PENDING and queue its task again.

    :param job_id: id of the job to retry.
    :param user_id: optional id of the user requesting the retry.
    :returns: the job, as returned by ``update_job`` after the new task id
        has been recorded; ``None`` when the job is missing or its function
        cannot be imported.
    :raises AssertionError: when the job cannot be reset to PENDING.
    """
    job = get_job(job_id=job_id)
    if not job:
        return None
    try:
        func = import_string(job.name)
    except (ImportError, ImproperlyConfigured):
        __log__.exception(u"Cannot import job function: %r", [job_id, job.name])
        return None
    # Explicit raise instead of ``assert``: under ``python -O`` an assert
    # statement is removed together with the call inside it.
    reset = update_job(job_id=job_id, status=enums.JobStatus.PENDING,
                       user_id=user_id, progress=None, task_id=None)
    if not reset:
        raise AssertionError(u"Cannot reset job %r to pending" % (job_id,))
    # Tolerates stored params that are None or already contain ``job_id``.
    params = dict(job.params or {}, job_id=job_id)
    async_result = func.delay(**params)
    _job = update_job(job_id=job_id, task_id=async_result.id)
    return _job
class JobException(Exception):
    """Base exception carrying the id of the job it relates to.

    :param job_id: id of the job; stored as ``job_id`` and not part of
        ``args``.
    :param args: forwarded to ``Exception``.
    """

    def __init__(self, job_id, *args):
        self.job_id = job_id
        super(JobException, self).__init__(*args)

    def __reduce__(self):
        """Support pickling and copying.

        The default reduction rebuilds the exception from ``args`` alone,
        which omits the required ``job_id``; prepend it so the constructor
        can be called again.
        """
        return (self.__class__, (self.job_id,) + tuple(self.args),
                self.__dict__)
class JobAborted(JobException):
    """Job exception subclass that adds nothing to ``JobException``."""
| from django_enumfield.enum import Enum | |
class JobStatus(Enum):
    """Statuses of a job and the transitions between them."""

    PENDING = 10
    STARTED = 20
    SUCCESS = 30
    FAILED = 40
    CANCELED = 50
    ABORTED = 60
    ABORTING = 70

    # NOTE(review): entries read as "status: statuses it may be entered
    # from" (e.g. STARTED only from PENDING) -- confirm against the
    # django_enumfield version in use.
    _transitions = {
        PENDING: (CANCELED, ABORTED, FAILED),
        STARTED: (PENDING,),
        SUCCESS: (STARTED, ABORTING,),
        FAILED: (STARTED, ABORTING, PENDING),
        CANCELED: (PENDING,),
        ABORTING: (STARTED,),
        ABORTED: (ABORTING,),
    }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import celery.states | |
| from django.conf import settings | |
| from django.db import models | |
| from django.utils.translation import ugettext_lazy as _ | |
| from django_enumfield.db.fields import EnumField | |
| from django_extensions.db.fields.json import JSONField | |
| from model_utils.models import TimeStampedModel | |
| from monkey_api import ApiModel, ApiForeignKey | |
| from utils.admin import admin_url | |
| from . import enums | |
class Job(ApiModel, TimeStampedModel):
    """A tracked job: its name, status, timing, progress, lock and task id."""

    name = models.CharField(max_length=255)
    url = models.URLField(null=True)
    path = models.CharField(max_length=255, null=True)
    status = EnumField(enums.JobStatus)
    started_at = models.DateTimeField(null=True)
    finished_at = models.DateTimeField(null=True)
    progress = models.FloatField(null=True)
    current = models.CharField(max_length=255, null=True)
    description = models.TextField(null=True)
    user = ApiForeignKey(settings.AUTH_USER_MODEL, null=True)
    lock = models.CharField(max_length=255, null=True)
    attempt = models.PositiveSmallIntegerField(default=0)
    task_id = models.CharField(max_length=255, null=True)
    params = JSONField(null=True)

    # Field names that are considered updatable.
    allowed_fields = ['url', 'path', 'status', 'progress', 'description',
                      'task_id', 'current']

    class Meta:
        verbose_name = _('job')
        verbose_name_plural = _('jobs')

    class Api:
        fields = ('name', 'url', 'path', 'status', 'started_at', 'finished_at',
                  'progress', 'description', 'user_id', 'lock', 'attempt', 'id',
                  'task_id', 'params', 'current', 'created', 'modified')
        extra = ('admin_url', 'short_name')

    def __unicode__(self):
        """Return ``Job <pk>: <name>``."""
        return 'Job %s: %s' % (self.pk, self.name)

    @classmethod
    def get_admin_url(cls, job_id):
        """Return the admin URL for the job with the given id."""
        meta = cls._meta
        return admin_url(instance=(meta.app_label, meta.model_name, job_id))

    @property
    def admin_url(self):
        """Admin URL of this job."""
        return self.get_admin_url(job_id=self.pk)

    @property
    def short_name(self):
        """Last dot-separated component of ``name``."""
        return self.name.rsplit('.', 1)[-1]
class JobLog(ApiModel, TimeStampedModel):
    """One log entry of a job: a status plus optional message and context."""

    job = ApiForeignKey(to=Job)
    status = EnumField(enums.JobStatus)
    message = models.TextField(null=True)
    user = ApiForeignKey(settings.AUTH_USER_MODEL, null=True)
    attempt = models.PositiveSmallIntegerField(default=0)
    path = models.CharField(max_length=255, null=True)
    task_id = models.CharField(max_length=255, null=True)

    class Meta:
        verbose_name = _('job log')
        verbose_name_plural = _('job logs')

    class Api:
        fields = ('id', 'job_id', 'status', 'message', 'user_id', 'attempt',
                  'path', 'created', 'modified')
        # NOTE(review): no ``admin_url`` attribute is defined on this class;
        # presumably ``ApiModel`` provides it -- confirm.
        extra = ('admin_url',)
class CeleryTask(models.Model):
    """Model mapped onto the ``celery_taskmeta`` table."""

    # Each Celery state serves as both the stored value and its label.
    status_choices = [(state, state) for state in celery.states.ALL_STATES]

    task_id = models.CharField(max_length=255, unique=True)
    status = models.CharField(max_length=50, choices=status_choices)
    result = models.BinaryField(null=True)
    date_done = models.DateTimeField(auto_now=True, null=True)
    traceback = models.TextField(null=True)

    class Meta:
        db_table = 'celery_taskmeta'
class CeleryTaskSet(models.Model):
    """Model mapped onto the ``celery_tasksetmeta`` table."""

    taskset_id = models.CharField(max_length=255, unique=True)
    result = models.BinaryField(null=True)
    date_done = models.DateTimeField(auto_now_add=True, null=True)

    class Meta:
        db_table = 'celery_tasksetmeta'
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example beat entry: run ``schedule_job_task`` with ``func='some_task'``
# and an extra ``notify`` keyword on the crontab schedule below.
CELERYBEAT_SCHEDULE = {
    'do_stuff': {
        'task': 'schedule_job_task',
        'kwargs': {'func': 'some_task', 'notify': True},
        'schedule': crontab(hour=0, minute=0, day_of_week=0),
    },
}
@app.task
def schedule_job_task(func, **params):
    """Queue ``func`` through ``job.api.delay_task`` and return the job id.

    :param func: passed to ``delay_task`` as ``func``.
    :param params: keyword arguments passed to ``delay_task`` as ``params``.
    :returns: the id of the created job, or the falsy ``delay_task`` result.
    """
    log.debug('schedule_job_task %r', [func, params])
    scheduled = job.api.delay_task(func=func, params=params)
    # A falsy result is passed through unchanged.
    _id = scheduled.id if scheduled else scheduled
    log.debug('job.id = %r', _id)
    return _id
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment