Skip to content

Instantly share code, notes, and snippets.

@honi
Last active February 1, 2022 15:57
Show Gist options
  • Save honi/81ba1bff622649a4a2ef7ccf695b1fa0 to your computer and use it in GitHub Desktop.
Save honi/81ba1bff622649a4a2ef7ccf695b1fa0 to your computer and use it in GitHub Desktop.
RQ worker wrapper that autoreloads - use only during development
import os
import shlex
import subprocess
from django.core.management.base import BaseCommand
from django.utils import autoreload
class Command(BaseCommand):
    """Management command that runs an RQ worker under Django's autoreloader.

    Meant as a development convenience only.
    """

    def add_arguments(self, parser):
        """Register the PID-file option and the pass-through worker arguments."""
        parser.add_argument(
            '--worker-pid-file',
            action='store',
            dest='worker_pid_file',
            default='/tmp/rqworker.pid',
        )
        # Every remaining positional argument is collected and handed to the worker.
        parser.add_argument('rqworkerargs', nargs='*')

    def handle(self, *args, **options):
        """Hand ``run_worker`` to the autoreloader with the parsed options."""
        pid_file = options['worker_pid_file']
        extra_args = options['rqworkerargs']

        def start_worker():
            run_worker(pid_file, extra_args)

        autoreload.run_with_reloader(start_worker)
def run_worker(worker_pid_file, worker_args):
    """Stop the previously started RQ worker (if any) and start a new one.

    The PID recorded in ``worker_pid_file`` is sent SIGTERM on a best-effort
    basis, then ``manage.py rqworker`` is launched with ``--pid`` pointing at
    the same file. The call blocks until the new worker process exits.

    Args:
        worker_pid_file: Path of the file holding the worker's PID.
        worker_args: Sequence of extra command-line arguments forwarded
            verbatim to ``rqworker``.
    """
    # Local import keeps this block self-contained; the module does not
    # import ``signal`` at file level.
    import signal

    # Read the PID directly instead of shelling out to ``cat``. A missing,
    # unreadable, or malformed PID file simply means there is nothing to stop.
    try:
        with open(worker_pid_file, encoding='utf-8') as pid_file:
            stale_pid = int(pid_file.read())
    except (OSError, ValueError):
        stale_pid = None

    # Only signal real PIDs: 0 and negative values address process groups.
    if stale_pid is not None and stale_pid > 0:
        try:
            os.kill(stale_pid, signal.SIGTERM)
        except OSError:
            # Best effort, as before: the old worker may already be gone.
            pass

    # Build an argv list rather than a string that is re-split, so paths and
    # worker arguments containing whitespace survive intact.
    start_worker_cmd = [
        get_managepy_path(),
        'rqworker',
        f'--pid={worker_pid_file}',
        *worker_args,
    ]
    printable_cmd = ' '.join(shlex.quote(part) for part in start_worker_cmd)
    print(f'Starting RQ worker: {printable_cmd}')
    subprocess.run(start_worker_cmd)
def get_managepy_path(start_dir=None):
    """Return the absolute path of the nearest ``manage.py`` at or above a directory.

    Args:
        start_dir: Directory to start searching from. Defaults to the
            directory containing this file.

    Returns:
        Absolute path of the first ``manage.py`` found while walking from
        ``start_dir`` up through its parent directories.

    Raises:
        FileNotFoundError: If the filesystem root is reached without finding
            a ``manage.py`` (previously this case looped forever).
    """
    if start_dir is None:
        start_dir = os.path.dirname(__file__)

    current_dir = os.path.abspath(start_dir)
    while True:
        candidate = os.path.join(current_dir, 'manage.py')
        if os.path.exists(candidate):
            return candidate

        parent_dir = os.path.dirname(current_dir)
        if parent_dir == current_dir:
            # dirname() of the root is the root itself: nowhere left to look.
            raise FileNotFoundError(
                f'manage.py not found in {os.path.abspath(start_dir)} '
                'or any of its parent directories'
            )
        current_dir = parent_dir
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment