Last active
November 8, 2021 09:40
-
-
Save 0x0L/3d55d47f6329eb0d7e7a46d1c895b5be to your computer and use it in GitHub Desktop.
asyncpg / apache arrow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"id": "ae35c501-093f-4d82-9ac5-bfb3a2caee9e", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"%load_ext cython" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"id": "65099ae5-0f04-48dd-89c7-330af2aec8be", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import os\n", | |
"\n", | |
"import asyncpg\n", | |
"import pandas as pd\n", | |
"import psycopg\n", | |
"import pyarrow as pa" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "4806649a-39b2-4af1-b6de-407ab5ce69a2", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Connection string. Set the PG_DSN environment variable to point the notebook at\n", | |
"# another server without committing credentials; the fallback is the local\n", | |
"# development database this benchmark was written against.\n", | |
"dsn = os.environ.get('PG_DSN', 'postgres://postgres:postgres@localhost:55000/usda')\n", | |
"conn = await asyncpg.connect(dsn)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"id": "8fbf95c1-e3b8-467d-9f00-0b4a99a3de8b", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"async def get_pandas(sql):\n", | |
"    \"\"\"Run ``sql`` on the shared ``conn`` and return the rows as a DataFrame.\n", | |
"\n", | |
"    Column names are read from the first returned record. A query that matches\n", | |
"    no rows has no record to read them from, so an empty DataFrame is returned\n", | |
"    in that case instead of raising ``IndexError``.\n", | |
"    \"\"\"\n", | |
"    records = await conn.fetch(sql)\n", | |
"    if not records:\n", | |
"        return pd.DataFrame()\n", | |
"    return pd.DataFrame.from_records(records, columns=list(records[0].keys()))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"id": "b181e23c-88b4-4a4a-9e82-7f4e453bad0d", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>schemaname</th>\n", | |
" <th>tablename</th>\n", | |
" <th>tableowner</th>\n", | |
" <th>tablespace</th>\n", | |
" <th>hasindexes</th>\n", | |
" <th>hasrules</th>\n", | |
" <th>hastriggers</th>\n", | |
" <th>rowsecurity</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>public</td>\n", | |
" <td>data_src</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>1</th>\n", | |
" <td>public</td>\n", | |
" <td>datsrcln</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>2</th>\n", | |
" <td>public</td>\n", | |
" <td>nut_data</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>3</th>\n", | |
" <td>public</td>\n", | |
" <td>fd_group</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4</th>\n", | |
" <td>public</td>\n", | |
" <td>food_des</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>5</th>\n", | |
" <td>public</td>\n", | |
" <td>footnote</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>6</th>\n", | |
" <td>public</td>\n", | |
" <td>nutr_def</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>7</th>\n", | |
" <td>public</td>\n", | |
" <td>deriv_cd</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>8</th>\n", | |
" <td>public</td>\n", | |
" <td>src_cd</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>9</th>\n", | |
" <td>public</td>\n", | |
" <td>weight</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>10</th>\n", | |
" <td>public</td>\n", | |
" <td>minute_bars</td>\n", | |
" <td>postgres</td>\n", | |
" <td>None</td>\n", | |
" <td>False</td>\n", | |
" <td>False</td>\n", | |
" <td>False</td>\n", | |
" <td>False</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" schemaname tablename tableowner tablespace hasindexes hasrules \\\n", | |
"0 public data_src postgres None True False \n", | |
"1 public datsrcln postgres None True False \n", | |
"2 public nut_data postgres None True False \n", | |
"3 public fd_group postgres None True False \n", | |
"4 public food_des postgres None True False \n", | |
"5 public footnote postgres None True False \n", | |
"6 public nutr_def postgres None True False \n", | |
"7 public deriv_cd postgres None True False \n", | |
"8 public src_cd postgres None True False \n", | |
"9 public weight postgres None True False \n", | |
"10 public minute_bars postgres None False False \n", | |
"\n", | |
" hastriggers rowsecurity \n", | |
"0 True False \n", | |
"1 True False \n", | |
"2 True False \n", | |
"3 True False \n", | |
"4 True False \n", | |
"5 True False \n", | |
"6 True False \n", | |
"7 True False \n", | |
"8 True False \n", | |
"9 True False \n", | |
"10 False False " | |
] | |
}, | |
"execution_count": 5, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# List every table that lives in the public schema of the connected database.\n", | |
"public_tables_sql = \"select * from pg_catalog.pg_tables where schemaname = 'public'\"\n", | |
"df = await get_pandas(public_tables_sql)\n", | |
"df" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"id": "8e8d373b-0a83-4a72-9576-db2d402d5422", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"<Record count=4464100>\n" | |
] | |
}, | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>ordinal_position</th>\n", | |
" <th>column_name</th>\n", | |
" <th>column_default</th>\n", | |
" <th>is_nullable</th>\n", | |
" <th>udt_name</th>\n", | |
" <th>data_type</th>\n", | |
" <th>character_maximum_length</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>1</td>\n", | |
" <td>timestamp</td>\n", | |
" <td>None</td>\n", | |
" <td>YES</td>\n", | |
" <td>timestamp</td>\n", | |
" <td>timestamp without time zone</td>\n", | |
" <td>None</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>1</th>\n", | |
" <td>2</td>\n", | |
" <td>symbol</td>\n", | |
" <td>None</td>\n", | |
" <td>YES</td>\n", | |
" <td>int4</td>\n", | |
" <td>integer</td>\n", | |
" <td>None</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>2</th>\n", | |
" <td>3</td>\n", | |
" <td>open_price</td>\n", | |
" <td>None</td>\n", | |
" <td>YES</td>\n", | |
" <td>float4</td>\n", | |
" <td>real</td>\n", | |
" <td>None</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>3</th>\n", | |
" <td>4</td>\n", | |
" <td>high_price</td>\n", | |
" <td>None</td>\n", | |
" <td>YES</td>\n", | |
" <td>float4</td>\n", | |
" <td>real</td>\n", | |
" <td>None</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4</th>\n", | |
" <td>5</td>\n", | |
" <td>low_price</td>\n", | |
" <td>None</td>\n", | |
" <td>YES</td>\n", | |
" <td>float4</td>\n", | |
" <td>real</td>\n", | |
" <td>None</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>5</th>\n", | |
" <td>6</td>\n", | |
" <td>close_price</td>\n", | |
" <td>None</td>\n", | |
" <td>YES</td>\n", | |
" <td>float4</td>\n", | |
" <td>real</td>\n", | |
" <td>None</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>6</th>\n", | |
" <td>7</td>\n", | |
" <td>volume</td>\n", | |
" <td>None</td>\n", | |
" <td>YES</td>\n", | |
" <td>int4</td>\n", | |
" <td>integer</td>\n", | |
" <td>None</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" ordinal_position column_name column_default is_nullable udt_name \\\n", | |
"0 1 timestamp None YES timestamp \n", | |
"1 2 symbol None YES int4 \n", | |
"2 3 open_price None YES float4 \n", | |
"3 4 high_price None YES float4 \n", | |
"4 5 low_price None YES float4 \n", | |
"5 6 close_price None YES float4 \n", | |
"6 7 volume None YES int4 \n", | |
"\n", | |
" data_type character_maximum_length \n", | |
"0 timestamp without time zone None \n", | |
"1 integer None \n", | |
"2 real None \n", | |
"3 real None \n", | |
"4 real None \n", | |
"5 real None \n", | |
"6 integer None " | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"tbl = 'minute_bars'\n", | |
"\n", | |
"# Row count first, so the size of the benchmark table is visible in the output.\n", | |
"count_sql = f'select count(*) from {tbl}'\n", | |
"c = await conn.fetchrow(count_sql)\n", | |
"print(c)\n", | |
"\n", | |
"# Column metadata of the same table, one row per column.\n", | |
"columns_sql = f\"select ordinal_position, column_name, column_default, is_nullable, udt_name, data_type, character_maximum_length from information_schema.columns where table_name = '{tbl}'\"\n", | |
"df = await get_pandas(columns_sql)\n", | |
"df" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"id": "9b8fbe25-c64d-4188-b138-cac2828a3708", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"column_name\n", | |
"timestamp timestamp\n", | |
"symbol int4\n", | |
"open_price float4\n", | |
"high_price float4\n", | |
"low_price float4\n", | |
"close_price float4\n", | |
"volume int4\n", | |
"Name: udt_name, dtype: object" | |
] | |
}, | |
"execution_count": 7, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Series indexed by column name whose values are the PostgreSQL udt_name of each column.\n", | |
"types = df.set_index('column_name')['udt_name']\n", | |
"types" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"id": "a80c10c1-0745-4f49-ba1e-eac3ce84db03", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# PostgreSQL udt_name -> (Arrow data type, name of the matching array builder).\n", | |
"# Both lookup tables below are derived from this single table so they cannot drift apart.\n", | |
"_pg_type_specs = {\n", | |
"    'bpchar': (pa.string(), 'string'),\n", | |
"    'text': (pa.string(), 'string'),\n", | |
"    'int4': (pa.int32(), 'int32'),\n", | |
"    'float8': (pa.float64(), 'float64'),\n", | |
"    'float4': (pa.float32(), 'float32'),\n", | |
"    'timestamp': (pa.timestamp('us'), 'timestamp'),\n", | |
"}\n", | |
"\n", | |
"_type_map = {udt: spec[0] for udt, spec in _pg_type_specs.items()}\n", | |
"_builder_map = {udt: spec[1] for udt, spec in _pg_type_specs.items()}" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"id": "f6bdd0c1-3b1f-4fef-9c69-3b38c5bd8814", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"%%cython -3 -+ -c=-std=c++11 -c=-Wno-unused-variable --link-args=-std=c++11 -I/Users/xav/.condaenv/pq/lib/python3.10/site-packages/numpy/core/include\n", | |
"\n", | |
"cimport cython\n", | |
"\n", | |
"from libc.stdint cimport int16_t, int32_t, uint16_t, uint32_t, int64_t, uint64_t\n", | |
"\n", | |
"from libcpp.memory cimport shared_ptr\n", | |
"from libcpp.vector cimport vector\n", | |
"\n", | |
"from pyarrow.lib cimport *\n", | |
"\n", | |
"# NOTE: this is an absolute path into a local asyncpg source checkout; point it at\n", | |
"# wherever pgproto/hton.h lives on your machine.\n", | |
"# The unpack_* helpers read one fixed-width value from the start of `buf`\n", | |
"# (presumably in network byte order, going by the header's name - confirm in hton.h).\n", | |
"cdef extern from \"/Users/xav/src/asyncpg/asyncpg/pgproto/hton.h\":\n", | |
"    int16_t unpack_int16(const char *buf)\n", | |
"    uint16_t unpack_uint16(const char *buf)\n", | |
"    int32_t unpack_int32(const char *buf)\n", | |
"    uint32_t unpack_uint32(const char *buf)\n", | |
"    int64_t unpack_int64(const char *buf)\n", | |
"    uint64_t unpack_uint64(const char *buf)\n", | |
"    float unpack_float(const char *buf)\n", | |
"    double unpack_double(const char *buf)\n", | |
"\n", | |
"\n", | |
"# Tag kept alongside each builder so the parser knows which concrete builder\n", | |
"# class to cast the generic CArrayBuilder pointer to.\n", | |
"cdef enum ArrayBuilderType:\n", | |
"    CStringBuilderType = 1\n", | |
"    CInt32BuilderType = 2\n", | |
"    CDoubleBuilderType = 3\n", | |
"    CFloatBuilderType = 4\n", | |
"    CTimestampBuilderType = 5\n", | |
"\n", | |
"\n", | |
"cdef class Pg2Arrow:\n", | |
"    \"\"\"Decode tuples of PostgreSQL's binary COPY format into Arrow arrays.\n", | |
"\n", | |
"    One Arrow builder is created per column, in the order of ``types``.\n", | |
"    Recognised builder names are 'string', 'int32', 'float64', 'float32' and\n", | |
"    'timestamp'; any other name falls back to a string builder.\n", | |
"    \"\"\"\n", | |
"    cdef vector[shared_ptr[CArrayBuilder]] c_builders\n", | |
"    cdef vector[ArrayBuilderType] c_types\n", | |
"\n", | |
"    def __cinit__(self, types):\n", | |
"        cdef CMemoryPool* pool = c_default_memory_pool()\n", | |
"        cdef shared_ptr[CDataType] typ\n", | |
"        for t in types:\n", | |
"            if t == 'int32':\n", | |
"                self.c_builders.push_back(shared_ptr[CArrayBuilder](new CInt32Builder(pool)))\n", | |
"                self.c_types.push_back(CInt32BuilderType)\n", | |
"            elif t == 'float64':\n", | |
"                self.c_builders.push_back(shared_ptr[CArrayBuilder](new CDoubleBuilder(pool)))\n", | |
"                self.c_types.push_back(CDoubleBuilderType)\n", | |
"            elif t == 'float32':\n", | |
"                self.c_builders.push_back(shared_ptr[CArrayBuilder](new CFloatBuilder(pool)))\n", | |
"                self.c_types.push_back(CFloatBuilderType)\n", | |
"            elif t == 'timestamp':\n", | |
"                typ = shared_ptr[CDataType](new CTimestampType(TimeUnit_MICRO))\n", | |
"                self.c_builders.push_back(shared_ptr[CArrayBuilder](new CTimestampBuilder(typ, pool)))\n", | |
"                self.c_types.push_back(CTimestampBuilderType)\n", | |
"            else:\n", | |
"                # 'string' and every unrecognised name: the raw field bytes are stored as-is.\n", | |
"                self.c_builders.push_back(shared_ptr[CArrayBuilder](new CStringBuilder(pool)))\n", | |
"                self.c_types.push_back(CStringBuilderType)\n", | |
"\n", | |
"    @cython.boundscheck(False)\n", | |
"    def process(self, char[:] buffer):\n", | |
"        \"\"\"Parse one chunk of binary COPY tuples and return a list of Arrow arrays.\n", | |
"\n", | |
"        ``buffer`` must start on a tuple boundary (any file header already\n", | |
"        removed by the caller). Parsing stops at the end of the buffer or at a\n", | |
"        field count of -1. The builders are finished on every call, so the\n", | |
"        returned arrays (one per column) hold only the rows of this chunk.\n", | |
"\n", | |
"        Raises:\n", | |
"            ValueError: a tuple announces a field count different from the\n", | |
"                number of configured builders. Bounds checks are disabled here,\n", | |
"                so continuing would read fields with the wrong types. Rows\n", | |
"                appended before the error stay in the builders; discard the\n", | |
"                instance afterwards.\n", | |
"        \"\"\"\n", | |
"        cdef ArrayBuilderType ftyp\n", | |
"        cdef int16_t fnum\n", | |
"        cdef int32_t flen\n", | |
"        cdef size_t i  # typed so both loops over the builders compile to plain C loops\n", | |
"        cdef size_t ncols = self.c_builders.size()\n", | |
"        cdef shared_ptr[CArray] array\n", | |
"\n", | |
"        while buffer.size > 0:\n", | |
"            # Every tuple starts with a 16-bit field count; -1 ends the data.\n", | |
"            fnum = unpack_int16(&buffer[0])\n", | |
"            buffer = buffer[2:]\n", | |
"            if fnum == -1:\n", | |
"                break\n", | |
"            if <size_t>fnum != ncols:\n", | |
"                raise ValueError('COPY tuple has %d fields but %d builders are configured' % (fnum, ncols))\n", | |
"\n", | |
"            for i in range(ncols):\n", | |
"                ftyp = self.c_types[i]\n", | |
"                # Every field is prefixed with its byte length; -1 marks NULL and carries no payload.\n", | |
"                flen = unpack_int32(&buffer[0])\n", | |
"                buffer = buffer[4:]\n", | |
"\n", | |
"                if flen == -1:\n", | |
"                    self.c_builders[i].get().AppendNull()\n", | |
"                else:\n", | |
"                    if ftyp == CStringBuilderType:\n", | |
"                        (<CStringBuilder*>self.c_builders[i].get()).Append(&buffer[0], flen)\n", | |
"                    elif ftyp == CInt32BuilderType:\n", | |
"                        (<CInt32Builder*>self.c_builders[i].get()).Append(unpack_int32(&buffer[0]))\n", | |
"                    elif ftyp == CDoubleBuilderType:\n", | |
"                        (<CDoubleBuilder*>self.c_builders[i].get()).Append(unpack_double(&buffer[0]))\n", | |
"                    elif ftyp == CFloatBuilderType:\n", | |
"                        (<CFloatBuilder*>self.c_builders[i].get()).Append(unpack_float(&buffer[0]))\n", | |
"                    elif ftyp == CTimestampBuilderType:\n", | |
"                        # PostgreSQL counts microseconds from 2000-01-01; adding the\n", | |
"                        # 946684800 s between 1970-01-01 and that date (in microseconds)\n", | |
"                        # rebases the value onto the Unix epoch.\n", | |
"                        (<CTimestampBuilder*>self.c_builders[i].get()).Append(unpack_int64(&buffer[0]) + <int64_t>946684800000000)\n", | |
"                    buffer = buffer[flen:]\n", | |
"\n", | |
"        arrays = []\n", | |
"        for i in range(ncols):\n", | |
"            self.c_builders[i].get().Finish(&array)\n", | |
"            arrays.append(pyarrow_wrap_array(array))\n", | |
"\n", | |
"        return arrays" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "c92993a0-f873-441b-8106-e0b453fc3b75", | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"id": "bf5a272c-af8c-480c-bfe7-dae4c043d422", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"async def copy(table):\n", | |
"    \"\"\"Binary-COPY ``table`` through the shared ``conn`` and return the raw chunks.\n", | |
"\n", | |
"    The chunks are collected in arrival order and not decoded in any way.\n", | |
"    \"\"\"\n", | |
"    chunks = []\n", | |
"\n", | |
"    async def collect(chunk):\n", | |
"        chunks.append(chunk)\n", | |
"\n", | |
"    await conn.copy_from_table(table, output=collect, format='binary')\n", | |
"    return chunks" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"id": "fb8afcf0-66b0-4cad-bc38-93882003602b", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"async def copy_to_arrow(sql_table, types):\n", | |
"    \"\"\"Binary-COPY ``sql_table`` and assemble the stream into a single Arrow table.\n", | |
"\n", | |
"    ``types`` is a pandas Series mapping column name -> PostgreSQL udt_name.\n", | |
"    Type names missing from ``_type_map`` / ``_builder_map`` are handled as strings.\n", | |
"    Each chunk handed to the sink is decoded into its own table; the pieces\n", | |
"    are concatenated once the copy has finished.\n", | |
"    \"\"\"\n", | |
"    fields = {column: _type_map.get(udt, pa.string()) for column, udt in types.items()}\n", | |
"    schema = pa.schema(fields)\n", | |
"    decoder = Pg2Arrow([_builder_map.get(udt, 'string') for udt in types])\n", | |
"\n", | |
"    header_len = 19  # bytes stripped from the front of the first chunk only\n", | |
"    pieces = []\n", | |
"    first_chunk = True\n", | |
"\n", | |
"    async def sink(buffer):\n", | |
"        nonlocal first_chunk\n", | |
"        if first_chunk:\n", | |
"            buffer = buffer[header_len:]\n", | |
"            first_chunk = False\n", | |
"        pieces.append(pa.table(decoder.process(buffer), schema=schema))\n", | |
"\n", | |
"    await conn.copy_from_table(sql_table, output=sink, format='binary')\n", | |
"    return pa.concat_tables(pieces)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"id": "c075cbc8-fdc9-4ffa-b33d-04abf4e0451a", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import asyncio\n", | |
"import time\n", | |
"\n", | |
"async def test(coro, times=30, cooldown=10):\n", | |
"    \"\"\"Time repeated runs of ``coro`` and return ``(last_result, timings)``.\n", | |
"\n", | |
"    Args:\n", | |
"        coro: zero-argument callable returning a fresh awaitable on each call.\n", | |
"        times: number of timed runs.\n", | |
"        cooldown: seconds slept before every run (not included in the timing).\n", | |
"\n", | |
"    Returns:\n", | |
"        The result of the final run (``None`` when ``times`` is 0) and a list\n", | |
"        with one ``pd.Timedelta`` per run.\n", | |
"    \"\"\"\n", | |
"    z = None  # keeps `return z` valid when the loop body never executes\n", | |
"    timings = []\n", | |
"    for _ in range(times):\n", | |
"        await asyncio.sleep(cooldown)\n", | |
"        # perf_counter is monotonic and high resolution, so a wall-clock\n", | |
"        # adjustment during a run cannot distort the measurement.\n", | |
"        started = time.perf_counter()\n", | |
"        z = await coro()\n", | |
"        timings.append(pd.Timedelta(seconds=time.perf_counter() - started))\n", | |
"    return z, timings" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 13, | |
"id": "68bf6a48-88c4-4a37-815b-7411138abbde", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# Each entry is a zero-argument callable that builds a fresh awaitable,\n", | |
"# because test() calls it once per timed run.\n", | |
"tests = dict(\n", | |
"    copy=lambda: copy(tbl),\n", | |
"    fetch=lambda: conn.fetch(f'select * from {tbl}'),\n", | |
"    get_pandas=lambda: get_pandas(f'select * from {tbl}'),\n", | |
"    copy_to_arrow=lambda: copy_to_arrow(tbl, types),\n", | |
")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 14, | |
"id": "6184e0eb-1cd9-478f-ac9c-d5c21b0276ac", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"testing copy\n", | |
"testing fetch\n", | |
"testing get_pandas\n", | |
"testing copy_to_arrow\n" | |
] | |
} | |
], | |
"source": [ | |
"# Run every benchmark in turn, keeping the last result and the timings of each.\n", | |
"results, timings = {}, {}\n", | |
"for name, make_coro in tests.items():\n", | |
"    print('testing', name)\n", | |
"    outcome, durations = await test(make_coro)\n", | |
"    results[name] = outcome\n", | |
"    timings[name] = durations" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 15, | |
"id": "3a846c49-5807-4e89-a432-2c35221d5d71", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# One column per benchmark; convert the Timedelta measurements to float seconds.\n", | |
"# Note: this rebinds `timings` from a dict to a DataFrame, so run the cell only once.\n", | |
"timings = pd.DataFrame(timings).apply(lambda runs: runs.dt.total_seconds())" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"id": "3dc0b927-1904-4d61-890a-985379d267cc", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"<AxesSubplot:>" | |
] | |
}, | |
"execution_count": 16, | |
"metadata": {}, | |
"output_type": "execute_result" | |
}, | |
{ | |
"data": { | |
"image/png": "\n", | |
"text/plain": [ | |
"<Figure size 432x288 with 1 Axes>" | |
] | |
}, | |
"metadata": { | |
"needs_background": "light" | |
}, | |
"output_type": "display_data" | |
} | |
], | |
"source": [ | |
"import seaborn as sns\n", | |
"\n", | |
"# One violin per benchmarked method, showing the spread of its run times.\n", | |
"ax = sns.violinplot(data=timings)\n", | |
"ax" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "99dad90e-a716-4b2b-a1fa-76611bcb7b22", | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"id": "942ffcf8-5cd6-4b11-b5b2-6c68eec3621c", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"276.774221" | |
] | |
}, | |
"execution_count": 17, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Combined length of all chunks collected by copy(), divided by 1e6\n", | |
"# (i.e. megabytes, assuming the chunks are bytes-like).\n", | |
"sum(map(len, results[\"copy\"])) / 1e6" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 18, | |
"id": "cd69e589-47a4-4c7e-8cf4-00134e0a3981", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"4.13 s ± 3.26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n", | |
"30.7 ms ± 878 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)\n" | |
] | |
} | |
], | |
"source": [ | |
"# For reference: cost of the conversion to pandas alone, starting from the records\n", | |
"# returned by fetch versus from the Arrow table, with no database access involved.\n", | |
"%timeit pd.DataFrame.from_records(results[\"fetch\"], columns=list(results[\"fetch\"][0].keys()))\n", | |
"%timeit results[\"copy_to_arrow\"].to_pandas()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 19, | |
"id": "7768dcca-6934-4637-8070-df517edae750", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>timestamp</th>\n", | |
" <th>symbol</th>\n", | |
" <th>open_price</th>\n", | |
" <th>high_price</th>\n", | |
" <th>low_price</th>\n", | |
" <th>close_price</th>\n", | |
" <th>volume</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>0</td>\n", | |
" <td>-0.835820</td>\n", | |
" <td>-0.746893</td>\n", | |
" <td>0.294243</td>\n", | |
" <td>0.332640</td>\n", | |
" <td>832154</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>1</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>1</td>\n", | |
" <td>0.893081</td>\n", | |
" <td>0.985754</td>\n", | |
" <td>1.159500</td>\n", | |
" <td>-1.943980</td>\n", | |
" <td>650086</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>2</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>2</td>\n", | |
" <td>0.800148</td>\n", | |
" <td>0.253886</td>\n", | |
" <td>-1.370295</td>\n", | |
" <td>-1.400043</td>\n", | |
" <td>685743</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>3</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>3</td>\n", | |
" <td>-1.261819</td>\n", | |
" <td>-0.128649</td>\n", | |
" <td>1.117490</td>\n", | |
" <td>-0.258251</td>\n", | |
" <td>659479</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>4</td>\n", | |
" <td>-1.649070</td>\n", | |
" <td>0.551472</td>\n", | |
" <td>1.702268</td>\n", | |
" <td>-0.966815</td>\n", | |
" <td>584522</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>...</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464095</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>95</td>\n", | |
" <td>0.357868</td>\n", | |
" <td>-1.557160</td>\n", | |
" <td>-0.242268</td>\n", | |
" <td>1.096502</td>\n", | |
" <td>836040</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464096</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>96</td>\n", | |
" <td>0.844888</td>\n", | |
" <td>0.779728</td>\n", | |
" <td>0.611882</td>\n", | |
" <td>-0.089665</td>\n", | |
" <td>713316</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464097</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>97</td>\n", | |
" <td>-2.449516</td>\n", | |
" <td>0.784922</td>\n", | |
" <td>0.302922</td>\n", | |
" <td>1.179399</td>\n", | |
" <td>782881</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464098</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>98</td>\n", | |
" <td>-0.546269</td>\n", | |
" <td>-0.023306</td>\n", | |
" <td>0.107929</td>\n", | |
" <td>-0.947063</td>\n", | |
" <td>461618</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464099</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>99</td>\n", | |
" <td>-1.737921</td>\n", | |
" <td>0.297596</td>\n", | |
" <td>-1.050883</td>\n", | |
" <td>0.137020</td>\n", | |
" <td>774423</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"<p>4464100 rows × 7 columns</p>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" timestamp symbol open_price high_price low_price close_price \\\n", | |
"0 2000-01-01 0 -0.835820 -0.746893 0.294243 0.332640 \n", | |
"1 2000-01-01 1 0.893081 0.985754 1.159500 -1.943980 \n", | |
"2 2000-01-01 2 0.800148 0.253886 -1.370295 -1.400043 \n", | |
"3 2000-01-01 3 -1.261819 -0.128649 1.117490 -0.258251 \n", | |
"4 2000-01-01 4 -1.649070 0.551472 1.702268 -0.966815 \n", | |
"... ... ... ... ... ... ... \n", | |
"4464095 2000-02-01 95 0.357868 -1.557160 -0.242268 1.096502 \n", | |
"4464096 2000-02-01 96 0.844888 0.779728 0.611882 -0.089665 \n", | |
"4464097 2000-02-01 97 -2.449516 0.784922 0.302922 1.179399 \n", | |
"4464098 2000-02-01 98 -0.546269 -0.023306 0.107929 -0.947063 \n", | |
"4464099 2000-02-01 99 -1.737921 0.297596 -1.050883 0.137020 \n", | |
"\n", | |
" volume \n", | |
"0 832154 \n", | |
"1 650086 \n", | |
"2 685743 \n", | |
"3 659479 \n", | |
"4 584522 \n", | |
"... ... \n", | |
"4464095 836040 \n", | |
"4464096 713316 \n", | |
"4464097 782881 \n", | |
"4464098 461618 \n", | |
"4464099 774423 \n", | |
"\n", | |
"[4464100 rows x 7 columns]" | |
] | |
}, | |
"execution_count": 19, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Result of the last copy_to_arrow run, converted to pandas for display.\n", | |
"results[\"copy_to_arrow\"].to_pandas()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 20, | |
"id": "71ef11ef-f1b4-4893-8b70-f346aaae9859", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>timestamp</th>\n", | |
" <th>symbol</th>\n", | |
" <th>open_price</th>\n", | |
" <th>high_price</th>\n", | |
" <th>low_price</th>\n", | |
" <th>close_price</th>\n", | |
" <th>volume</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>0</td>\n", | |
" <td>-0.835820</td>\n", | |
" <td>-0.746893</td>\n", | |
" <td>0.294243</td>\n", | |
" <td>0.332640</td>\n", | |
" <td>832154</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>1</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>1</td>\n", | |
" <td>0.893081</td>\n", | |
" <td>0.985754</td>\n", | |
" <td>1.159500</td>\n", | |
" <td>-1.943980</td>\n", | |
" <td>650086</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>2</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>2</td>\n", | |
" <td>0.800148</td>\n", | |
" <td>0.253886</td>\n", | |
" <td>-1.370295</td>\n", | |
" <td>-1.400043</td>\n", | |
" <td>685743</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>3</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>3</td>\n", | |
" <td>-1.261819</td>\n", | |
" <td>-0.128649</td>\n", | |
" <td>1.117490</td>\n", | |
" <td>-0.258251</td>\n", | |
" <td>659479</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4</th>\n", | |
" <td>2000-01-01</td>\n", | |
" <td>4</td>\n", | |
" <td>-1.649070</td>\n", | |
" <td>0.551472</td>\n", | |
" <td>1.702268</td>\n", | |
" <td>-0.966815</td>\n", | |
" <td>584522</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>...</th>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" <td>...</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464095</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>95</td>\n", | |
" <td>0.357868</td>\n", | |
" <td>-1.557160</td>\n", | |
" <td>-0.242268</td>\n", | |
" <td>1.096502</td>\n", | |
" <td>836040</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464096</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>96</td>\n", | |
" <td>0.844888</td>\n", | |
" <td>0.779728</td>\n", | |
" <td>0.611882</td>\n", | |
" <td>-0.089665</td>\n", | |
" <td>713316</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464097</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>97</td>\n", | |
" <td>-2.449516</td>\n", | |
" <td>0.784922</td>\n", | |
" <td>0.302922</td>\n", | |
" <td>1.179399</td>\n", | |
" <td>782881</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464098</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>98</td>\n", | |
" <td>-0.546269</td>\n", | |
" <td>-0.023306</td>\n", | |
" <td>0.107929</td>\n", | |
" <td>-0.947063</td>\n", | |
" <td>461618</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4464099</th>\n", | |
" <td>2000-02-01</td>\n", | |
" <td>99</td>\n", | |
" <td>-1.737921</td>\n", | |
" <td>0.297596</td>\n", | |
" <td>-1.050883</td>\n", | |
" <td>0.137020</td>\n", | |
" <td>774423</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"<p>4464100 rows × 7 columns</p>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" timestamp symbol open_price high_price low_price close_price \\\n", | |
"0 2000-01-01 0 -0.835820 -0.746893 0.294243 0.332640 \n", | |
"1 2000-01-01 1 0.893081 0.985754 1.159500 -1.943980 \n", | |
"2 2000-01-01 2 0.800148 0.253886 -1.370295 -1.400043 \n", | |
"3 2000-01-01 3 -1.261819 -0.128649 1.117490 -0.258251 \n", | |
"4 2000-01-01 4 -1.649070 0.551472 1.702268 -0.966815 \n", | |
"... ... ... ... ... ... ... \n", | |
"4464095 2000-02-01 95 0.357868 -1.557160 -0.242268 1.096502 \n", | |
"4464096 2000-02-01 96 0.844888 0.779728 0.611882 -0.089665 \n", | |
"4464097 2000-02-01 97 -2.449516 0.784922 0.302922 1.179399 \n", | |
"4464098 2000-02-01 98 -0.546269 -0.023306 0.107929 -0.947063 \n", | |
"4464099 2000-02-01 99 -1.737921 0.297596 -1.050883 0.137020 \n", | |
"\n", | |
" volume \n", | |
"0 832154 \n", | |
"1 650086 \n", | |
"2 685743 \n", | |
"3 659479 \n", | |
"4 584522 \n", | |
"... ... \n", | |
"4464095 836040 \n", | |
"4464096 713316 \n", | |
"4464097 782881 \n", | |
"4464098 461618 \n", | |
"4464099 774423 \n", | |
"\n", | |
"[4464100 rows x 7 columns]" | |
] | |
}, | |
"execution_count": 20, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"# Result of the last get_pandas run, shown for comparison with the Arrow-based output.\n", | |
"results[\"get_pandas\"]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"id": "bc9bcecd-d643-4561-a6ce-0ed2c482f239", | |
"metadata": {}, | |
"outputs": [], | |
"source": [] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 21, | |
"id": "34078e6f-f76c-4d77-a5b6-f6fb33663bb8", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# One-off setup, kept commented out on purpose: uncommenting this cell drops and\n", | |
"# recreates the minute_bars table and refills it with random data.\n", | |
"\n", | |
"# import numpy as np\n", | |
"\n", | |
"# r = \"\"\"\n", | |
"# DROP TABLE IF EXISTS minute_bars\n", | |
"# \"\"\"\n", | |
"\n", | |
"# await conn.execute(r)\n", | |
"\n", | |
"# r = \"\"\"\n", | |
"# CREATE TABLE minute_bars\n", | |
"# (\n", | |
"# timestamp timestamp without time zone,\n", | |
"# symbol integer,\n", | |
"# open_price real,\n", | |
"# high_price real,\n", | |
"# low_price real,\n", | |
"# close_price real,\n", | |
"# volume integer\n", | |
"# )\n", | |
"# \"\"\"\n", | |
"\n", | |
"# await conn.execute(r)\n", | |
"\n", | |
"# times = pd.date_range('2000', '2000-02-01', freq='1min')\n", | |
"# symbols = list(range(100))\n", | |
"\n", | |
"# records = [\n", | |
"# (t, s, *np.random.randn(4).tolist(), np.random.randint(0, 1_000_000))\n", | |
"# for t in times\n", | |
"# for s in symbols\n", | |
"# ]\n", | |
"\n", | |
"# await conn.copy_records_to_table('minute_bars', records=records)" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.10.0" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment