Skip to content

Instantly share code, notes, and snippets.

@edvm
Last active October 26, 2015 17:52
Show Gist options
  • Save edvm/89ad310dc75f7a622002 to your computer and use it in GitHub Desktop.
Save edvm/89ad310dc75f7a622002 to your computer and use it in GitHub Desktop.
from application.notifications import notify_hipchat
from django.core.cache import cache
from django.utils import timezone
from contextlib import contextmanager
class TaskHandler(object):
    """Registry of tasks stored as one dict under a single cache key.

    The cache entry maps ``task_name -> task_info`` (the dict produced by
    ``Task.as_dict()``). The handler also keeps an in-memory buffer of every
    task it has added, which is merged over the cached dict on each commit.
    """

    def __init__(self, name='celery-tasks', cache=cache):
        """TaskHandler

        - name: Name used as key in the cache backend to store tasks.
        - cache: Django cache backend as defined at settings.CACHES.
          Defaults to configured 'default' cache.

        Usage:
        >>> tasks = TaskHandler('tasks')
        >>> tasks.as_list()
        []
        >>> task = Task('frula', handler=tasks)
        >>> tasks.as_list()
        [<tools.redis.Task at 0x7f17ee9d9f90>]
        >>> task.running
        False
        >>> task.run()
        >>> task.running
        True
        >>> tasks.as_list()[0].running
        True
        >>> task.stop()
        >>> tasks.as_list()[0].running
        False
        >>> tasks.clear()
        >>> tasks.as_list()
        []
        """
        self._name = name
        self._cache = cache
        # Tasks added through *this* handler instance; re-applied on commit.
        self._buffer = {}

    def _fetch_tasks(self):
        """Return the cached ``{task_name: task_info}`` dict, or ``{}``."""
        return self._cache.get(self._name, {})

    def _commit(self):
        """Merge the local buffer over the cached dict and write it back."""
        tasks = self._fetch_tasks()
        tasks.update(self._buffer)
        self._cache.set(self._name, tasks)

    @property
    def name(self):
        """Cache key under which this handler stores its tasks."""
        return self._name

    def add(self, task):
        """Add a Task (stores ``task.as_dict()`` under ``task.name``)."""
        self._buffer[task.name] = task.as_dict()
        self._commit()

    def clear(self):
        """Remove every task stored by this handler.

        Only this handler's own cache key is deleted; other entries in the
        cache backend are left alone.
        """
        # Empty the buffer first: otherwise the next add() would merge the
        # old entries back in and resurrect the tasks we just cleared.
        self._buffer.clear()
        # cache.clear() would wipe the *whole* backend, including keys that
        # do not belong to this handler, so delete just our key instead.
        self._cache.delete(self._name)

    def update(self, task):
        """Update a task (same operation as ``add``)."""
        return self.add(task)

    def get(self, task_name):
        """Get a Task by its task name. Return None if it does not exist.

        Note: building the Task registers it with this handler again, so a
        successful lookup also writes the task back to the cache.
        """
        task_dict = self._fetch_tasks().get(task_name, None)
        if task_dict is None:
            return None
        task_dict.update({'handler': self})
        return Task(task_name, **task_dict)

    def is_running(self, task_name):
        """Return a bool telling whether the task is running.

        Returns None if the task does not exist.
        """
        task = self._fetch_tasks().get(task_name, None)
        if task is None:
            return None
        return bool(task.get('_in_process'))

    def list(self):
        """Return a generator of the tasks known to this handler."""
        # Snapshot the names: get() writes to the cache while we iterate.
        for task_name in list(self._fetch_tasks()):
            yield self.get(task_name)

    def as_list(self):
        """Return the tasks of this handler as a list."""
        return list(self.list())
class Task(object):
    """A named task whose state is persisted through its handler."""

    def __init__(self, name, handler, **kw):
        """Task

        - name: Name of the task, should be unique.
        - handler: TaskHandler.
        - kw: optional stored state; only 'ended', 'started' and
          '_in_process' are read, any other key is ignored.

        The new task registers itself with the handler right away.
        """
        self.name = name
        self.handler = handler
        self._in_process = kw.get('_in_process', False)
        self._started = kw.get('started')
        self._ended = kw.get('ended')
        self.handler.add(self)

    @property
    def started(self):
        """Timestamp string of the last run() call, or None."""
        return self._started

    @property
    def ended(self):
        """Timestamp string of the last stop() call, or None."""
        return self._ended

    @property
    def running(self):
        """Running state as reported by the handler, not the local flag."""
        return self.handler.is_running(self.name)

    def commit(self):
        """Push the current state of this task to the handler."""
        self.handler.update(self)

    def run(self):
        """Mark the task as started now and persist it."""
        stamp = timezone.now().strftime('%m/%d/%Y %H:%M:%S')
        self._started, self._in_process = stamp, True
        self.commit()

    def stop(self):
        """Mark the task as ended now and persist it."""
        stamp = timezone.now().strftime('%m/%d/%Y %H:%M:%S')
        self._ended, self._in_process = stamp, False
        self.commit()

    def as_dict(self):
        """Return the state of this task as a plain dict."""
        snapshot = {}
        snapshot['ended'] = self.ended
        snapshot['started'] = self.started
        snapshot['running'] = self.running
        snapshot['_in_process'] = self._in_process
        return snapshot
@contextmanager
def Dry(task_name, group_name='tasks'):
    """Dont repeat yourself context manager.

    Looks up (or creates) the task ``task_name`` in the group ``group_name``
    and yields it exactly once:

    - if the task is already running, a "still running" notification is
      sent and the task is yielded without touching its state;
    - otherwise the task is marked as started, yielded, and always marked
      as stopped when the ``with`` block exits (even on error). A
      notification is sent at start and at end.

    Usage:
    >>> with tools.Dry(task_name, task_group_name) as task:
    ...     if not task.running:
    ...         callable(*args)

    NOTE(review): on the fresh-start path ``task.run()`` has already been
    called before the body executes, so ``task.running`` is expected to be
    true inside the block on both paths - confirm the intended contract of
    the usage pattern above.
    """
    tasks_group = TaskHandler(group_name)
    task = tasks_group.get(task_name)
    if task is None:
        task = Task(task_name, handler=tasks_group)

    # Read the running state once. It lives in the cache and may flip while
    # the caller's block runs; re-checking it after the yield could make
    # this generator yield twice (or never), which contextmanager rejects
    # with a RuntimeError.
    if task.running:
        notify_hipchat('<b>{}:{}</b> is still running, started at {}'.format(
            tasks_group.name, task.name, task.started))
        yield task
        return

    task.run()
    try:
        notify_hipchat('<b>{}:{}</b> - started at {}'.format(
            tasks_group.name, task.name, task.started))
        yield task
    finally:
        task.stop()
        notify_hipchat('<b>{}:{}</b> - ended at {}'.format(
            tasks_group.name, task.name, task.ended))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment