Skip to content

Instantly share code, notes, and snippets.

@ktmud
Created July 20, 2019 04:50
Show Gist options
  • Save ktmud/3659f5b9de74cef16f79ea79066e0f6e to your computer and use it in GitHub Desktop.
Save ktmud/3659f5b9de74cef16f79ea79066e0f6e to your computer and use it in GitHub Desktop.
Use SQLAlchemy to move data between servers
#!/usr/bin/env python3 -W ignore::DeprecationWarning
# -*- coding: utf-8 -*-
"""
Sync GIS data from a PostgreSQL server to another
"""
import os
import click
from io import StringIO
from sqlalchemy import create_engine
from tqdm import tqdm
from .models import metadata
from .settings import GIS_DB_URL
def _quote_ident(name):
    """Quote *name* as a PostgreSQL identifier.

    A dotted name such as ``schema.table`` is quoted part by part
    (``"schema"."table"``) so schema-qualified tables work; embedded double
    quotes are escaped by doubling them.
    """
    return '.'.join('"' + part.replace('"', '""') + '"' for part in name.split('.'))


def _split_names(value):
    """Split a comma-separated CLI argument into stripped, non-empty names."""
    return [name.strip() for name in value.split(',') if name.strip()]


def _recreate_table(table, out, force):
    """Drop (if present) and create *table* on its bound engine.

    Unless *force* is set, an existing table is only dropped after the user
    confirms at an interactive prompt. Returns False if the user declines.
    """
    if table.exists():
        if not force:
            ans = input(f'Re-create dest table `{out}`? [Y]/N: ').strip().lower() or 'y'
            if ans[0] != 'y':
                return False
        table.drop()
    table.create()
    return True


def _copy_rows(source_cursor, dest_cursor, src, dest, total, chunk_size):
    """Copy *total* rows from *src* into *dest*, *chunk_size* rows per COPY.

    *src* and *dest* must already be quoted identifiers. Each chunk is
    buffered in memory as COPY text format and replayed into the destination.
    """
    with tqdm(total=total) as pbar:
        # range() covers the final partial chunk too (offset < total).
        for offset in range(0, total, chunk_size):
            buf = StringIO()
            # NOTE(review): LIMIT/OFFSET without ORDER BY does not guarantee
            # disjoint, exhaustive chunks in PostgreSQL (plan changes,
            # synchronized seq scans, concurrent writes). Add an ORDER BY on a
            # unique key if the source table has one.
            source_cursor.copy_expert(
                f'COPY (SELECT * FROM {src} LIMIT {chunk_size} OFFSET {offset}) TO STDOUT',
                buf,
            )
            buf.seek(0)
            dest_cursor.copy_expert(f'COPY {dest} FROM STDIN', buf)
            pbar.update(min(chunk_size, total - offset))


# NOTE(review): `bp` is not defined or imported in the visible source;
# confirm where it is provided.
@bp.cli.command('dbsync')
@click.option('-s', '--source', default=GIS_DB_URL, help='Source database url')
@click.option('-f', '--force', default=False, is_flag=True, help='Force drop and recreate table')
@click.option('--chunk-size', default=2000, show_default=True, type=click.IntRange(min=1),
              help='Number of rows copied per round trip')
@click.argument('input-table')
@click.argument('output-table')
def import_gisdata(input_table, output_table, source, force, chunk_size=2000):
    """Import and override a table from source to dest

    INPUT_TABLE and OUTPUT_TABLE are comma-separated lists paired by
    position. Each destination table must be declared in the local
    SQLAlchemy ``metadata``; it is (re)created and then filled from the
    source table via PostgreSQL COPY. Each destination table is committed
    only after all of its chunks have been copied.
    """
    input_tables = _split_names(input_table)
    output_tables = _split_names(output_table)
    if len(input_tables) != len(output_tables):
        raise click.UsageError('Number of input and output tables do not match.')

    source_conn = create_engine(source).raw_connection()
    try:
        source_cursor = source_conn.cursor()
        for inp, out in zip(input_tables, output_tables):
            try:
                table = metadata.tables[out]
            except KeyError:
                raise click.BadParameter(
                    f'unknown destination table `{out}`', param_hint='OUTPUT_TABLE'
                ) from None

            src = _quote_ident(inp)
            source_cursor.execute(f'SELECT count(1) FROM {src}')
            total = source_cursor.fetchone()[0]
            print(f'Importing {total} rows from {inp} to {out}...')

            if not _recreate_table(table, out, force):
                # Declining applies to this table only; continue with the rest.
                print(f'Skipping `{out}`.')
                continue

            dest_conn = table.bind.raw_connection()
            try:
                _copy_rows(source_cursor, dest_conn.cursor(), src, _quote_ident(out),
                           total, chunk_size)
                # Commit only once every chunk succeeded; on error the
                # connection is closed without committing.
                dest_conn.commit()
            finally:
                dest_conn.close()
    finally:
        source_conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment