Skip to content

Instantly share code, notes, and snippets.

@ecarreras
Last active December 18, 2015 01:48
Show Gist options
  • Save ecarreras/5706057 to your computer and use it in GitHub Desktop.
Save ecarreras/5706057 to your computer and use it in GitHub Desktop.
from hashlib import sha1
from redis import from_url
from rq import get_failed_queue, requeue_job, use_connection, Queue
conn = from_url('redis://localhost:6379/0')
use_connection(conn)
def get_failed_jobs(queue=None):
if not queue:
fq = get_failed_queue()
else:
fq = Queue(queue)
re_jobs = {}
hashes = {}
for job in fq.jobs:
exc = job.exc_info.split('\n')[-2]
hash = sha1(exc).hexdigest()[:7]
hashes[hash] = exc
re_jobs.setdefault(hash, [])
re_jobs[hash].append(job)
for key, value in re_jobs.items():
exc = hashes[key]
print "(%s) %s: %s jobs" % (key, exc, len(value))
return re_jobs
#!/usr/bin/env python
import sys
import times
from redis import from_url
from rq import use_connection, get_failed_queue, requeue_job, Queue
INTERVAL = 7200 # Seconds
MAX_ATTEMPTS = 5
PERMANENT_FAILED = 'permanent'
redis_conn = from_url(sys.argv[1])
use_connection(redis_conn)
fq = get_failed_queue()
pq = Queue(name=PERMANENT_FAILED)
for job in fq.jobs:
job.meta.setdefault('attempts', 0)
if job.meta['attempts'] > MAX_ATTEMPTS:
print "Job %s %s attempts. MAX ATTEMPTS %s limit exceeded" % (
job.id, job.meta['attempts'], MAX_ATTEMPTS
)
print job.description
print job.exc_info
print
fq.remove(job)
pq.enqueue_job(job)
print "Moved to %s queue" % PERMANENT_FAILED
else:
ago = (times.now() - job.enqueued_at).seconds
if ago >= INTERVAL:
print("%s: attemps: %s enqueued: %ss ago"
% (job.id, job.meta['attempts'], ago)),
job.meta['attempts'] += 1
job.save()
print "(Requeue)"
requeue_job(job.id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment