-
-
Save zzzeek/6287e28054d3baddc07fa21a7227904e to your computer and use it in GitHub Desktop.
"""Proof of concept of an adapter that will wrap a blocking IO library in | |
greenlets, so that it can be invoked by asyncio code and itself make calls out | |
to an asyncio database library. | |
hint: any Python ORM or database abstraction tool written against the DBAPI | |
could in theory allow asyncio round trips to a pure async DB driver like | |
asyncpg. | |
The approach here seems too simple to be true and I did not expect it to | |
collapse down to something this minimal, so it is very possible I am totally | |
missing something here. it's 1 AM and maybe I'm staring at it for too long | |
but it works? | |
note this is not using gevent or eventlet, we don't actually need those here | |
as we are using regular asyncio networking. It uses only the underlying | |
greenlet library which allows us to context switch without the | |
"await" keyword or other asyncio syntax. | |
""" | |
import asyncio | |
import contextvars | |
import sys | |
import asyncpg | |
import greenlet | |
class AsyncConnection: | |
"""An asyncio database connection class. | |
This is a pure asyncio class that has to be run in an | |
explicit async event loop. | |
Uses asyncpg. | |
""" | |
@classmethod | |
async def create(self, **kw): | |
conn = AsyncConnection.__new__(AsyncConnection) | |
conn.conn = await asyncpg.connect(**kw) | |
return conn | |
async def execute(self, statement): | |
await self.conn.execute(statement) | |
async def fetch(self, statement): | |
return await self.conn.fetch(statement) | |
async def close(self): | |
await self.conn.close() | |
class SyncConnection: | |
"""A synchronous database connection class. | |
This class mirrors the asyncpg API above however does not use asyncio at | |
all, all methods are written in blocking style. | |
This class is fairly analogous to the Connection in SQLAlchemy or other | |
ORMs. For a more DBAPI/SQLAlcheny-like approach we'd need to make more of | |
a "cursor" effect from asyncpg and make sure it starts a transaction, etc. | |
but in general this would be the SQLAlchemy Connection class, or the ORM | |
Session (which in SQLAlchemy 2.0 is heavily based on Session.execute()). | |
""" | |
@classmethod | |
def create(self, impl, **kw): | |
conn = SyncConnection.__new__(SyncConnection) | |
conn.conn = impl.create(**kw) | |
return conn | |
def execute(self, statement): | |
self.conn.execute(statement) | |
def fetch(self, statement): | |
return self.conn.fetch(statement) | |
def close(self): | |
self.conn.close() | |
class SyncAdaptImpl: | |
"""An adapter that translates between the async and sync versions | |
of the Connection. | |
This class can likely be generated programmatically for a larger | |
scale application as this is basically "add the async keyword to | |
each function call" boilerplate. | |
""" | |
@classmethod | |
def create(cls, **kw): | |
conn = SyncAdaptImpl.__new__(SyncAdaptImpl) | |
async def async_create(**kw): | |
return await AsyncConnection.create( | |
user="scott", | |
password="tiger", | |
host="localhost", | |
database="test", | |
) | |
conn.async_conn = greenlet_runner.run_in_asyncio(async_create(**kw)) | |
return conn | |
def execute(self, statement): | |
async def execute(statement): | |
await self.async_conn.execute(statement) | |
return greenlet_runner.run_in_asyncio(execute(statement)) | |
def fetch(self, statement): | |
async def fetch(statement): | |
return await self.async_conn.fetch(statement) | |
return greenlet_runner.run_in_asyncio(fetch(statement)) | |
def close(self): | |
async def close(): | |
await self.async_conn.close() | |
return greenlet_runner.run_in_asyncio(close()) | |
def im_a_greenlet(): | |
"""our greenlet-style function. | |
All the code here is written against the SyncConnection in blocking style | |
and this code knows nothing about asyncio. It is written for blocking | |
database connections, however we will run it in a greenlet container that | |
allows it to talk to asyncio database connections. | |
This would apply towards all the major chunks of the ORM that would | |
be insane to rewrite just to have "async" keywords everywhere, namely | |
the logic used by the unit of work to flush objects, and possibly | |
code like eager loaders as that run inline with loading as well. | |
""" | |
conn = SyncConnection.create( | |
SyncAdaptImpl, | |
user="scott", | |
password="tiger", | |
host="localhost", | |
database="test", | |
) | |
conn.execute("drop table if exists mytable") | |
conn.execute( | |
"create table if not exists " | |
"mytable (id serial primary key, data varchar)" | |
) | |
conn.execute("insert into mytable(data) values ('even more data')") | |
result = conn.fetch("select id, data from mytable") | |
conn.close() | |
return result | |
class GreenletExecutor: | |
"""Greenlet exector that wires up greenlet.switch() and await calls | |
together. | |
I'm sort of not really sure why this even works and I have not tested | |
this under real concurrency, this seems to simple to be true actually, | |
I first wrote this using a more complicated approach with two separate | |
event loops, but I'd prefer to have things run as a straight shot. | |
If this effect is actually this simple and there's no spooky weird | |
thing that is going to happen from adding stackless Python context | |
switches into async / await, this would be kind of amazing? | |
""" | |
current_return = contextvars.ContextVar("current greenlet context") | |
def run_in_asyncio(self, coroutine): | |
return self.current_return.get().switch(coroutine) | |
async def run_greenlet(self, fn): | |
result_future = asyncio.Future() | |
def run_greenlet_target(): | |
result_future.set_result(fn()) | |
return None | |
async def run_greenlet(): | |
gl = greenlet.greenlet(run_greenlet_target) | |
greenlet_coroutine = gl.switch() | |
while greenlet_coroutine is not None: | |
task = asyncio.create_task(greenlet_coroutine) | |
try: | |
await task | |
except: | |
# this allows an exception to be raised within | |
# the moderated greenlet so that it can continue | |
# its expected flow. | |
greenlet_coroutine = gl.throw(*sys.exc_info()) | |
else: | |
greenlet_coroutine = gl.switch(task.result()) | |
current_return = greenlet.greenlet(run_greenlet) | |
self.current_return.set(current_return) | |
await current_return.switch() | |
return result_future.result() | |
greenlet_runner = GreenletExecutor() | |
if __name__ == "__main__": | |
# finally here's our program, written as asyncio. we want to use | |
# asyncio to write our app, but be able to call into a library | |
# that knows nothing about asyncio, so we will adapt it to run | |
# as a greenlet. the trick is that the database library ultimately | |
# used is also asyncio, so we don't need gevent or eventlet networking | |
# or even their event loops, only the greenlet construct. | |
loop = asyncio.get_event_loop() | |
async def run_our_asyncio_program(): | |
retval = await greenlet_runner.run_greenlet(im_a_greenlet) | |
print(retval) | |
loop.run_until_complete(run_our_asyncio_program()) |
Aye, I had the same Gevent discussion in mind. FWIW, aiogevent
would not work on a modern CPython version without major refactoring (it's even trying to call asyncio.async
which is not quite the correct syntax anymore...).
I just wanted to use asyncio and not have to use async/await keywords within a middle library.
Indeed, that's one of the major annoyances for reusing code in libraries supporting the both worlds. If I were to decide, the idea should have not been rejected from the standard lib in the first place 👿 (I understand Guido's argument against reentrance and the resulting uncontrolled recursion, but the interpreter itself could offer support for switching contexts).
Your solution is admittedly really elegant in its simplicity 👍 Are you planning to release it as a separate microlibrary?
oh that's a great link, I have a vague notion that Guido has rejected all the very reasonable approaches such that asyncio doesnt force all library authors everywhere to rewrite everything fron scratch, but this seems to be where it actually happened.
re: microlibrary, I think at the moment this would be a utility wihtin SQLAlchemy itself but I don't have an appetite to maintain /release a separate package. Also considering "greenback" already does this I wouldn't want to compete with that.
that said, consider this code to be public domain and go nuts with it.
there are libraries like that, e.g. to allow operability between asyncio and gevent, here are the links that I found for that kind of thing which helped me to come up with this approach:
top -level gevent discussion: gevent/gevent#982
clone of @vstinner 's aiogevent: https://github.com/2mf/aiogevent
tulipcore: https://github.com/decentfox/tulipcore
The important thing to note is that gevent has its own event loop. So you either have to get the asyncio event loop in there (what tulipcore does) or you have to pass messages between two event loops which is probably not very performant at high volume (basically this is what threadpoolexecutor has to do also).
Above, I had no interest in gevent's event loop or any of its networking functions, I just wanted to use asyncio and not have to use async/await keywords within a middle library. Someone has actually done this already, albeit in a more complicated style that tries to make it a bit more transparent than how I am doing it, this is at https://github.com/oremanj/greenback .