- I've been using gevent in production for about 6 years, and I've only encountered a handful of issues:
- In gevent pre-1.0 there were issues with the DNS resolver
- I've had a couple of issues with gevent-openssl, which I've fixed by using a custom version: mjs/gevent_openssl#14 and mjs/gevent_openssl#12
- Celery hangs when gevent is monkeypatched. I haven't figured out why, and this might have been fixed in newer versions of celery (the one I'm using is fairly old).
- All of my experience using gevent is in fully monkeypatched mode. It's certainly possible to use it outside of monkeypatched mode, but I don't know anything about that.
- See the `monkeypatches.py` and `gevent_.py` files for my implementation of monkeypatching.
- The `gevent_.py` file contains a couple of things:
  - Monkeypatches
  - Tests which verify whether or not the monkeypatches succeed (there's a view in my application which runs `gevent_.run_checks`, so it's easy to verify whether or not the patches have been applied correctly)
  - A custom `spawn` function which injects some thread-local state into new threads to help with logging.
  - The `lazy_imap` helper, which applies a function concurrently while guaranteeing that only a certain number of threads and outstanding (i.e., unconsumed) results are available. After writing this I realized that there is a `maxsize` argument to `Pool.imap`, which I believe does the same thing.
- With all of the above in place, I haven't had any issues with common libraries (requests, urllib*, redis-py), and this can be verified with the checks implemented in `gevent_.py`.
- I use `worker_class = "gevent"` in gunicorn.
Last active
July 10, 2018 21:03
-
-
Save wolever/6b23f57b247cb9bc56572dd34b00af7b to your computer and use it in GitHub Desktop.
Point-form notes about using gevent in production
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import heapq | |
# True once try_init_gevent() has decided to enable gevent.
using_gevent = False


def try_init_gevent():
    """Enable gevent and apply all gevent-related monkeypatches.

    Does nothing unless the ``USE_GEVENT`` environment variable is set to a
    truthy value. An empty value, or one of "0", "false", "no", "off"
    (compared case-insensitively, ignoring surrounding whitespace), leaves
    gevent disabled.

    When enabled, the module-level ``using_gevent`` flag is set and then,
    in order: ``gevent.monkey.patch_all(subprocess=True)``,
    ``gevent_openssl.monkey_patch()``, a replacement
    ``Connection.sendall`` (see the workaround below), psycogreen's
    ``patch_psycopg()``, and ``monkeypatch_gevent_greenlet()``.

    Raises:
        AssertionError: if Django has already been imported.
    """
    global using_gevent

    use_gevent = (os.environ.get("USE_GEVENT") or "").strip().lower()
    if not use_gevent or use_gevent in ("0", "false", "no", "off"):
        return

    using_gevent = True

    # For some reason, as of Django 1.8 it can't be imported before gevent has
    # done its monkey patching. Double check that here. An explicit raise is
    # used rather than ``assert`` so the check still runs under ``python -O``.
    if "django" in sys.modules:
        raise AssertionError(
            "Django was imported before try_init_gevent was called"
        )

    from gevent import monkey
    monkey.patch_all(subprocess=True)

    import gevent_openssl
    gevent_openssl.monkey_patch()

    from gevent_openssl.SSL import Connection

    def Connection_sendall_fixed(self, buf, flags=0):
        # There is a bug with gevent which causes `sendall` to crash when
        # the buffer is large. Work around this by manually implementing
        # `sendall`. See also: https://github.com/gevent/gevent/issues/736
        while buf:
            sent = self.send(buf, flags)
            buf = buf[sent:]

    Connection.sendall = Connection_sendall_fixed

    from psycogreen.gevent import patch_psycopg
    patch_psycopg()

    monkeypatch_gevent_greenlet()

    # The blocking detector is currently disabled. To enable it, spawn
    # ``start_blocking_detector`` here and record the greenlet in
    # ``_blocking_detector[0]`` so ``stop_blocking_detector`` can kill it:
    #
    #     import gevent
    #     bd_thread = gevent.spawn(start_blocking_detector)
    #     _blocking_detector[0] = bd_thread
# Holds the greenlet running the blocking detector, or None when none is
# recorded. A one-element list lets functions update it without ``global``.
_blocking_detector = [None]


def start_blocking_detector():
    """Run ``gevent_helpers.BlockingDetector`` after a start-up delay.

    Meant to be the target of its own greenlet: it sleeps for 5 seconds, then
    constructs ``BlockingDetector(timeout=1.5, raise_exc=False,
    aggressive=False)`` and calls it. (Presumably the detector reports code
    that blocks the gevent hub for longer than ``timeout`` — confirm in
    ``gevent_helpers``.)
    """
    # Sleep for a little while to give the application a chance to do all the
    # imports + initialization (which may take a little while)
    import gevent
    gevent.sleep(5)
    from gevent_helpers import BlockingDetector
    BlockingDetector(timeout=1.5, raise_exc=False, aggressive=False)()


def stop_blocking_detector():
    """Kill the recorded blocking-detector greenlet, if there is one.

    The stored handle is cleared after a successful kill, so later calls are
    no-ops and this module no longer keeps the dead greenlet alive.
    """
    bd = _blocking_detector[0]
    if bd is None:
        return
    bd.kill()
    _blocking_detector[0] = None
def check(a, b):
    """Compare two values and describe the outcome as a status string.

    Returns "OK" when ``a == b``; otherwise an "ERROR: ..." message showing
    the repr of both values.
    """
    return "OK" if a == b else "ERROR: %r != %r" % (a, b)
def check_yields(f):
    """Report whether calling ``f`` lets other greenlets run.

    A probe greenlet that records "okay" is spawned before ``f`` is called.
    If ``f`` switches to the gevent hub while running, the probe gets a
    chance to run first and the result is "OK"; otherwise an error string
    from ``check`` is returned.
    """
    import gevent
    ran = []

    def probe():
        ran.append("okay")

    gevent.spawn(probe)
    f()
    return check(ran, ["okay"])
def run_checks():
    """Yield ``(name, status)`` pairs describing the gevent setup.

    The first pair reports the raw ``USE_GEVENT`` environment value (or "").
    Each remaining pair carries the string returned by the matching check:
    "OK" on success, otherwise an error string. The checks run lazily, one
    per iteration.
    """
    yield ("USE_GEVENT", os.environ.get("USE_GEVENT") or "")
    yield ("using_gevent", check(using_gevent, True))
    yield ("spawn", check_spawn())
    yield ("requests", check_requests())
    yield ("psycopg2", check_psycopg2())
    yield ("subprocess", check_subprocess())
    yield ("Timeout", check_timeout())
def check_spawn():
    """Verify that spawned greenlets run concurrently.

    Three greenlets append "a" immediately, "c" after 0.2s and "b" after
    0.1s. If they interleave correctly the recorded order is
    ["a", "b", "c"].
    """
    import gevent
    order = []

    def append_after(delay, item):
        if delay:
            gevent.sleep(delay)
        order.append(item)

    greenlets = [
        gevent.spawn(append_after, 0, "a"),
        gevent.spawn(append_after, 0.2, "c"),
        gevent.spawn(append_after, 0.1, "b"),
    ]
    gevent.joinall(greenlets)
    return check(order, ["a", "b", "c"])
def check_requests():
    """Check that an HTTP GET made with ``requests`` yields to other greenlets."""
    import requests

    def fetch():
        return requests.get("http://example.com")

    return check_yields(fetch)
def check_psycopg2():
    """Check that a Django ORM query yields to other greenlets.

    Presumably the configured database backend is psycopg2 (hence the name)
    — confirm in settings.
    """
    from django.contrib.auth.models import User

    def query():
        return User.objects.all().first()

    return check_yields(query)
def check_timeout():
    """Check that ``gevent.Timeout`` interrupts a sleeping greenlet.

    ``Timeout(0.001, False)`` silently leaves the ``with`` block when it
    fires. Reaching the statement after ``sleep`` therefore means the timeout
    did NOT fire, which is reported as "ERROR".
    """
    from gevent import Timeout, sleep
    interrupted = True
    with Timeout(0.001, False):
        sleep(0.1)
        interrupted = False
    if interrupted:
        return "OK"
    return "ERROR"
def check_subprocess():
    """Check that running a child process (``true``) yields to other greenlets."""
    import subprocess
    command = ["true"]

    def run_true():
        return subprocess.call(command)

    return check_yields(run_true)
def spawn_helper(log_context, post_spawn, func, args, kwargs):
    """Greenlet entry point used by ``spawn``: run ``post_spawn``, then ``func``.

    ``log_context`` is accepted for backwards compatibility but is unused.
    The logging context travels inside the ``post_spawn`` callback built by
    ``get_post_spawn``.

    Returns whatever ``func(*args, **kwargs)`` returns.
    """
    post_spawn()
    return func(*args, **kwargs)


def spawn(func, *args, **kwargs):
    """Spawn ``func(*args, **kwargs)`` in a greenlet that inherits logging context.

    If ``Greenlet.__init__`` has already been replaced (its name starts with
    "akindi", as done by ``monkeypatch_gevent_greenlet``), this is a plain
    ``gevent.spawn``. Otherwise the call is wrapped in ``spawn_helper`` with
    a callback from ``get_post_spawn``.
    """
    import gevent
    from gevent.greenlet import Greenlet
    if Greenlet.__init__.__name__.startswith("akindi"):
        return gevent.spawn(func, *args, **kwargs)
    # spawn_helper takes five positional arguments; ``None`` fills the unused
    # ``log_context`` slot. Without it every argument shifts by one and the
    # greenlet fails with a TypeError (missing ``kwargs``).
    return gevent.spawn(spawn_helper, None, get_post_spawn(), func, args, kwargs)
def get_post_spawn(func=None):
    """Build a callback that hands the caller's logging context to a child.

    Call this in the *parent* at spawn time. It increments
    ``g._num_child_threads`` and takes a snapshot of ``g.log_context``.

    The returned ``akindi_post_spawn`` is meant to run in the child. It:

    - copies the snapshot into ``g.log_context`` as seen from there (``g``
      is presumably greenlet/thread-local — confirm in ``global_``);
    - sets the "thread" entry to ``"<parent thread>:<n>"``, or just
      ``"<n>"`` when the parent has no thread name;
    - calls ``func`` with the arguments it received, if ``func`` was given,
      and returns its result (otherwise returns None).
    """
    from .global_ import g
    child_num = g.get("_num_child_threads", 0) + 1
    g._num_child_threads = child_num
    # Copy, rather than alias, the parent's context: the child should see it
    # as it was at spawn time, not later mutations made by the parent.
    log_context = dict(g.log_context)
    cur_name = log_context.get("thread")
    child_thread = "%s%s" % (cur_name + ":" if cur_name else "", child_num)

    def akindi_post_spawn(*args, **kwargs):
        g.log_context.update(log_context)
        g.log_context["thread"] = child_thread
        if func is not None:
            return func(*args, **kwargs)

    return akindi_post_spawn
def pool(size=None):
    """Create a ``gevent.pool.Pool`` limited to ``size`` greenlets.

    ``size`` is passed straight through; gevent treats ``None`` as no limit.
    """
    from gevent.pool import Pool as GeventPool
    new_pool = GeventPool(size=size)
    return new_pool
def monkeypatch_gevent_greenlet():
    """Replace ``Greenlet.__init__`` so new greenlets inherit logging context.

    The replacement wraps the greenlet's ``run`` callable (or ``self._run``
    when none is given) in the callback from ``get_post_spawn``. The real
    target therefore runs only after the parent's logging context has been
    applied.

    Safe to call more than once. If ``Greenlet.__init__`` is already the
    patched version (its name starts with "akindi", the same test ``spawn``
    uses), nothing happens. Patching twice would otherwise wrap ``run`` twice
    and bump the child-thread counter twice per greenlet.
    """
    from gevent.greenlet import Greenlet
    original_init = Greenlet.__init__
    if original_init.__name__.startswith("akindi"):
        return

    def akindi_greenlet__init__(self, run=None, *args, **kwargs):
        post_spawn = get_post_spawn(func=(run or self._run))
        original_init(self, post_spawn, *args, **kwargs)

    Greenlet.__init__ = akindi_greenlet__init__
class lazy_imap(object):
    """ Lazily apply func over iter, ensuring that there are at most ``size``
    outstanding elements (running greenlets plus finished-but-unconsumed
    results). Results are returned in the same order as ``iter``::

        >>> def some_func(x):
        ...     print("Running some_func with: %s" % (x, ))
        ...     sleep(0.1)
        ...     return x
        ...
        >>> items = lazy_imap(some_func, range(100), size=3)
        Running some_func with: 0
        Running some_func with: 1
        Running some_func with: 2
        >>> next(items)
        0
        Running some_func with: 3
        >>>

    ``size`` defaults to 10 when it is None or 0.

    If ``func`` raises for an element, that exception is re-raised by the
    ``next()`` call that would have returned the element's result.
    Iteration can continue afterwards with the following elements.

    Usable as an iterator on both Python 2 (``next``) and Python 3
    (``__next__``).
    """

    def __init__(self, func, iter, size=None):
        from gevent.event import Event
        self.size = size or 10
        self.func = func
        # (index, value) pairs so results can be put back in input order.
        self.iter_with_idx = enumerate(iter)
        # Set whenever a greenlet finishes; next() waits on it.
        self.next_ready = Event()
        # Min-heap of (index, exception, value) for finished greenlets.
        self.pending_results = []
        # Greenlets spawned but not yet finished.
        self.pending_threads = 0
        # Index of the result the next call to next() must return.
        self.next_result_idx = 0
        self.iter_exhausted = False
        self._spawn_next()

    def _is_done(self):
        """True once the input is exhausted and every result was consumed."""
        return (
            self.iter_exhausted and
            not self.pending_threads and
            not self.pending_results
        )

    def _spawn_next(self):
        """Spawn greenlets until ``size`` elements are outstanding."""
        from gevent import spawn
        if self.iter_exhausted:
            return
        while self.pending_threads + len(self.pending_results) < self.size:
            try:
                # Builtin next() works on Python 2.6+ and 3; the iterator's
                # ``.next`` method does not exist on Python 3.
                idx, val = next(self.iter_with_idx)
            except StopIteration:
                self.iter_exhausted = True
                return
            self.pending_threads += 1
            greenlet = spawn(self.func, val)
            # Name-mangled to ``_lazy_imap__result_index``; read back in
            # _on_thread_result.
            greenlet.__result_index = idx
            greenlet.rawlink(self._on_thread_result)

    def _on_thread_result(self, greenlet):
        """rawlink callback: queue a finished greenlet's outcome."""
        heapq.heappush(self.pending_results, (
            greenlet.__result_index,
            greenlet.exception,
            greenlet.value,
        ))
        self.pending_threads -= 1
        self.next_ready.set()
        self._spawn_next()

    def __iter__(self):
        return self

    def next(self):
        """Return the next result in input order, waiting for it if needed."""
        if self._is_done():
            raise StopIteration
        while True:
            has_next = (
                self.pending_results and
                self.pending_results[0][0] == self.next_result_idx
            )
            if has_next:
                break
            # The timeout is only a safety net: the heap is re-checked on
            # every pass of the loop.
            self.next_ready.wait(timeout=1)
            self.next_ready.clear()
        self.next_result_idx += 1
        _, exc, value = heapq.heappop(self.pending_results)
        self._spawn_next()
        if exc is not None:
            raise exc
        return value

    # Python 3 iterator protocol.
    __next__ = next
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
I import this file and run `monkeypatch_everything` from my application's `__init__.py` | |
to guarantee that monkeypatching takes place before anything else is imported. | |
""" | |
import logging | |
# Set by monkeypatch_everything() once all patches have been applied.
did_init = False


def _patch_urllib3(module):
    """Best-effort: inject PyOpenSSL into the urllib3 package ``module``.

    ``module`` is a dotted package path such as "urllib3" or
    "requests.packages.urllib3"; its ``contrib.pyopenssl.inject_into_urllib3``
    is imported and called. Any exception from that is logged to the
    "akindi.__init__" logger instead of being raised.

    When gevent is importable, logging is deferred by one second in a
    greenlet. Otherwise the error is logged immediately, so a missing gevent
    cannot turn this best-effort patch into a startup crash.
    """
    from django.utils.module_loading import import_string
    try:
        inject_into_urllib3 = import_string(
            module + ".contrib.pyopenssl.inject_into_urllib3")
        inject_into_urllib3()
    except Exception as e:
        # Re-bind: Python 3 deletes the ``as`` target when the except clause
        # ends, which would break the deferred closure below.
        error = e

        def log_error():
            log = logging.getLogger("akindi.__init__")
            log.error(
                "Error injecting PyOpenSSL into %s: %s "
                "(things will still work, but SSL certificates "
                "with SNI will fail to validate)",
                module, error,
            )

        try:
            import gevent
        except ImportError:
            log_error()
            return

        def delayed_log_error():
            # Give the rest of Django a chance to warm up and configure
            # loggers, otherwise this message won't go anywhere.
            gevent.sleep(1)
            log_error()

        gevent.spawn(delayed_log_error)
def _patch_openssl():
    """Make ``set_tlsext_host_name`` accept text host names.

    ``set_tlsext_host_name`` on ``gevent_openssl.SSL._real_connection``
    (presumably the unwrapped pyOpenSSL ``Connection`` class) is replaced by
    a wrapper. A text ``name`` is IDNA-encoded to bytes first; any other
    value is passed through unchanged. The original method remains available
    as the wrapper's ``.old`` attribute.
    """
    from gevent_openssl.SSL import _real_connection as Connection
    try:
        text_type = unicode  # Python 2
    except NameError:
        text_type = str  # Python 3: ``unicode`` no longer exists

    old_set_tlsext_host_name = Connection.set_tlsext_host_name

    def set_tlsext_host_name(self, name):
        if isinstance(name, text_type):
            name = name.encode("idna")
        return old_set_tlsext_host_name(self, name)

    set_tlsext_host_name.old = old_set_tlsext_host_name
    Connection.set_tlsext_host_name = set_tlsext_host_name
def monkeypatch_everything():
    """Apply every startup monkeypatch.

    Call this as early as possible, e.g. from manage.py or akindi.wsgi.
    Once a call has completed, the module-level ``did_init`` flag makes
    later calls return immediately.

    Order: ``akindi.gevent_.try_init_gevent()``, then ``_patch_openssl()``,
    then ``_patch_urllib3`` for "urllib3" and "requests.packages.urllib3".
    """
    global did_init
    if not did_init:
        from akindi.gevent_ import try_init_gevent
        try_init_gevent()
        _patch_openssl()
        for urllib3_module in ("urllib3", "requests.packages.urllib3"):
            _patch_urllib3(urllib3_module)
        did_init = True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
any word on the celery+gevent hanging? :)