Skip to content

Instantly share code, notes, and snippets.

@Afeez1131
Forked from jbarham/delete_failed_jobs.py
Created January 13, 2025 11:03
Show Gist options
  • Save Afeez1131/14ac4289850132cc89e5fd1911556fbc to your computer and use it in GitHub Desktop.
Save Afeez1131/14ac4289850132cc89e5fd1911556fbc to your computer and use it in GitHub Desktop.
from django.core.management.base import BaseCommand
from rq.registry import FailedJobRegistry
from django_rq.queues import get_queue
from django_rq.utils import get_jobs
class Command(BaseCommand):
    """Management command that deletes failed jobs from a Django RQ queue.

    The jobs are looked up through the queue's ``FailedJobRegistry``. They
    can optionally be narrowed to a single job function, and ``--dry-run``
    reports how many jobs match without deleting anything.
    """

    help = 'Delete failed jobs from Django RQ.'

    def add_arguments(self, parser):
        """Register the ``--queue``, ``--func`` and ``--dry-run`` options."""
        parser.add_argument(
            '-q', '--queue',
            default='default',
            help='queue name (default: "default")',
        )
        parser.add_argument(
            '-f', '--func',
            help='optional job function name, e.g. "app.tasks.func"',
        )
        parser.add_argument(
            '--dry-run',
            action='store_true',
            help='Do not actually delete failed jobs',
        )

    def handle(self, *args, **options):
        """Find the failed jobs of the selected queue and delete them.

        Reads ``queue``, ``func`` and ``dry_run`` from ``options``. Results
        are reported through ``self.stdout`` rather than ``print`` so the
        output can be captured or redirected by ``call_command``.
        """
        queue = get_queue(options['queue'])
        registry = FailedJobRegistry(queue.name, queue.connection)
        jobs = get_jobs(queue, registry.get_job_ids(), registry)

        # Optionally keep only the jobs whose function path matches exactly.
        func_name = options.get('func')
        if func_name:
            jobs = [job for job in jobs if job.func_name == func_name]

        if options['dry_run']:
            self.stdout.write(f'Found {len(jobs)} failed jobs')
            return

        for job in jobs:
            # LREM with count 0 drops every occurrence of the job id from
            # the queue's Redis list before the job itself is deleted.
            queue.connection.lrem(queue.key, 0, job.id)
            job.delete()
        self.stdout.write(f'Deleted {len(jobs)} failed jobs')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment