-
-
Save lechup/d886e89490b2f6c737d7 to your computer and use it in GitHub Desktop.
| from __future__ import absolute_import | |
| import signal | |
| import gevent | |
| import gevent.pool | |
| from rq import Worker | |
| from rq.timeouts import BaseDeathPenalty, JobTimeoutException | |
| from rq.worker import StopRequested, green, blue | |
| from rq.exceptions import DequeueTimeout | |
class GeventDeathPenalty(BaseDeathPenalty):
    """Death penalty implemented with a ``gevent.Timeout``.

    ``setup_death_penalty`` arms a timeout of ``self._timeout`` seconds that
    carries a ``JobTimeoutException``; ``cancel_death_penalty`` disarms it.
    """

    def setup_death_penalty(self):
        """Create and start the gevent timeout for the current job."""
        seconds = self._timeout
        message = 'Gevent Job exceeded maximum timeout value (%d seconds).' % seconds
        # Keep the timer on the instance so cancel_death_penalty can reach it.
        timer = self.gevent_timeout = gevent.Timeout(
            seconds, JobTimeoutException(message))
        timer.start()

    def cancel_death_penalty(self):
        """Cancel the timeout started by ``setup_death_penalty``."""
        self.gevent_timeout.cancel()
class GeventWorker(Worker):
    """RQ worker that performs jobs concurrently as greenlets.

    Jobs are spawned into ``self.gevent_pool`` instead of being performed
    inline, and job timeouts are enforced through ``GeventDeathPenalty``.

    Accepts every argument ``Worker`` accepts plus an optional ``pool_size``
    keyword (default 20) bounding the number of concurrently running jobs.

    NOTE(review): this class does not apply gevent monkey patching itself;
    presumably the entry point has to call ``gevent.monkey.patch_all()``
    before anything else is imported -- confirm against how it is launched.
    """

    death_penalty_class = GeventDeathPenalty

    # Seconds to block on Redis per dequeue attempt. Kept short so that a
    # warm-shutdown request is noticed quickly instead of after the (much
    # longer) timeout the base worker passes in.
    dequeue_poll_timeout = 5

    def __init__(self, *args, **kwargs):
        """Create the greenlet pool, then initialise the base worker.

        ``pool_size`` is removed from ``kwargs`` before they are forwarded,
        because the base ``Worker`` does not know that keyword.
        """
        self.gevent_pool = gevent.pool.Pool(kwargs.pop('pool_size', 20))
        super(GeventWorker, self).__init__(*args, **kwargs)

    def get_ident(self):
        """Return an identifier for the currently running greenlet."""
        return id(gevent.getcurrent())

    def _install_signal_handlers(self):
        """Install SIGINT/SIGTERM handlers for warm and cold shutdown.

        The first signal requests a warm shutdown (stop dequeuing, wait for
        running greenlets); a second signal kills the pool and exits.
        """
        # gevent >= 1.5 exposes the registration function as
        # ``gevent.signal_handler`` because ``gevent.signal`` became a module
        # there; older releases only provide the callable ``gevent.signal``.
        register = getattr(gevent, 'signal_handler', None) or gevent.signal

        def request_force_stop():
            self.log.warning('Cold shut down.')
            self.gevent_pool.kill()
            raise SystemExit()

        def request_stop():
            if not self._stopped:
                # From now on a further signal triggers the cold shutdown.
                register(signal.SIGINT, request_force_stop)
                register(signal.SIGTERM, request_force_stop)

                self.log.warning('Warm shut down requested.')
                self.log.warning('Stopping after all greenlets are finished. '
                                 'Press Ctrl+C again for a cold shutdown.')

                self._stopped = True
                self.gevent_pool.join()

        register(signal.SIGINT, request_stop)
        register(signal.SIGTERM, request_stop)

    def execute_job(self, job):
        """Run ``job`` in a pool greenlet instead of performing it inline."""
        self.gevent_pool.spawn(self.perform_job, job)

    def dequeue_job_and_maintain_ttl(self, timeout):
        """Wait for the next job while keeping the worker heartbeat alive.

        :param timeout: ``None`` selects burst behaviour (return ``None`` once
            the queues are empty and all running greenlets have finished).
            Any other value selects blocking behaviour; the value itself is
            not used as the wait time -- Redis is polled in
            ``dequeue_poll_timeout``-second slices so that a stop request is
            honoured promptly.
        :returns: the ``(job, queue)`` pair from ``dequeue_any``, or ``None``
            in burst mode when there is nothing left to do.
        :raises StopRequested: when a warm shutdown has been requested.
        """
        result = None
        burst = timeout is None
        # A ``None`` timeout makes ``dequeue_any`` non-blocking, which is what
        # lets burst mode observe an empty queue instead of timing out forever.
        poll_timeout = None if burst else self.dequeue_poll_timeout
        pool_drained = False

        while True:
            if self._stopped:
                raise StopRequested()

            self.heartbeat()

            # Do not take more work than the pool can run concurrently.
            while self.gevent_pool.full():
                gevent.sleep(0.1)
                if self._stopped:
                    raise StopRequested()

            try:
                result = self.queue_class.dequeue_any(
                    self.queues, poll_timeout, connection=self.connection)
            except DequeueTimeout:
                continue

            if result is not None:
                job, queue = result
                self.log.info('%s: %s (%s)', green(queue.name),
                              blue(job.description), job.id)
                break

            if burst:
                if pool_drained:
                    # Queues are empty and no greenlet is running: done.
                    break
                # Let running jobs finish, then look once more in case one of
                # them enqueued follow-up work.
                self.gevent_pool.join()
                pool_drained = True

        self.heartbeat()
        return result
The problem with rqworker is that it doesn't apply gevent's monkey patching. If I add
import gevent.monkey
gevent.monkey.patch_all()
at the top of the rqworker file, it works. I don't use rqworker myself; I use a custom main method.
I forked your gist and made some changes to properly support shutdown, I think. One thing I changed is that I ignore the timeout in dequeue_job_and_maintain_ttl, because the default is to wait 360 seconds, which means warm shutdown doesn't happen quickly enough. I don't think it matters much, since this function loops until it gets a job anyway.
Thanks for updates and explanations!
I don't understand why the code uses
result = self.queue_class.dequeue_any(self.queues, 5, connection=self.connection)
if result is None and timeout is None:
    self.gevent_pool.join()
rather than the default:
result = self.queue_class.dequeue_any(self.queues, timeout, connection=self.connection)
As @jhorman said, when dequeue_any is called, will it ignore the shutdown signal?
How do you terminate a job, but not the whole process?
For example, a job spawns greenlets of its own and a fatal error occurs in one of those greenlets, making the entire job pointless. How can this job be removed from the pool?
I made some modifications and packaged it; @lechup, is that OK? This is the repo: https://github.com/zhangliyong/rq-gevent-worker
I don't know why, but it somehow blocks Ctrl+C when run from the command line with
rqworker -w apps.main.utils.rqworker.GeventWorker - how can I break the loop with Ctrl+C?