Created
July 20, 2019 04:50
-
-
Save ktmud/3659f5b9de74cef16f79ea79066e0f6e to your computer and use it in GitHub Desktop.
Use SQLAlchemy to move data between servers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 -W ignore::DeprecationWarning | |
# -*- coding: utf-8 -*- | |
"""
Sync GIS data from one PostgreSQL server to another.
"""
import os | |
import click | |
from io import StringIO | |
from sqlalchemy import create_engine | |
from tqdm import tqdm | |
from .models import metadata | |
from .settings import GIS_DB_URL | |
def _copy_table_rows(source_conn, dest_conn, inp, out, total, limit=2000):
    """Stream `total` rows from source table `inp` into dest table `out`.

    Rows are moved in batches of `limit` through an in-memory buffer using
    the cursors' COPY helpers (`copy_to` / `copy_from`), then committed on
    the destination connection once every batch has been written.
    """
    source_cursor = source_conn.cursor()
    dest_cursor = dest_conn.cursor()
    offset = 0
    with tqdm(total=total) as pbar:
        # Iterate until every row is covered. The previous condition
        # (`offset < total - limit`) skipped the final batch, and copied
        # nothing at all when the table had `limit` rows or fewer.
        while offset < total:
            fp = StringIO()
            # NOTE(review): LIMIT/OFFSET without ORDER BY has no guaranteed
            # row order between queries, so batches could overlap or miss
            # rows -- consider ordering by a key column; confirm.
            query = f'(SELECT * FROM "{inp}" LIMIT {limit} OFFSET {offset})'
            source_cursor.copy_to(fp, query)
            fp.seek(0)
            dest_cursor.copy_from(fp, f'"{out}"')
            # The last batch may be short; don't overshoot the progress bar.
            pbar.update(min(limit, total - offset))
            offset += limit
    dest_conn.commit()


# NOTE(review): `bp` is not imported in the visible part of this file; it is
# presumably defined or imported elsewhere -- confirm.
@bp.cli.command('dbsync')
@click.option('-s', '--source', default=GIS_DB_URL, help='Source database url')
@click.option('-f', '--force', default=False, is_flag=True, help='Force drop and recreate table')
@click.argument('input-table')
@click.argument('output-table')
def import_gisdata(input_table, output_table, source, force):
    # click uses the docstring as the command's help text; everything after
    # the form feed (\f) is documentation for maintainers only.
    """Import and override a table from source to dest
    \f
    Args:
        input_table: Comma-separated source table name(s).
        output_table: Comma-separated destination table name(s); must have
            the same number of entries as `input_table`. Each name is looked
            up in `metadata.tables`.
        source: Database URL of the source server.
        force: When true, drop and re-create an existing destination table
            without prompting.

    Raises:
        KeyError: If a destination table name is not in `metadata.tables`.
    """
    source = create_engine(source)
    input_tables = input_table.split(',')
    output_tables = output_table.split(',')
    if len(input_tables) != len(output_tables):
        print('Number of input and output tables do not match.')
        return

    for inp, out in zip(input_tables, output_tables):
        inp, out = inp.strip(), out.strip()
        result = source.execute(f'SELECT count(1) FROM {inp}')
        total = result.first()[0]
        print(f'Importing {total} rows from {inp} to {out}...')

        table = metadata.tables[out]
        if table.exists():
            if not force:
                # Empty input defaults to "yes".
                ans = input(f'Re-create dest table `{out}`? [Y]/N: ').lower() or 'y'
                if ans[0] != 'y':
                    # Declining aborts the whole run, including any
                    # remaining tables.
                    return
            table.drop()
        table.create()

        # Always release both raw connections, even if the copy fails
        # part-way through.
        source_conn = source.raw_connection()
        try:
            dest_conn = table.bind.raw_connection()
            try:
                _copy_table_rows(source_conn, dest_conn, inp, out, total)
            finally:
                dest_conn.close()
        finally:
            source_conn.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment