Created
January 4, 2011 01:08
-
-
Save shazow/764253 to your computer and use it in GitHub Desktop.
SQLAlchemy-based task manager (like Celery but standalone)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Task(BaseModel):
    """A unit of deferred work stored in the ``task`` table.

    A task moves through the states ``pending`` -> ``started`` ->
    ``completed`` (``failed`` is declared as a state but is never assigned
    inside this class).  ``time_wait_until`` is the earliest moment at which
    ``Task.next()`` will hand the task out.

    None of the methods here flush or commit; callers own the ``Session``
    transaction.
    """
    __tablename__ = 'task'

    id = Column(types.Integer, primary_key=True)

    time_created = Column(types.DateTime, default=datetime.now, nullable=False)
    time_updated = Column(types.DateTime, onupdate=datetime.now)
    # Earliest time at which the task is eligible to be returned by next().
    time_wait_until = Column(types.DateTime, default=datetime.now, nullable=False)
    # Wall-clock duration between start() and complete(), in seconds.
    seconds_elapsed = Column(types.Float)

    # Tag tasks that depend on a specific resource so they can be queried
    # Example: An error from a 'twitter-api' tag would trigger a 1hr delay on all pending tasks of that group.
    resource_group = Column(types.String(32))

    # presumably the name of the callable a worker dispatches to -- the
    # dispatch code is not part of this class; confirm against the worker.
    method = Column(types.String(32), nullable=False)
    # NOTE(review): PickleType unpickles whatever is stored in this column;
    # that is only safe while the database contents are trusted.
    params = Column(types.PickleType, default=dict, nullable=False)
    # Copy of worker.lock while a worker holds the task; cleared by retry().
    lock_key = Column(types.String(16))
    recurring_cron = Column(types.String(128))  # Cron-like string for recurring. If not set, won't recur.

    parent_task_id = Column(types.Integer, ForeignKey('task.id'))
    parent_task = orm.relationship('Task', remote_side=id)
    parent_type = Column(mytypes.Enum(['master', 'retry', 'recurring']), default='master', nullable=False)

    state = Column(mytypes.Enum(['pending', 'started', 'completed', 'failed']), default='pending', nullable=False, index=True)

    @staticmethod
    def delay_resource_group(resource_group, delta=None):
        """Make sure no unfinished task of ``resource_group`` runs before now + ``delta``.

        Tasks in state ``pending`` or ``started`` whose ``time_wait_until``
        is earlier than now + ``delta`` are moved to exactly that time.
        Tasks already scheduled later are left alone.

        :param resource_group: value of ``Task.resource_group`` to match.
        :param delta: ``timedelta`` to wait; ``None`` (or a zero delta)
            makes the call a no-op.
        """
        if not delta:
            # Nothing to delay by; previously a None delta raised TypeError.
            return

        earliest = datetime.now() + delta
        # The unfinished states are listed explicitly instead of using an
        # ordering comparison on ``state``: how ``<`` behaves depends on how
        # mytypes.Enum is stored, and lexically both 'pending' and 'started'
        # sort *after* 'completed'.
        affected_tasks = Session.query(Task).filter(and_(
            Task.time_wait_until < earliest,
            Task.state.in_(['pending', 'started']),
            Task.resource_group == resource_group))

        for t in affected_tasks:
            t.time_wait_until = earliest

    @staticmethod
    def create_from_time(delta=None, now=None, resolution='hour', **kw):
        """Create a task due at ``now + delta``, truncated to ``resolution``.

        :param delta: ``timedelta`` offset from ``now``.  When falsy the task
            is scheduled for ``now`` itself, untruncated.
        :param now: reference ``datetime``; defaults to ``datetime.now()``.
        :param resolution: finest datetime field to keep, one of ``year``,
            ``month``, ``day``, ``hour``, ``minute``, ``second``,
            ``microsecond``.  Finer fields are dropped.
        :param kw: passed through to ``Task.create``.
        :returns: whatever ``Task.create`` returns.
        :raises KeyError: if ``resolution`` is not one of the names above
            (only checked when ``delta`` is given).
        """
        if not now:
            now = datetime.now()

        if not delta:
            return Task.create(time_wait_until=now, **kw)

        resolutions = ['year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond']
        if resolution not in resolutions:
            raise KeyError("Resolution is not valid: {0}".format(resolution))

        t = now + delta
        keep = resolutions.index(resolution) + 1
        args = [getattr(t, r) for r in resolutions[:keep]]
        # datetime() requires year, month and day; pad with the first
        # month/day so that 'year' and 'month' resolutions do not raise.
        args += [1] * (3 - len(args))

        return Task.create(time_wait_until=datetime(*args), **kw)

    @staticmethod
    def next():
        """Return the pending task that has been due the longest, or ``None``.

        Only tasks whose ``time_wait_until`` has passed are considered.  The
        row is not locked or marked here; the caller is expected to call
        ``start()`` on it.
        """
        q = Session.query(Task).filter_by(state='pending').filter(Task.time_wait_until <= datetime.now())
        # Without an explicit order the database may return any matching row.
        return q.order_by(Task.time_wait_until, Task.id).first()

    def create_recurring(self):
        """Create the follow-up task for a recurring task.

        The cron expression in ``recurring_cron`` is iterated from
        ``time_created`` until it yields a time that is not in the past; the
        new task copies this task's resource group, method and params and
        points back to it via ``parent_task_id``.

        :returns: the new task, or ``None`` when ``recurring_cron`` is unset.
        """
        if not self.recurring_cron:
            return

        now = datetime.now()
        cron = croniter(self.recurring_cron, self.time_created)

        time_next_job = cron.get_next(datetime)
        while time_next_job < now:
            # Skip occurrences that were missed while the task was waiting.
            time_next_job = cron.get_next(datetime)

        t = Task.create(time_wait_until=time_next_job, resource_group=self.resource_group,
                        method=self.method, params=self.params, recurring_cron=self.recurring_cron,
                        parent_task_id=self.id, parent_type='recurring')
        return t

    def start(self, worker):
        """Mark the task as started and record ``worker.lock`` as its lock."""
        self.state = 'started'
        self.lock_key = worker.lock
        # Kept on the instance only (not a column); read back by complete().
        self._seconds_started = time.time()

    def retry(self, worker, delay_seconds=0):
        """Release the lock and put the task back to ``pending``.

        :param worker: unused.
            NOTE(review): unlike complete(), the worker's lock is not
            checked here -- confirm that is intended.
        :param delay_seconds: seconds from now before the task is eligible
            again.
        """
        self.state = 'pending'
        self.lock_key = None
        self.time_wait_until = datetime.now() + timedelta(seconds=delay_seconds)

    def complete(self, worker):
        """Mark the task as completed and record how long it ran.

        :raises AssertionError: if ``worker`` does not hold this task's lock.
        """
        if not self.check_lock(worker):
            raise AssertionError("Invalid lock for complete_task {0}: {1}".format(self.id, worker.lock))

        # _seconds_started only exists on the instance that went through
        # start(); an instance that did not would otherwise raise
        # AttributeError here.  In that case the elapsed time is left unset.
        started = getattr(self, '_seconds_started', None)
        if started is not None:
            self.seconds_elapsed = time.time() - started
        self.state = 'completed'

    def check_lock(self, worker):
        """Return True when ``worker.lock`` matches this task's ``lock_key``."""
        return self.lock_key == worker.lock
# Composite index over the two columns that Task.next() filters on.
Index('task_discover_idx', Task.state, Task.time_wait_until)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment