-
-
Save h0rn3t/624eb809ae0b58a0a7ebaa92f55c116a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncpg | |
import ujson | |
class Transaction: | |
def __init__(self, pool): | |
self.pool = pool | |
async def __aenter__(self): | |
self.acquire_context = self.pool.acquire() | |
con = await self.acquire_context.__aenter__() | |
self.transaction = con.transaction() | |
await self.transaction.__aenter__() | |
return con | |
async def __aexit__(self, exc_type, exc_val, exc_tb): | |
await self.transaction.__aexit__(exc_type, exc_val, exc_tb) | |
await self.acquire_context.__aexit__(exc_type, exc_val, exc_tb) | |
def __await__(self): | |
return self.__aenter__().__await__() | |
class PostgreSQL: | |
def __init__(self): | |
self.pool = None | |
self.transaction = None | |
async def initialize(self, *args, **kwargs): | |
self.pool = await asyncpg.create_pool(*args, init=self.codecs, **kwargs) | |
self.transaction = lambda trans_kwargs=None: Transaction(self.pool, **(trans_kwargs or {})) | |
def __getattr__(self, item): | |
if item == 'transaction': | |
return getattr(self.transaction, item) | |
return getattr(self.pool, item) | |
@staticmethod | |
async def codecs(conn: asyncpg.Connection): | |
await conn.set_type_codec('json', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog', format='text') | |
await conn.set_type_codec('jsonb', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog', format='text') | |
try: | |
await conn.set_type_codec('zson', encoder=ujson.dumps, decoder=ujson.loads, schema='pg_catalog', format='text') | |
except Exception: | |
pass | |
db = PostgreSQL() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment