Created
September 21, 2023 16:07
-
-
Save matriv/05bec849ff166789d0378625e43c06b0 to your computer and use it in GitHub Desktop.
Asyncpg - float[]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
About | |
===== | |
Test cases for CrateDB using `asyncpg`. | |
Usage | |
===== | |
Normally, this will be executed through the main Python test suite | |
triggered through the toplevel `Jenkinsfile`. | |
However, to run the `asyncpg` tests only, for example on a CrateDB | |
instance already provided through Docker, there's an alternative | |
option which goes along like this:: | |
# Run CrateDB. | |
docker run -it --rm --publish=5432:5432 crate/crate:nightly | |
# Run test suite. | |
export CRATEDB_URI=postgres://crate@localhost:5432/doc | |
python -m unittest discover -vvvf -s tests/client_tests -k test_asyncpg | |
""" | |
import os | |
import asyncpg | |
import unittest | |
from crate.qa.tests import NodeProvider | |
async def basic_queries(test, conn): | |
await conn.execute("DROP TABLE IF EXISTS t1") | |
result = await conn.execute( | |
"CREATE TABLE t1 (x int primary key, y int)") | |
test.assertEqual(result, 'CREATE 1') | |
result = await conn.execute('INSERT INTO t1 (x) VALUES (?)', 1) | |
test.assertEqual(result, 'INSERT 0 1') | |
result = await conn.execute('REFRESH TABLE t1') | |
test.assertEqual(result, 'REFRESH 1') | |
result = await conn.execute('UPDATE t1 SET y = ?', 2) | |
test.assertEqual(result, 'UPDATE 1') | |
result = await conn.execute('REFRESH TABLE t1') | |
test.assertEqual(result, 'REFRESH 1') | |
result = await conn.execute('DELETE FROM t1 WHERE y = ?', 2) | |
test.assertEqual(result, 'DELETE 1') | |
result = await conn.execute(''' | |
INSERT INTO t1 (x) ( | |
SELECT unnest FROM unnest([1, 2]) WHERE unnest = ? | |
) | |
''', 1) | |
test.assertEqual(result, 'INSERT 0 1') | |
result = await conn.execute(''' | |
INSERT INTO t1 (x) ( | |
SELECT unnest FROM unnest([1, 2]) WHERE unnest = ?) | |
ON CONFLICT (x) DO UPDATE SET y = ? | |
''', 1, 2) | |
test.assertEqual(result, 'INSERT 0 1') | |
async def record_type_can_be_read_using_binary_streaming(test, conn): | |
result = await conn.fetch('SELECT pg_catalog.pg_get_keywords()') | |
keyword = sorted([row[0] for row in result], key=lambda x: x[0])[0] | |
test.assertEqual(keyword, ('absolute', 'U', 'unreserved')) | |
async def bitstring_can_be_inserted_and_selected_using_binary_encoding(test, conn): | |
xs = asyncpg.BitString('0101') | |
await conn.execute('drop table if exists tbl_bit') | |
await conn.execute('create table tbl_bit (xs bit(4))') | |
await conn.execute('insert into tbl_bit (xs) values (?)', xs) | |
await conn.execute('refresh table tbl_bit') | |
result = await conn.fetch('select xs from tbl_bit') | |
test.assertEqual(result[0][0], xs) | |
async def float_vector_can_be_inserted_and_selected(test, conn): | |
fv = [1.1, 2.2, 3.3, 4.4] | |
await conn.execute('drop table if exists tbl_fv') | |
await conn.execute('create table tbl_fv (id int, fv float[])') | |
await conn.execute('insert into tbl_fv (id, fv) values (1, [1.1, 2.2, 3.3, 4.4])') | |
await conn.execute('insert into tbl_fv (id, fv) values (2, null)') | |
await conn.execute('refresh table tbl_fv') | |
result = await conn.fetch('select * from tbl_fv order by id') | |
test.assertEqual(result[0][1], fv) | |
test.assertNone(result[1][1]) | |
async def fetch_summits(test, uri): | |
conn = await asyncpg.connect(uri) | |
async with conn.transaction(): | |
cur = await conn.cursor( | |
'select mountain from sys.summits order by height desc') | |
first, second = await cur.fetch(2) | |
third, fourth = await cur.fetch(2) | |
test.assertEqual(first['mountain'], 'Mont Blanc') | |
test.assertEqual(second['mountain'], 'Monte Rosa') | |
test.assertEqual(third['mountain'], 'Dom') | |
test.assertEqual(fourth['mountain'], 'Liskamm') | |
await conn.close() | |
async def exec_queries_pooled(test, uri): | |
pool = await asyncpg.create_pool(uri) | |
async with pool.acquire() as conn: | |
await basic_queries(test, conn) | |
await record_type_can_be_read_using_binary_streaming(test, conn) | |
await bitstring_can_be_inserted_and_selected_using_binary_encoding(test, conn) | |
await float_vector_can_be_inserted_and_selected(test, conn) | |
await pool.close() | |
class AsyncpgTestCase(NodeProvider, unittest.IsolatedAsyncioTestCase): | |
def ensure_cratedb(self): | |
if "CRATEDB_URI" in os.environ: | |
crate_psql_url = os.environ["CRATEDB_URI"] | |
else: | |
(node, _) = self._new_node(self.CRATE_VERSION) | |
node.start() | |
psql_addr = node.addresses.psql | |
crate_address = f'{psql_addr.host}:{psql_addr.port}' | |
crate_psql_url = f'postgres://crate@{crate_address}/doc' | |
return crate_psql_url | |
def setUp(self): | |
super().setUp() | |
self.crate_psql_url = self.ensure_cratedb() | |
async def test_basic_statements(self): | |
await exec_queries_pooled(self, self.crate_psql_url) | |
async def test_result_streaming_using_fetch_size(self): | |
await fetch_summits(self, self.crate_psql_url) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment