Last active
November 24, 2021 05:18
-
-
Save versusvoid/8c55b90e44d02dd3fa32888078b6a107 to your computer and use it in GitHub Desktop.
Abort WSGI request processing on client disconnect (gunicorn, gevent + sqlalchemy, psycopg2)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import gevent | |
import gevent.local | |
import gunicorn.app.wsgiapp | |
import psycopg2 | |
import psycopg2.errors | |
import psycopg2.extensions | |
import sqlalchemy.util | |
from gevent._hub_primitives import wait_read, wait_write | |
from gevent.libuv.watcher import libuv | |
from gunicorn.workers.ggevent import GeventWorker | |
class AbortOnDisconnect(gevent.GreenletExit): | |
pass | |
def patch_psycopg2(event=None): | |
psycopg2.extensions.set_wait_callback(_gevent_wait_callback) | |
_exception_holder = gevent.local.local() | |
def _gevent_wait_callback(conn, timeout=None): | |
while True: | |
try: | |
state = conn.poll() | |
if state == psycopg2.extensions.POLL_OK: | |
break | |
elif state == psycopg2.extensions.POLL_READ: | |
wait_read(conn.fileno(), timeout=timeout) | |
elif state == psycopg2.extensions.POLL_WRITE: | |
wait_write(conn.fileno(), timeout=timeout) | |
else: | |
raise psycopg2.OperationalError(f'Bad result from poll: {state}') | |
except AbortOnDisconnect as e: | |
# If cancel throws exception here we let it bubble up | |
# instead of AbortOnDisconnect. | |
# psycopg2 and sqlalchemy can then close and dispose resources | |
# and wrap it as some connection error which likely will still abort | |
# normal request processihg | |
conn.cancel() | |
# If we allow AbortOnDisconnect bubble up, psycopg2 will close perfectly | |
# healthy connection, so we pass it around through greenlet context | |
_exception_holder.aborted = e | |
class AbortingCursor(psycopg2.extensions.cursor): | |
def execute(self, *args, **kwargs): | |
try: | |
result = super().execute(*args, **kwargs) | |
e = getattr(_exception_holder, 'aborted', None) | |
if e is not None: | |
# cancel() was ineffective | |
# bubbling AbortOnDisconnect up | |
raise e | |
return result | |
except psycopg2.errors.QueryCanceled: | |
e = getattr(_exception_holder, 'aborted', None) | |
if e is not None: | |
# Normal case, connection still healthy, | |
# bubbling AbortOnDisconnect up | |
raise e | |
else: | |
raise | |
class AbortingConnection(psycopg2.extensions.connection): | |
def cursor(self, *args, **kwargs): | |
kwargs.setdefault('cursor_factory', AbortingCursor) | |
return super().cursor(*args, **kwargs) | |
class AbortingGeventWorker(GeventWorker): | |
def handle(self, listener, socket, addr): | |
handle_greenlet = gevent.getcurrent() | |
disconnect_event = gevent.get_hub().loop.io(socket.fileno(), libuv.UV_DISCONNECT) | |
disconnect_event.start(self._handle_disconnect, addr, handle_greenlet, disconnect_event) | |
try: | |
super().handle(listener, socket, addr) | |
finally: | |
disconnect_event.close() | |
def _handle_disconnect(self, addr, handle_greenlet, disconnect_event): | |
self.log.debug('Connection to %s aborted', addr) | |
gevent.kill(handle_greenlet, exception=AbortOnDisconnect) | |
disconnect_event.stop() | |
def handle_request(self, listener_name, req, sock, addr): | |
try: | |
# Calling method on parent of GeventWorker | |
super(GeventWorker).handle_request(listener_name, req, sock, addr) | |
except AbortOnDisconnect: | |
# GeventWorker.handle_request simply ignores GreenletExit | |
# and proceeds to read from socket. | |
# If UV_DISCONNECT was signalled, it will then generate EBADF 'Bad file number' and spam it to logs | |
# On StopIteration it will simply cease any work with socket | |
raise StopIteration() | |
except gevent.GreenletExit: | |
pass | |
except SystemExit: | |
pass | |
_original_is_exit_exception = sqlalchemy.util.is_exit_exception | |
def _is_exit_exception(e): | |
# sqlalchemy just as psycopg2 hurries to close connection on any unknown exception | |
# But since we already certain connection is healthy (otherwise | |
# AbortingCursor.execute would raise another exception, not AbortOnDisconnect) | |
# we are telling sqlalchemy that connection is ok and it should | |
# just bubble AbortOnDisconnect further | |
if type(e) == AbortOnDisconnect: | |
return False | |
return _original_is_exit_exception(e) | |
def run_aborting_gunicorn(): | |
# libev doesn't support DISCONNECT socket events | |
gevent.config.loop = "libuv" | |
# patching sqlalchemy to handle our custom exception | |
sqlalchemy.util.is_exit_exception = _is_exit_exception | |
# configuring gunicorn to use our worker | |
sys.argv.extend(['--worker-class', 'utils.gevent.AbortingGeventWorker']) | |
gunicorn.app.wsgiapp.run() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ... | |
[tool.poetry.scripts] | |
aborting-gunicorn = 'aborting:run_aborting_gunicorn' | |
[tool.poetry.plugins."gevent.plugins.monkey.did_patch_builtins"] | |
patch_psycopg2_for_gevent = 'aborting:patch_psycopg2' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment