Created
March 17, 2019 20:44
-
-
Save solyard/e351d22c79829062583243decbb09b4a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging

import asyncpg
import ujson
class Transaction:
    """Async context manager that acquires a pooled connection and opens a transaction on it.

    Usage::

        async with Transaction(pool) as con:
            await con.execute(...)

    Entering acquires a connection via ``pool.acquire()``, starts a
    transaction on it and returns the connection.  Leaving finishes the
    transaction and then hands the connection back to the pool.

    Args:
        pool: Object exposing ``acquire()`` that returns an async context
            manager yielding a connection (an asyncpg pool in this module).
        **transaction_kwargs: Optional keyword arguments forwarded verbatim
            to ``con.transaction()``.
    """

    def __init__(self, pool, **transaction_kwargs):
        self.pool = pool
        self.transaction_kwargs = transaction_kwargs
        # Both are populated by __aenter__ and consumed by __aexit__.
        self.acquire_context = None
        self.transaction = None

    async def __aenter__(self):
        """Acquire a connection, start a transaction and return the connection."""
        self.acquire_context = self.pool.acquire()
        con = await self.acquire_context.__aenter__()
        try:
            self.transaction = con.transaction(**self.transaction_kwargs)
            await self.transaction.__aenter__()
        except BaseException as exc:
            # The connection is already checked out; give it back before
            # propagating, otherwise it would never be released.
            await self.acquire_context.__aexit__(type(exc), exc, exc.__traceback__)
            raise
        return con

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Finish the transaction, then release the connection.

        The release runs in ``finally`` so that an error raised while
        finishing the transaction cannot leave the connection checked out.
        """
        try:
            await self.transaction.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            await self.acquire_context.__aexit__(exc_type, exc_val, exc_tb)

    def __await__(self):
        """Make ``await Transaction(pool)`` equivalent to awaiting ``__aenter__()``.

        The awaited result is the connection with the transaction already
        started.  Nothing in this path closes it, so the caller is
        responsible for awaiting ``__aexit__`` afterwards.
        """
        return self.__aenter__().__await__()
class PostgreSQL:
    """Lazily-initialised wrapper around an asyncpg connection pool.

    The instance starts empty; ``initialize()`` creates the pool and installs
    ``transaction``, a factory for ``Transaction`` context managers.  Any
    attribute not defined on the wrapper itself is looked up on the pool, so
    pool methods can be called directly on the wrapper.
    """

    def __init__(self):
        self.pool = None
        self.transaction = None

    async def initialize(self, *args, **kwargs):
        """Create the connection pool and the transaction factory.

        All positional and keyword arguments are forwarded to
        ``asyncpg.create_pool``; ``init`` is always set to ``self.codecs``.

        After this call ``self.transaction(trans_kwargs=None)`` returns a
        ``Transaction`` bound to the pool, with the optional ``trans_kwargs``
        dict expanded into keyword arguments for ``Transaction``.
        """
        self.pool = await asyncpg.create_pool(*args, init=self.codecs, **kwargs)
        self.transaction = lambda trans_kwargs=None: Transaction(self.pool, **(trans_kwargs or {}))

    def __getattr__(self, item):
        """Delegate unknown attributes to the underlying pool.

        Python calls this only after normal attribute lookup has failed.

        Raises:
            AttributeError: If ``item`` is one of the wrapper's own attributes
                that is missing from the instance, if the pool has not been
                created yet, or if the pool has no such attribute.
        """
        # If 'pool' or 'transaction' reaches this method the instance never
        # ran __init__ (e.g. created via __new__ or mid-unpickle).  Reading
        # self.pool / self.transaction here would re-enter __getattr__ and
        # recurse without bound, so fail immediately instead.
        if item in ('pool', 'transaction'):
            raise AttributeError(item)
        pool = self.pool
        if pool is None:
            raise AttributeError(
                "{!r} is unavailable: the connection pool has not been created; "
                "await initialize() first".format(item)
            )
        return getattr(pool, item)

    @staticmethod
    async def codecs(conn: "asyncpg.Connection"):
        """Register ujson-based text codecs on a connection.

        Passed as the ``init`` callback to ``asyncpg.create_pool`` by
        ``initialize()``.  ``json`` and ``jsonb`` are registered
        unconditionally; ``zson`` is attempted on a best-effort basis.
        """
        for typename in ('json', 'jsonb'):
            await conn.set_type_codec(typename, encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog', format='text')
        try:
            await conn.set_type_codec('zson', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog', format='text')
        except Exception:
            # Registration of 'zson' is optional: keep going without it, but
            # leave a trace instead of discarding the error silently.
            logging.getLogger(__name__).debug("zson codec was not registered", exc_info=True)
# Shared module-level instance.  It holds no pool until
# ``await db.initialize(...)`` has been called.
db = PostgreSQL()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment