Skip to content

Instantly share code, notes, and snippets.

@solyard
Created March 17, 2019 20:44
Show Gist options
  • Save solyard/e351d22c79829062583243decbb09b4a to your computer and use it in GitHub Desktop.
Save solyard/e351d22c79829062583243decbb09b4a to your computer and use it in GitHub Desktop.
import asyncpg
import ujson
class Transaction:
    """Async context manager: acquire a pooled connection and open a transaction.

    ``async with Transaction(pool) as con:`` acquires a connection from
    *pool*, starts a transaction on it and yields the connection.  On exit
    the transaction's own ``__aexit__`` is run with the exception details
    and the connection is handed back to the pool.

    Args:
        pool: Object exposing ``acquire()`` that returns an async context
            manager yielding a connection (e.g. an asyncpg pool).
        **transaction_kwargs: Forwarded verbatim to ``con.transaction()``.
    """

    def __init__(self, pool, **transaction_kwargs):
        self.pool = pool
        # Options for con.transaction(); empty when the caller passes none,
        # which keeps the original ``Transaction(pool)`` call working.
        self._transaction_kwargs = transaction_kwargs
        self.acquire_context = None
        self.transaction = None

    async def __aenter__(self):
        """Acquire a connection, begin the transaction, return the connection."""
        self.acquire_context = self.pool.acquire()
        con = await self.acquire_context.__aenter__()
        try:
            self.transaction = con.transaction(**self._transaction_kwargs)
            await self.transaction.__aenter__()
        except BaseException as exc:
            # The connection is already checked out; give it back before
            # propagating, otherwise it would never be released.
            await self.acquire_context.__aexit__(
                type(exc), exc, exc.__traceback__
            )
            raise
        return con

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Finish the transaction, then always release the connection."""
        try:
            await self.transaction.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            # Runs even when finishing the transaction itself raises, so the
            # connection is not leaked.
            await self.acquire_context.__aexit__(exc_type, exc_val, exc_tb)

    def __await__(self):
        """Allow ``con = await Transaction(pool)``.

        Awaiting only runs ``__aenter__``; nothing here calls ``__aexit__``
        afterwards, so the caller has to do that explicitly.
        """
        return self.__aenter__().__await__()
class PostgreSQL:
    """Thin facade over an asyncpg connection pool.

    The pool is created lazily by :meth:`initialize`.  Attributes that are
    not defined on this object are looked up on the pool, so pool methods
    can be called directly on the instance.

    Attributes:
        pool: The pool returned by ``asyncpg.create_pool``; ``None`` until
            :meth:`initialize` has been awaited.
        transaction: Factory ``transaction(trans_kwargs=None)`` returning a
            ``Transaction`` bound to the pool; ``None`` until
            :meth:`initialize` has been awaited.
    """

    def __init__(self):
        self.pool = None
        self.transaction = None

    async def initialize(self, *args, **kwargs):
        """Create the pool and install the transaction factory.

        ``args``/``kwargs`` are passed through to ``asyncpg.create_pool``;
        ``init`` is always set to :meth:`codecs`, so it must not be supplied
        by the caller.
        """
        self.pool = await asyncpg.create_pool(*args, init=self.codecs, **kwargs)

        def transaction(trans_kwargs=None):
            # self.pool is read at call time, so the factory follows the
            # pool currently stored on the instance.
            return Transaction(self.pool, **(trans_kwargs or {}))

        self.transaction = transaction

    def __getattr__(self, item):
        """Delegate unknown attributes to the pool.

        ``__getattr__`` runs only after normal lookup has failed, so it must
        never read ``self.pool`` / ``self.transaction`` through attribute
        access when those are missing (e.g. on an instance created without
        ``__init__``): doing so would re-enter this method without end.

        Raises:
            AttributeError: If the attribute is one of this object's own
                fields that is not set, if the pool has not been created
                yet, or if the pool has no such attribute.
        """
        if item in ('pool', 'transaction'):
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(
                    type(self).__name__, item
                )
            )
        # Read from the instance dict directly to avoid re-entering
        # __getattr__ when 'pool' is absent.
        pool = self.__dict__.get('pool')
        if pool is None:
            raise AttributeError(
                '{!r} object has no attribute {!r}: the pool is not '
                'initialized, await initialize() first'.format(
                    type(self).__name__, item
                )
            )
        return getattr(pool, item)

    @staticmethod
    async def codecs(conn: "asyncpg.Connection"):
        """Register ujson text codecs on a newly created connection.

        ``json`` and ``jsonb`` are registered unconditionally.  ``zson`` is
        attempted on a best-effort basis and any failure is ignored.
        """
        for type_name in ('json', 'jsonb'):
            await conn.set_type_codec(
                type_name,
                encoder=ujson.dumps,
                decoder=ujson.loads,
                schema='pg_catalog',
                format='text',
            )
        try:
            await conn.set_type_codec(
                'zson',
                encoder=ujson.dumps,
                decoder=ujson.loads,
                schema='pg_catalog',
                format='text',
            )
        except Exception:
            # Deliberately best-effort: presumably the zson type is not
            # available on every server (TODO confirm), and its absence
            # must not prevent connections from being set up.
            pass
# Module-level PostgreSQL instance. Constructing it opens no connection:
# its pool stays None until ``await db.initialize(...)`` has been called.
db = PostgreSQL()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment