-
-
Save lechup/d886e89490b2f6c737d7 to your computer and use it in GitHub Desktop.
from __future__ import absolute_import | |
import signal | |
import gevent | |
import gevent.pool | |
from rq import Worker | |
from rq.timeouts import BaseDeathPenalty, JobTimeoutException | |
from rq.worker import StopRequested, green, blue | |
from rq.exceptions import DequeueTimeout | |
class GeventDeathPenalty(BaseDeathPenalty):
    """Job time limit implemented with a ``gevent.Timeout`` object."""

    def setup_death_penalty(self):
        """Create and start a gevent timeout of ``self._timeout`` seconds.

        The timeout is constructed with a ``JobTimeoutException`` so that
        exception is what gets raised when the limit expires.
        """
        message = 'Gevent Job exceeded maximum timeout value (%d seconds).' % self._timeout
        timer = gevent.Timeout(self._timeout, JobTimeoutException(message))
        # Keep a reference so cancel_death_penalty() can disarm it later.
        self.gevent_timeout = timer
        timer.start()

    def cancel_death_penalty(self):
        """Cancel the timeout started by ``setup_death_penalty``."""
        self.gevent_timeout.cancel()
class GeventWorker(Worker):
    """RQ worker that runs each job as a greenlet in a bounded gevent pool.

    Accepts one extra keyword argument, ``pool_size`` (default 20), which
    caps the number of jobs running concurrently. All other arguments are
    forwarded unchanged to ``Worker.__init__``.
    """

    death_penalty_class = GeventDeathPenalty

    def __init__(self, *args, **kwargs):
        # pool_size is consumed here so it is not forwarded to Worker.__init__.
        pool_size = kwargs.pop('pool_size', 20)
        self.gevent_pool = gevent.pool.Pool(pool_size)
        super(GeventWorker, self).__init__(*args, **kwargs)

    def get_ident(self):
        """Return an identifier for the currently running greenlet."""
        return id(gevent.getcurrent())

    def _install_signal_handlers(self):
        """Register SIGINT/SIGTERM handlers for warm and cold shutdown.

        The first signal requests a warm shutdown: the worker is flagged as
        stopped and the handler waits for the pool to drain. A second signal
        kills every greenlet in the pool and raises ``SystemExit``.
        """
        # Newer gevent releases expose the registration function as
        # ``gevent.signal_handler`` (``gevent.signal`` is a module there and
        # is not callable); older releases only offer ``gevent.signal``.
        register = getattr(gevent, 'signal_handler', None)
        if register is None:
            register = gevent.signal

        def request_force_stop():
            self.log.warning('Cold shut down.')
            self.gevent_pool.kill()
            raise SystemExit()

        def request_stop():
            if not self._stopped:
                # From now on a further signal means "stop immediately".
                register(signal.SIGINT, request_force_stop)
                register(signal.SIGTERM, request_force_stop)

                self.log.warning('Warm shut down requested.')
                self.log.warning('Stopping after all greenlets are finished. '
                                 'Press Ctrl+C again for a cold shutdown.')

                self._stopped = True
                self.gevent_pool.join()

        register(signal.SIGINT, request_stop)
        register(signal.SIGTERM, request_stop)

    def execute_job(self, job):
        """Spawn ``perform_job(job)`` in the pool instead of running it inline."""
        self.gevent_pool.spawn(self.perform_job, job)

    def dequeue_job_and_maintain_ttl(self, timeout):
        """Wait for the next job while keeping the worker heartbeat alive.

        Returns the ``(job, queue)`` result of ``dequeue_any``, or ``None``
        when nothing was dequeued. Raises ``StopRequested`` as soon as a
        shutdown has been requested.

        ``timeout`` is not passed to ``dequeue_any``; a fixed 5 second wait
        is used instead so a warm shutdown request is noticed quickly. It is
        only inspected to decide whether to drain the pool when no job was
        found (``timeout is None`` -- presumably burst mode; confirm against
        the caller).
        """
        result = None
        while True:
            if self._stopped:
                raise StopRequested()

            self.heartbeat()

            # Do not fetch more work until the pool has a free slot.
            while self.gevent_pool.full():
                gevent.sleep(0.1)
                if self._stopped:
                    raise StopRequested()

            try:
                result = self.queue_class.dequeue_any(self.queues, 5, connection=self.connection)
                if result is None and timeout is None:
                    # Nothing left to fetch: let running greenlets finish.
                    self.gevent_pool.join()
                if result is not None:
                    job, queue = result
                    self.log.info('%s: %s (%s)' % (green(queue.name),
                                                   blue(job.description), job.id))
                break
            except DequeueTimeout:
                # No job arrived within the wait; loop to re-check the stop flag.
                pass

        self.heartbeat()
        return result
The problem with rqworker is that it doesn't apply the gevent monkey patching. If I were to stick
import gevent.monkey
gevent.monkey.patch_all()
at the top of the rqworker file, it works. I don't use rqworker myself; I use a custom main method.
I forked your gist and made some changes that I think properly support shutdown. One thing I changed is that I ignore the timeout
in dequeue_job_and_maintain_ttl
because the default is to wait 360 seconds, which means a warm shutdown doesn't take effect quickly enough. I don't think it matters much, as this function loops forever until it gets a job anyway.
Thanks for updates and explanations!
I don't understand why the code uses
result = self.queue_class.dequeue_any(self.queues, 5, connection=self.connection)
if result is None and timeout is None:
self.gevent_pool.join()
instead of the default:
result = self.queue_class.dequeue_any(self.queues, timeout, connection=self.connection)
As @jhorman said, when calling dequeue_any, will it ignore the shutdown signal?
How do you terminate a job, but not the whole process?
For example, if a job spawns greenlets of its own and a fatal error occurs in one of those greenlets, making the entire job pointless, how can this job be removed from the pool?
I made some modifications and packed it into a package. @lechup, is that OK? This is the repo: https://github.com/zhangliyong/rq-gevent-worker
I don't know why, but it somehow blocks CTRL + C when run from the command line with
rqworker -w apps.main.utils.rqworker.GeventWorker
- how can I break the loop with CTRL + C?