Last active
December 22, 2022 22:25
-
-
Save vpzed/90bcc4cc092fe7fa0de92766dd1c2368 to your computer and use it in GitHub Desktop.
Simple asyncpg test script with jsonb field (python 3.6.3, Postgres 10.0, and asyncpg 0.13.0)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Use /usr/bin/env python to support virtual environments | |
# This script is for Python > 3.6 | |
# -*- coding: utf-8 -*- | |
""" | |
Test loading mock initial alliances data into a test table with asyncpg and Postgres | |
""" | |
import datetime | |
import json | |
import asyncio | |
import asyncpg | |
import uvloop | |
# Read the connection settings from the JSON configuration file
with open('d2_postgres_config.json', 'r') as cfgfile:
    CONFIG = json.load(cfgfile)

# Assemble the PostgreSQL client URI from the "DB" section of the config;
# the fields are looked up in the order the URI template consumes them.
POSTGRES_URI = 'postgres://{}:{}@{}:{}/{}'.format(
    *[CONFIG['DB'][field] for field in ('USER', 'PASSWORD', 'HOST', 'PORT', 'DATABASE')]
)
async def main():
    """
    Round-trip one mock record through a test table using a custom jsonb codec.

    Connects with POSTGRES_URI, registers a binary encoder/decoder pair for
    the ``jsonb`` type, recreates ``t_test_table``, inserts a single row,
    selects it back by id and prints its columns. Once the connection has
    been opened it is closed whether or not the statements succeed.
    """

    def _jsonb_to_wire(value):
        # JSON-serialize the value and prepend a single \x01 byte.
        # NOTE(review): \x01 is presumably the jsonb binary-format version
        # marker -- confirm against the Postgres/asyncpg documentation.
        payload = json.dumps(value).encode('utf-8')
        return b'\x01' + payload

    def _jsonb_from_wire(value):
        # Drop the one-byte prefix, then parse the remainder as UTF-8 JSON.
        text = value[1:].decode('utf-8')
        return json.loads(text)

    # Establish a connection to Postgres
    db = await asyncpg.connect(POSTGRES_URI)

    # Mock input record, in column order: id, name, jsonb_data, last_updated
    record = (1, 'Some Test', [123456], datetime.datetime.utcnow())
    record_id = record[0]

    try:
        # Register the custom codec so jsonb values travel as Python objects
        await db.set_type_codec(
            'jsonb',
            schema='pg_catalog',
            encoder=_jsonb_to_wire,
            decoder=_jsonb_from_wire,
            format='binary',
        )

        # Start from a clean slate: drop any previous test table, then create it
        await db.execute('DROP TABLE IF EXISTS t_test_table')
        await db.execute('''
            CREATE TABLE t_test_table(
                id INT4 NOT NULL,
                name TEXT NOT NULL,
                jsonb_data JSONB NOT NULL,
                last_updated TIMESTAMP(0) NOT NULL DEFAULT NOW(),
                CONSTRAINT pk_t_test_table_id PRIMARY KEY (id)
            )
        ''')

        # Insert the mock record
        await db.execute('''
            INSERT INTO t_test_table(id, name, jsonb_data, last_updated) VALUES($1, $2, $3, $4)
        ''', *record)

        # Read the record back by its primary key and show every column
        row = await db.fetchrow('SELECT * FROM t_test_table WHERE id = $1', record_id)
        columns = ('id', 'name', 'jsonb_data', 'last_updated')
        print('----------')
        print('Returned row: ')
        print('{}, {}, {}, {}'.format(*[row[column] for column in columns]))
        print('----------')
    finally:
        # Close the connection.
        await db.close()
# Install uvloop's event loop policy, then drive main() to completion
_policy = uvloop.EventLoopPolicy()
asyncio.set_event_loop_policy(_policy)
_loop = asyncio.get_event_loop()
_loop.run_until_complete(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Script returns: