Skip to content

Instantly share code, notes, and snippets.

@sloev
Last active May 3, 2021 19:07
Show Gist options
  • Select an option

  • Save sloev/20a421b9c8084d02b9a379b5d69a61e5 to your computer and use it in GitHub Desktop.

Select an option

Save sloev/20a421b9c8084d02b9a379b5d69a61e5 to your computer and use it in GitHub Desktop.
asyncio sqlalchemy mysql
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from contextlib import closing, contextmanager, ExitStack
from sqlalchemy.sql import text
import asyncio
import logging
import time
class AsyncMysqlDatabase:
def __init__(self, dsn, executor=None, hooks=None, pool_size=10):
self.engine = create_engine(
dsn, poolclass=QueuePool, pool_size=pool_size, max_overflow=0
)
self.executor = executor
self.hooks = hooks or [self.logging_hook]
@property
def loop(self):
return asyncio.get_event_loop()
async def connect(self, loop=None):
(loop or self.loop).run_in_executor(self.executor, self.engine.connect)
async def dispose(self, loop=None):
(loop or self.loop).run_in_executor(self.executor, self.engine.dispose)
def sync_fetch_all(self, query, params):
with self._hook_scope(query=query, params=params):
with closing(self.engine.execute(text(query), **params)) as results:
results = [dict(result) for result in results]
return results
def sync_fetch_one(self, query, params):
with self._hook_scope(query=query, params=params):
with closing(self.engine.execute(text(query), **params)) as result:
result = result.fetchone()
if result is not None:
result = dict(result)
return result
async def fetch_all(self, query, params, loop=None):
results = await (loop or self.loop).run_in_executor(
None, self.sync_fetch_all, query, params
)
return results
async def fetch_one(self, query, params, default=None, loop=None):
result = await (loop or self.loop).run_in_executor(
None, self.sync_fetch_one, query, params
)
if default is not None:
result = result or {}
result = {k: result.get(k) or default[k] for k in default}
return result
@contextmanager
def logging_hook(self, query, params, **kwargs_dump):
start = time.time()
try:
yield
except:
took = (time.time() - start) * 1000.0
logging.exception(
{
"message": "Error running mysql query",
"query": query,
"params": params,
"took_ms": took,
}
)
raise
else:
took = (time.time() - start) * 1000.0
logging.debug(
{
"message": "Success running mysql query",
"query": query,
"params": params,
"took_ms": took,
}
)
@contextmanager
def _hook_scope(self, **kwargs):
with ExitStack() as cm:
for hook in self.hooks:
cm.enter_context(hook(**kwargs))
yield

use mysql from asyncio

You might think its better to use some "real asyncio mysql driver" like aiomysql but ive had nothing but trouble with it, plus its not really maintained anymore

instal

you need to install (for debian:) default-libmysqlclient-dev and the requirements listed above

so whats this?

it uses

  • sqlalchemy for pooling and query templating
  • mysqlclient which uses the native c lib underneath
  • and it relies on the installed mysql lib installed on the same host/container etc
sqlalchemy1.3.17
mysqlclient1.4.6
import logging
logging.basicConfig(level=logging.DEBUG)
import asyncio
from async_mysql import AsyncMysqlDatabase
query = """
SELECT *
FROM something
where
something_else = :this
limit 100;
"""
params = {"this": "lolcat"}
async def main():
futures = []
db = AsyncMysqlDatabase(dsn)
loop = asyncio.get_event_loop()
await db.connect()
for i in range(60):
futures.append(loop.create_task(db.fetch_all(query, params)))
for i in range(60, 120):
futures.append(loop.create_task(db.fetch_one(query, params)))
for index, task in enumerate(asyncio.as_completed(futures)):
await task
asyncio.get_event_loop().run_until_complete(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment