Skip to content

Instantly share code, notes, and snippets.

@NorthIsUp
Created August 28, 2012 01:55
Show Gist options
  • Save NorthIsUp/3494228 to your computer and use it in GitHub Desktop.
Save NorthIsUp/3494228 to your computer and use it in GitHub Desktop.
extensions for thoonk jobs
from thoonk.feeds import Job
from thoonk.feeds.queue import Empty
import time
# To use this, do the following (admittedly horrible) steps:
# job = pubsub.job(channel)
# job.__class__ = JobExtended
class JobExtended(Job):
    """Job feed extended with batch (range) variants of get and finish.

    `get_range` claims several queued jobs in one call and `finish_range`
    finishes a contiguous run of claimed jobs in one transaction.
    """

    def finish_range(self, hi_id, lo_id=None):
        """
        Finish every claimed job ranked from lo_id up to hi_id (inclusive).

        Ranks are positions in the claimed sorted set (self.feed_claimed).
        Each job in the range is removed from the claimed set, the
        cancelled hash, the published set and the items hash, and the
        finishes counter is incremented by the number of jobs in the range.

        Nothing is changed when hi_id is not in the claimed set, or when
        the range turns out to be empty (lo_id ranked above hi_id).

        Arguments:
            hi_id -- The ID of the last (highest ranked) claimed job
                     to finish.
            lo_id -- Optional ID of the first (lowest ranked) claimed job
                     to finish. When omitted, or when it is not in the
                     claimed set, the range starts at rank 0.
        """
        def _finish(pipe):
            # Reads issued before pipe.multi() return their values
            # directly; everything after multi() is queued and sent as one
            # transaction (redis-py `transaction` callback contract).
            hi_rank = pipe.zrank(self.feed_claimed, hi_id)
            if hi_rank is None:
                return  # raise exception?

            lo_rank = 0
            if lo_id is not None:
                # An unknown lo_id yields None, which falls back to rank 0.
                lo_rank = pipe.zrank(self.feed_claimed, lo_id) or 0

            ids = pipe.zrange(self.feed_claimed, lo_rank, hi_rank)
            if not ids:
                # Empty range: ZREM/HDEL without members would be an
                # invalid command, so leave everything untouched.
                return

            pipe.multi()
            pipe.zrem(self.feed_claimed, *ids)
            pipe.hdel(self.feed_cancelled, *ids)
            pipe.zrem(self.feed_published, *ids)
            pipe.incr(self.feed_finishes, amount=len(ids))
            pipe.hdel(self.feed_items, *ids)

        self.redis.transaction(_finish, self.feed_claimed)

    def get_range(self, count=1, timeout=0):
        """
        Claim up to `count` jobs from the tail of the queue in one call.

        The ids are read from the end of self.feed_ids and removed from
        it, added to the claimed set scored with the current time in
        milliseconds, and then announced through self.thoonk._publish.

        This call does not block and does not raise when the queue is
        empty; it returns None instead.

        Arguments:
            count   -- Maximum number of jobs to claim. Values below 1
                       claim nothing.
            timeout -- Accepted for signature compatibility; currently
                       ignored.

        Returns:
            None when no job was claimed, otherwise a list of
            (id, job, cancelled) tuples where:
            id        -- The id of the job
            job       -- The job content
            cancelled -- The cancelled count stored for the job
                         (0 when none is stored)
        """
        if count < 1:
            # lrange(0, -1) would otherwise select the whole queue.
            return None

        # Read the last `count` ids and drop them from the queue atomically.
        pipe = self.redis.pipeline()
        pipe.multi()
        pipe.lrange(self.feed_ids, -count, -1)
        # LTRIM's end index is inclusive: keeping 0..-(count + 1) removes
        # exactly the `count` ids read above. Stopping at -count would
        # leave the oldest of them queued (and, for count == 1, would
        # remove nothing at all).
        pipe.ltrim(self.feed_ids, 0, -count - 1)
        job_ids = pipe.execute()[0]
        if not job_ids:
            return None

        # Every job of the batch gets the same claim timestamp as score.
        claimed_at = int(time.time() * 1000)
        claim_scores = dict.fromkeys(job_ids, claimed_at)

        pipe = self.redis.pipeline()
        pipe.multi()
        pipe.zadd(self.feed_claimed, **claim_scores)
        pipe.hmget(self.feed_items, job_ids)
        pipe.hmget(self.feed_cancelled, job_ids)
        claim_result = pipe.execute()

        # claim_result is [zadd reply, job contents, cancelled counts].
        jobs = claim_result[1]
        job_cc = [0 if cc is None else cc for cc in claim_result[2]]

        self.thoonk._publish(self.feed_claimed, job_ids)
        return list(zip(job_ids, jobs, job_cc))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment