Last active
November 7, 2017 20:13
-
-
Save addam/d1aa628decbd87b0ec77ba4f676bbb9b to your computer and use it in GitHub Desktop.
Copy data from one PostgreSQL server to another using asyncpg
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import asyncio | |
import asyncpg | |
from time import time | |
# Use uvloop's faster event loop when it is installed.  Only a missing uvloop
# is tolerated; any other error is no longer silently swallowed by a bare except.
try:
    import uvloop
except ImportError:
    pass  # it will just be slower, no harm done
else:
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def await_save(query, block, pool):
    """Write one block of rows through a connection borrowed from *pool*.

    :param query: statement run once per row via ``executemany``.
    :param block: sequence of rows; each row supplies the statement's arguments.
    :param pool: pool whose ``acquire()`` yields a connection as an async
        context manager; the connection is returned when the write finishes.
    """
    borrowed = pool.acquire()
    async with borrowed as connection:
        await connection.executemany(query, block)
def _split_conn_params(params):
    """Turn connection parameters into ``(args, kwargs)`` for asyncpg.

    A mapping (anything with ``keys()``, e.g. ``dict(host=..., user=...)``)
    becomes keyword arguments, a string is treated as a single DSN argument,
    and any other iterable is passed positionally.
    """
    if hasattr(params, "keys"):
        return (), dict(params)
    if isinstance(params, str):
        return (params,), {}
    return tuple(params), {}


async def process_db(reader_params, reader_query, writer_params, writer_query, block_size=100, block_count=10):
    """Copy the rows of *reader_query* from one database into another.

    Rows are fetched in blocks of *block_size* through a cursor opened inside
    a transaction on the reader connection.  Each block is written by a
    separate task that runs ``executemany(writer_query, block)`` on a
    connection from a pool built from *writer_params*.

    :param reader_params: source connection parameters for ``asyncpg.connect``:
        a mapping of keyword arguments, a DSN string, or a sequence of
        positional arguments.
    :param reader_query: query whose result rows are copied.
    :param writer_params: destination parameters for ``asyncpg.create_pool``,
        in any of the forms accepted for *reader_params*.
    :param writer_query: statement executed once per copied row; its
        placeholders receive the row's values.
    :param block_size: number of rows fetched and written per block.
    :param block_count: maximum number of write tasks in flight (at least 1).
    :returns: the number of rows written.
    :raises: whatever a read or write raises.  Outstanding writes are
        cancelled and both the reader connection and the writer pool are
        closed before the exception propagates.
    """
    loop = asyncio.get_event_loop()
    # The original ``len(pending) > block_count`` allowed block_count + 1 writes.
    max_pending = max(1, block_count)
    rows_per_task = {}  # write task -> number of rows in the block it writes
    total_blocks = 0
    total_rows = 0

    def collect(done):
        """Account for finished write tasks, re-raising the first failure."""
        nonlocal total_blocks, total_rows
        for task in done:
            task.result()  # surface a failed write instead of losing it
            total_rows += rows_per_task.pop(task)
        total_blocks += len(done)
        # Report each time the block counter crosses a multiple of 100.
        if total_blocks % 100 < len(done):
            print(total_rows, "rows")

    reader_args, reader_kwargs = _split_conn_params(reader_params)
    writer_args, writer_kwargs = _split_conn_params(writer_params)
    reader = await asyncpg.connect(*reader_args, **reader_kwargs)
    try:
        async with asyncpg.create_pool(*writer_args, **writer_kwargs) as writers:
            pending = set()
            try:
                async with reader.transaction():
                    cursor = await reader.cursor(reader_query)
                    while True:
                        while len(pending) >= max_pending:
                            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                            collect(done)
                        block = await cursor.fetch(block_size)
                        if not block:
                            break
                        task = loop.create_task(await_save(writer_query, block, writers))
                        rows_per_task[task] = len(block)
                        pending.add(task)
                # asyncio.wait() raises ValueError on an empty set (e.g. no rows).
                if pending:
                    done, pending = await asyncio.wait(pending)
                    collect(done)
            finally:
                # On error, stop outstanding writes before the pool is closed.
                for task in pending:
                    task.cancel()
                if pending:
                    await asyncio.gather(*pending, return_exceptions=True)
    finally:
        await reader.close()
    return total_rows
# Connection settings for the source (reader) and destination (writer)
# databases, plus the copy queries.  Replace the ``...`` placeholders with
# real values before running.
params = {
    "reader_params": dict(host="1.1.1.1", port=..., database=..., user=..., password=...),
    "writer_params": dict(host="2.2.2.2", port=..., database=..., user=..., password=...),
    "reader_query": "select column_a, column_b from table where column_c = 42;",
    "writer_query": "insert into copied_table (col_a, col_b) values ($1, $2);",
}

if __name__ == "__main__":
    # Create the loop explicitly; module-level asyncio.get_event_loop() is
    # deprecated when no loop is running.  new_event_loop() honours any
    # policy installed above.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(process_db(**params))
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment