Skip to content

Instantly share code, notes, and snippets.

@shazow
Created January 4, 2011 01:08
Show Gist options
  • Save shazow/764253 to your computer and use it in GitHub Desktop.
Save shazow/764253 to your computer and use it in GitHub Desktop.
SQLAlchemy-based task manager (like Celery but standalone)
class Task(BaseModel):
    """A persisted unit of work for a standalone, SQLAlchemy-backed task queue.

    ``state`` moves 'pending' -> 'started' (``start``) -> 'completed'
    (``complete``); ``retry`` puts a task back to 'pending'. The 'failed'
    value is declared on the column but no method in this class sets it.
    A worker claims a task by writing its ``lock`` into ``lock_key`` and
    must still hold that lock to complete it.
    """
    __tablename__ = 'task'

    id = Column(types.Integer, primary_key=True)

    time_created = Column(types.DateTime, default=datetime.now, nullable=False)
    time_updated = Column(types.DateTime, onupdate=datetime.now)
    # Earliest moment at which Task.next() will hand this task out.
    time_wait_until = Column(types.DateTime, default=datetime.now, nullable=False)
    # Wall-clock seconds between start() and complete().
    seconds_elapsed = Column(types.Float)

    # Tag tasks that depend on a specific resource so they can be queried as a group.
    # Example: an error from a 'twitter-api' tag would trigger a 1hr delay on all
    # pending tasks of that group (see delay_resource_group).
    resource_group = Column(types.String(32))

    method = Column(types.String(32), nullable=False)
    # NOTE(review): PickleType unpickles stored data on load; only store trusted values.
    params = Column(types.PickleType, default=dict, nullable=False)
    # Lock token of the worker that started the task; cleared by retry().
    lock_key = Column(types.String(16))
    # Cron-like string for recurring tasks. If not set, the task won't recur.
    recurring_cron = Column(types.String(128))

    parent_task_id = Column(types.Integer, ForeignKey('task.id'))
    parent_task = orm.relationship('Task', remote_side=id)
    parent_type = Column(mytypes.Enum(['master', 'retry', 'recurring']), default='master', nullable=False)
    state = Column(mytypes.Enum(['pending', 'started', 'completed', 'failed']), default='pending', nullable=False, index=True)

    @staticmethod
    def delay_resource_group(resource_group, delta=None):
        """Postpone pending tasks of ``resource_group`` to no sooner than now + ``delta``.

        Tasks already scheduled later than that are left untouched. The ORM
        instances are modified in ``Session``; committing is up to the caller.

        :param resource_group: value of ``Task.resource_group`` to match.
        :param delta: ``timedelta`` to delay by. ``None`` or a zero delta
            means "no delay", so nothing is changed.
        """
        if not delta:
            return
        earliest = datetime.now() + delta
        affected_tasks = Session.query(Task).filter(and_(
            Task.resource_group == resource_group,
            # Equality, not an ordering comparison: ``state < 'completed'``
            # depends on how the database orders enum values and, with plain
            # string ordering, matches no state at all.
            Task.state == 'pending',
            # Everything due before the cutoff (including long-overdue tasks)
            # must be pushed back, not just tasks in a recent window.
            Task.time_wait_until < earliest,
        ))
        for t in affected_tasks:
            t.time_wait_until = earliest

    @staticmethod
    def create_from_time(delta=None, now=None, resolution='hour', **kw):
        """Create a task scheduled at ``now + delta``, truncated to ``resolution``.

        Fields finer than ``resolution`` are reset to their minimum. For
        example, with the default 'hour', minutes, seconds and microseconds
        become zero, so the task is due at the top of that hour.

        :param delta: ``timedelta`` from ``now``. If falsy, the task is
            scheduled at ``now`` exactly, without truncation.
        :param now: reference time; defaults to ``datetime.now()``.
        :param resolution: one of 'year', 'month', 'day', 'hour', 'minute',
            'second', 'microsecond'.
        :param kw: forwarded unchanged to ``Task.create``.
        :raises KeyError: if ``resolution`` is not one of the values above.
        """
        if now is None:
            now = datetime.now()
        if not delta:
            return Task.create(time_wait_until=now, **kw)

        resolutions = ['year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond']
        if resolution not in resolutions:
            raise KeyError("Resolution is not valid: {0}".format(resolution))

        # Smallest legal value of every field below 'year'. replace() is used
        # instead of datetime(*fields) because datetime() needs year, month
        # and day, so truncating to 'year' or 'month' used to raise TypeError.
        floor = {'month': 1, 'day': 1, 'hour': 0, 'minute': 0, 'second': 0, 'microsecond': 0}
        finer = resolutions[resolutions.index(resolution) + 1:]
        target = (now + delta).replace(**dict((r, floor[r]) for r in finer))
        return Task.create(time_wait_until=target, **kw)

    @staticmethod
    def next():
        """Return the pending task that became due earliest, or ``None``.

        Only tasks whose ``time_wait_until`` has passed are eligible. Sorting
        by ``time_wait_until`` (with ``id`` as a tie-breaker) makes the choice
        deterministic; without ``order_by`` the database may return any match.
        The task is not claimed here.
        NOTE(review): nothing here stops two workers receiving the same row.
        """
        q = Session.query(Task).filter_by(state='pending').filter(Task.time_wait_until <= datetime.now())
        return q.order_by(Task.time_wait_until, Task.id).first()

    def create_recurring(self):
        """Create the next occurrence of a recurring task as a new child task.

        :returns: the new ``Task`` (``parent_type='recurring'``, due at the
            first cron slot that is not in the past), or ``None`` when
            ``recurring_cron`` is unset.
        """
        if not self.recurring_cron:
            return None
        now = datetime.now()
        # Walk the schedule forward from this task's creation time until the
        # slot is no longer in the past.
        cron = croniter(self.recurring_cron, self.time_created)
        time_next_job = cron.get_next(datetime)
        while time_next_job < now:
            time_next_job = cron.get_next(datetime)
        return Task.create(
            time_wait_until=time_next_job,
            resource_group=self.resource_group,
            method=self.method,
            params=self.params,
            recurring_cron=self.recurring_cron,
            parent_task_id=self.id,
            parent_type='recurring',
        )

    def start(self, worker):
        """Claim the task for ``worker``: mark it 'started' and store its lock."""
        self.state = 'started'
        self.lock_key = worker.lock
        # Plain instance attribute, not a mapped column: it is only visible
        # to complete() when called on this same in-memory object.
        self._seconds_started = time.time()

    def retry(self, worker, delay_seconds=0):
        """Release the task back to 'pending', runnable after ``delay_seconds``.

        ``worker`` is currently unused; the lock is not checked before release.
        """
        self.state = 'pending'
        self.lock_key = None
        self.time_wait_until = datetime.now() + timedelta(seconds=delay_seconds)

    def complete(self, worker):
        """Mark the task 'completed' on behalf of ``worker``.

        :raises AssertionError: if ``worker`` does not hold this task's lock.
        """
        if not self.check_lock(worker):
            raise AssertionError("Invalid lock for complete_task {0}: {1}".format(self.id, worker.lock))
        # _seconds_started is missing when start() ran on another instance
        # (e.g. the row was reloaded); skip timing rather than failing the
        # completion with AttributeError.
        started = getattr(self, '_seconds_started', None)
        if started is not None:
            self.seconds_elapsed = time.time() - started
        self.state = 'completed'

    def check_lock(self, worker):
        """Return True if ``worker`` holds this task's lock."""
        return self.lock_key == worker.lock
# Composite index on (state, time_wait_until): the two columns Task.next()
# filters on when looking for due, pending work.
Index('task_discover_idx', Task.state, Task.time_wait_until)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment