Last active
October 26, 2015 17:52
-
-
Save edvm/89ad310dc75f7a622002 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from application.notifications import notify_hipchat | |
from django.core.cache import cache | |
from django.utils import timezone | |
from contextlib import contextmanager | |
class TaskHandler(object):
    """Registry of named tasks persisted under a single cache key.

    Every task of a handler is stored as one entry (task name -> state dict)
    of a mapping kept under the key ``name`` in the cache backend.
    """

    def __init__(self, name='celery-tasks', cache=cache):
        """TaskHandler

        - name: Name used as key in the cache backend to store tasks.
        - cache: Django cache backend as defined at settings.CACHES.
                 Defaults to configured 'default' cache.

        Usage:
            >>> tasks = TaskHandler('tasks')
            >>> tasks.as_list()
            []
            >>> task = Task('frula', handler=tasks)
            >>> tasks.as_list()
            [<tools.redis.Task at 0x7f17ee9d9f90>]
            >>> task.running
            False
            >>> task.run()
            >>> task.running
            True
            >>> tasks.as_list()[0].running
            True
            >>> task.stop()
            >>> tasks.as_list()[0].running
            False
            >>> tasks.clear()
            >>> tasks.as_list()
            []
        """
        self._name = name
        self._cache = cache
        # Task states waiting to be written by the next _commit().
        self._buffer = {}

    def _fetch_tasks(self):
        """Return the stored mapping of task name -> state dict ({} if none)."""
        return self._cache.get(self._name, {})

    def _commit(self):
        """Merge the buffered task states into the stored mapping and save it."""
        tasks = self._fetch_tasks()
        tasks.update(self._buffer)
        # NOTE(review): no timeout is passed, so the entry uses the backend's
        # default expiry -- confirm that is acceptable for long-running tasks.
        self._cache.set(self._name, tasks)
        # Forget what was just written. Keeping it would replay stale state of
        # earlier tasks on every later commit, overwriting newer values and
        # resurrecting tasks after clear().
        self._buffer.clear()

    @property
    def name(self):
        """Cache key under which this handler stores its tasks."""
        return self._name

    def add(self, task):
        """Add a Task (store the state returned by ``task.as_dict()``)."""
        self._buffer[task.name] = task.as_dict()
        self._commit()

    def clear(self):
        """Remove every task stored by this handler.

        Only this handler's own key is deleted; unrelated entries living in
        the same cache backend are left alone.
        """
        self._buffer.clear()
        self._cache.delete(self._name)

    def update(self, task):
        """Update a task."""
        return self.add(task)

    def get(self, task_name):
        """Get a Task by its task name. Return None if it does not exist.

        Note that building the Task re-adds it to this handler, because
        ``Task.__init__`` calls ``handler.add``.
        """
        stored = self._fetch_tasks().get(task_name, None)
        if stored is None:
            return None
        # Work on a copy so the dict handed out by the cache is not mutated.
        state = dict(stored)
        state['handler'] = self
        return Task(task_name, **state)

    def is_running(self, task_name):
        """Return whether the task is running, or None if it does not exist."""
        task = self._fetch_tasks().get(task_name, None)
        return None if task is None else bool(task.get('_in_process'))

    def list(self):
        """Returns a generator of tasks from this handler."""
        # Snapshot the names first: get() writes back to the cache while we
        # are iterating.
        for task_name in tuple(self._fetch_tasks()):
            yield self.get(task_name)

    def as_list(self):
        """Return all tasks of this handler as a list."""
        return list(self.list())
class Task(object):
    """A named task whose running state is persisted through a TaskHandler."""

    # Format used for the stored 'started' / 'ended' timestamps.
    _TIME_FORMAT = '%m/%d/%Y %H:%M:%S'

    def __init__(self, name, handler, **kw):
        """Task

        - name: Name of the task, should be unique.
        - handler: TaskHandler.
        - kw: optional initial state; the keys 'ended', 'started' and
              '_in_process' are read, anything else is ignored.

        The new task is registered with ``handler`` right away.
        """
        self.name = name
        self.handler = handler
        self._ended = kw.get('ended', None)
        self._started = kw.get('started', None)
        self._in_process = kw.get('_in_process', False)
        self.handler.add(self)

    @staticmethod
    def _timestamp():
        """Return the current time formatted for storage."""
        return timezone.now().strftime(Task._TIME_FORMAT)

    @property
    def started(self):
        """Timestamp string of the last run() call, or None."""
        return self._started

    @property
    def ended(self):
        """Timestamp string of the last stop() call, or None."""
        return self._ended

    @property
    def running(self):
        """Running state as currently recorded by the handler's backend."""
        # ask redis about us
        return self.handler.is_running(self.name)

    def commit(self):
        """Persist the current state through the handler."""
        self.handler.update(self)

    def run(self):
        """Mark the task as running, record the start time and persist it."""
        self._started = self._timestamp()
        self._in_process = True
        self.commit()

    def stop(self):
        """Mark the task as stopped, record the end time and persist it."""
        self._ended = self._timestamp()
        self._in_process = False
        self.commit()

    def as_dict(self):
        """Return the state dict that the handler stores for this task."""
        in_process = bool(self._in_process)
        return {'ended': self.ended,
                'started': self.started,
                # Report the state being saved. Querying the backend here
                # would return the value from *before* this commit, so the
                # stored 'running' flag would always lag one step behind.
                'running': in_process,
                '_in_process': self._in_process}
@contextmanager
def Dry(task_name, group_name='tasks'):
    """Dont repeat yourself context manager.

    Looks up (or creates) the task ``task_name`` in the TaskHandler named
    ``group_name`` and yields it.

    - If the task is already recorded as running, a "still running"
      notification is sent and the task is yielded untouched: it is neither
      started nor stopped by this context manager.
    - Otherwise the task is marked as running before the body executes and
      is always marked as stopped afterwards, even if the body raises.
      Start and end notifications are sent.

    Usage:
        >>> with tools.Dry(task_name, task_group_name) as task:
        ...     if not task.running:
        ...         callable(*args)

    NOTE(review): the task is marked as running *before* the body is
    entered, so ``task.running`` looks True inside the body in both cases
    and the guard in the example above does not appear to ever fire --
    confirm how callers are expected to tell the two cases apart.
    """
    tasks_group = TaskHandler(group_name)
    task = tasks_group.get(task_name)
    if task is None:
        task = Task(task_name, handler=tasks_group)

    if task.running:
        notify_hipchat('<b>{}:{}</b> is still running, started at {}'.format(
            tasks_group.name, task.name, task.started))
        yield task
        # Stop here: a contextmanager generator must yield exactly once.
        # Re-checking the state after the body could reach the second yield
        # below (if the other run finished meanwhile) and raise RuntimeError.
        return

    task.run()
    try:
        notify_hipchat('<b>{}:{}</b> - started at {}'.format(
            tasks_group.name, task.name, task.started))
        yield task
    finally:
        task.stop()
        notify_hipchat('<b>{}:{}</b> - ended at {}'.format(
            tasks_group.name, task.name, task.ended))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment