Skip to content

Instantly share code, notes, and snippets.

@icsaas
Forked from lechup/gevent_rqworker.py
Created July 6, 2014 03:04
Show Gist options
  • Save icsaas/4692e842e70d6d5aeb03 to your computer and use it in GitHub Desktop.
Save icsaas/4692e842e70d6d5aeb03 to your computer and use it in GitHub Desktop.
from __future__ import absolute_import
import signal
import gevent
import gevent.pool
from rq import Worker
from rq.timeouts import BaseDeathPenalty, JobTimeoutException
from rq.worker import StopRequested, green, blue
from rq.exceptions import DequeueTimeout
class GeventDeathPenalty(BaseDeathPenalty):
def setup_death_penalty(self):
exception = JobTimeoutException('Gevent Job exceeded maximum timeout value (%d seconds).' % self._timeout)
self.gevent_timeout = gevent.Timeout(self._timeout, exception)
self.gevent_timeout.start()
def cancel_death_penalty(self):
self.gevent_timeout.cancel()
class GeventWorker(Worker):
death_penalty_class = GeventDeathPenalty
def __init__(self, *args, **kwargs):
pool_size = kwargs.get('pool_size', 5)
self.gevent_pool = gevent.pool.Pool(pool_size)
super(GeventWorker, self).__init__(*args, **kwargs)
def get_ident(self):
return id(gevent.getcurrent())
def _install_signal_handlers(self):
def request_force_stop():
self.log.warning('Cold shut down.')
self.gevent_pool.kill()
raise SystemExit()
def request_stop():
gevent.signal(signal.SIGINT, request_force_stop)
gevent.signal(signal.SIGTERM, request_force_stop)
self.log.warning('Warm shut down requested.')
self.log.warning('Stopping after all greenlets are finished. '
'Press Ctrl+C again for a cold shutdown.')
self._stopped = True
self.gevent_pool.join()
raise StopRequested()
gevent.signal(signal.SIGINT, request_stop)
gevent.signal(signal.SIGTERM, request_stop)
def execute_job(self, job):
self.gevent_pool.spawn(self.perform_job, job)
def dequeue_job_and_maintain_ttl(self, timeout):
result = None
while True:
self.heartbeat()
# do jobs in pool before adding new one
while not self.gevent_pool.free_count() > 0:
gevent.sleep(0)
try:
result = self.queue_class.dequeue_any(self.queues, timeout, connection=self.connection)
if result is None and timeout is None:
self.gevent_pool.join()
if result is not None:
job, queue = result
self.log.info('%s: %s (%s)' % (green(queue.name),
blue(job.description), job.id))
break
except DequeueTimeout:
pass
self.heartbeat()
return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment