Created
December 18, 2010 08:10
-
-
Save mdornseif/746297 to your computer and use it in GitHub Desktop.
Eine AS/400 Datenbank auf PostgreSQL spiegeln
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# The Python setup on the host "balancer" is badly broken:
# /usr/local/bin/ comes before /usr/bin in PATH, but this script
# does not work with the hand-installed Python found there
# (hence the explicit #!/usr/bin/python above).
# Create the target database with Unicode encoding:
# $ createdb softmmirror --encoding=UNICODE -O softm -e
# (with -e, createdb echoes the SQL it runs:)
# CREATE DATABASE softmmirror OWNER softm ENCODING 'UNICODE';
import datetime | |
import pyodbc | |
import optparse | |
import psycopg2 | |
import sys | |
import time | |
from husoftm.fields import MAPPINGDIR | |
from decimal import Decimal | |
def execute(cursor, statement):
    """Run a single SQL ``statement`` on ``cursor``.

    Returns whatever the driver's ``cursor.execute`` returns, unchanged.
    """
    outcome = cursor.execute(statement)
    return outcome
def list_all_tables(source, schema='SMKDIFP'):
    """Return the sorted names of all base tables in a DB2/400 library.

    :param source: open pyodbc connection to the AS/400.
    :param schema: library (schema) whose base tables are listed;
        defaults to ``SMKDIFP``.
    :returns: list of table names, surrounding whitespace stripped,
        sorted ascending.
    """
    cursor = source.cursor()
    try:
        # pyodbc binds ``?`` parameter markers, so the schema name is passed
        # as data instead of being spliced into the SQL text.
        cursor.execute("SELECT TABLE_NAME FROM QSYS2.tables "
                       "WHERE TABLE_TYPE = 'BASE TABLE' and TABLE_SCHEMA = ?",
                       (schema,))
        rows = cursor.fetchall()
    finally:
        cursor.close()
    # Names may come back padded with blanks, so strip them before sorting.
    return sorted(row[0].strip() for row in rows)
def create_table(source, destination, tablename):
    """Create an empty PostgreSQL table ``<tablename>_tmp`` shaped like a DB2/400 table.

    The column layout is taken from the pyodbc cursor description of the
    first row of the source table, so a source table without rows cannot be
    described and is skipped. An existing ``<tablename>_tmp`` in the
    ``public`` schema is dropped first.

    :param source: open pyodbc connection to the AS/400.
    :param destination: open psycopg2 connection to PostgreSQL.
    :param tablename: table identifier; it is interpolated into SQL, so only
        trusted alphanumeric names may be passed.
    :returns: True if the table was created, False if the source table is empty.
    :raises RuntimeError: for a column type with no PostgreSQL mapping.
    """
    import numbers  # local import: only needed for the integer type check

    text_types = (str, type(u""))  # ``unicode`` on Python 2, ``str`` on Python 3

    src_cursor = source.cursor()
    src_cursor.execute("SELECT * FROM %s FETCH FIRST 1 ROW ONLY" % tablename)
    row = src_cursor.fetchone()
    if not row:
        sys.stdout.write(" No rows for table %s" % tablename)
        return False

    columns = []
    for (name, type_code, _display_size, internal_size, precision, scale,
         null_ok) in row.cursor_description:
        if type_code in text_types:
            sqltype = "VARCHAR(%d)" % internal_size
        elif type_code == Decimal:
            sqltype = "DECIMAL(%d, %d)" % (precision, scale)
        elif type_code == datetime.datetime:
            # PostgreSQL has no DATETIME type; TIMESTAMP is its equivalent.
            sqltype = "TIMESTAMP"
        elif type_code == datetime.date:
            sqltype = "DATE"
        elif type_code == datetime.time:
            sqltype = "TIME"
        elif type_code == bool:
            # Checked before the integer branch because bool subclasses int.
            sqltype = "BOOLEAN"
        elif isinstance(type_code, type) and issubclass(type_code, numbers.Integral):
            # int (and long on Python 2); BIGINT so SMALLINT, INTEGER and
            # BIGINT source columns all fit.
            sqltype = "BIGINT"
        elif type_code == float:
            sqltype = "DOUBLE PRECISION"
        else:
            raise RuntimeError("what's about %s" % type_code)
        column = "%s %s" % (name, sqltype)
        if not null_ok:
            column += " NOT NULL"
        columns.append(column)

    dst_cursor = destination.cursor()
    dst_cursor.execute("SELECT COUNT(*) FROM pg_tables "
                       "WHERE schemaname='public' AND tablename = %s",
                       (tablename + "_tmp",))
    if dst_cursor.fetchone()[0] > 0:
        dst_cursor.execute("DROP TABLE %s_tmp" % tablename)
    dst_cursor.execute("CREATE TABLE %s_tmp (%s)" % (tablename, ", ".join(columns)))
    return True
def fixstr(x):
    """Decode raw byte strings as Latin-1 and return every other value unchanged.

    Only ``bytes`` (``str`` on Python 2) is decoded. Values that are already
    text are passed through: on Python 2, calling ``.decode`` on a ``unicode``
    object first encodes it with the ASCII codec and fails on non-ASCII data.
    Trailing whitespace is not stripped.
    """
    if isinstance(x, bytes):
        return x.decode('latin1')
    return x
def transfer_data(source, destination, tablename, waittime, batchsize=250):
    """Copy every row of DB2/400 table ``tablename`` into PostgreSQL ``<tablename>_tmp``.

    Rows are fetched one at a time and converted with ``fixstr``. They are
    inserted with ``executemany`` in batches of ``batchsize``, including a
    final partial batch. The load runs between an explicit BEGIN/COMMIT while
    holding a lock on the target table. Afterwards the target is
    VACUUM ANALYZEd.

    :param source: open pyodbc connection to the AS/400.
    :param destination: open psycopg2 connection to PostgreSQL;
        ``<tablename>_tmp`` must already exist with matching columns.
    :param tablename: trusted table identifier (interpolated into SQL).
    :param waittime: seconds to sleep after each fetched row, to slow the copy down.
    :param batchsize: number of rows per ``executemany`` call (at least 1).
    """
    # see http://www.depesz.com/index.php/2007/07/05/how-to-insert-data-to-database-as-fast-as-possible/
    batchsize = max(1, int(batchsize))
    start = time.time()
    src_cursor = source.cursor()
    dst_cursor = destination.cursor()

    def _report(done, terminator=""):
        # Progress line rewritten in place via '\r'; guard against a zero
        # elapsed time on coarse clocks.
        elapsed = max(time.time() - start, 1e-6)
        sys.stdout.write('\r%s: %s rows, %.1f rows/s %s'
                         % (tablename, done, done / elapsed, terminator))
        sys.stdout.flush()

    execute(src_cursor, "SELECT COUNT(*) FROM %s" % tablename)
    rowcount = src_cursor.fetchone()[0]

    execute(dst_cursor, "BEGIN WORK")
    execute(dst_cursor, "LOCK TABLE %s_tmp" % tablename)

    stmt = None
    batch = []
    counter = 0
    execute(src_cursor, "SELECT * FROM %s" % tablename)
    while True:
        row = src_cursor.fetchone()
        if not row:
            break
        time.sleep(waittime)
        if stmt is None:
            # Build the INSERT once from the first row's description;
            # psycopg2 uses %s placeholders.
            columns = [desc[0] for desc in row.cursor_description]
            sys.stdout.write(" %d columns, %d rows\n" % (len(columns), rowcount))
            stmt = "INSERT INTO %s_tmp (%s) VALUES (%s)" % (
                tablename, ", ".join(columns), ",".join(["%s"] * len(columns)))
        batch.append([fixstr(x) for x in row])
        counter += 1
        if len(batch) >= batchsize:
            dst_cursor.executemany(stmt, batch)
            batch = []
            _report(counter)
    # Flush the final partial batch so no trailing rows are lost.
    if batch:
        dst_cursor.executemany(stmt, batch)
    _report(counter, "\n")
    execute(dst_cursor, "COMMIT WORK")
    execute(dst_cursor, "VACUUM ANALYZE %s_tmp" % tablename)
def _swap_in_tmp_table(pgconn, tablename):
    """Replace ``tablename`` with the freshly loaded ``<tablename>_tmp`` in PostgreSQL.

    Any existing ``tablename`` in the ``public`` schema is renamed to
    ``<tablename>_garbage``. The temporary table then takes its name and the
    old copy is dropped, all between one BEGIN and COMMIT.
    """
    cursor = pgconn.cursor()
    cursor.execute("BEGIN")
    cursor.execute("SELECT COUNT(*) FROM pg_tables "
                   "WHERE schemaname='public' AND tablename = %s", (tablename,))
    had_old_table = cursor.fetchone()[0] > 0
    if had_old_table:
        cursor.execute("ALTER TABLE %s RENAME TO %s_garbage" % (tablename, tablename))
    cursor.execute("ALTER TABLE %s_tmp RENAME TO %s" % (tablename, tablename))
    if had_old_table:
        cursor.execute("DROP TABLE %s_garbage" % tablename)
    cursor.execute("COMMIT")
    pgconn.commit()


def main():
    """Mirror all base tables of the AS/400 into the ``softmmirror`` PostgreSQL database.

    Command line options:
      -s/--schema  (re)create each ``<table>_tmp`` from the DB2 column layout
                   first; without it the ``_tmp`` tables must already exist.
      -w/--wait    milliseconds to sleep after each fetched row.

    Each table is loaded into ``<table>_tmp`` and then swapped in place of
    ``<table>``. A table whose schema cannot be created is reported and
    skipped instead of aborting the whole run.
    """
    parser = optparse.OptionParser()
    parser.add_option('-s', '--schema', action='store_true', default=False)
    parser.add_option('-w', '--wait', type="int", default=0)
    options, _args = parser.parse_args()
    waittime = options.wait / 1000.0

    db2conn = pyodbc.connect('DSN=HD400')
    try:
        # NOTE: an alternative source of table names is MAPPINGDIR.keys().
        tables = list_all_tables(db2conn)
        for tablename in sorted(tables):
            tablename = tablename.lower()
            # Table names are spliced into SQL statements, so skip anything
            # that is not purely alphanumeric.
            if not tablename.isalnum():
                continue
            # NOTE(review): credentials are hard-coded in source; consider
            # moving them to configuration or the environment.
            pgconn = psycopg2.connect(host="postgresql.local.hudora.biz", database="softmmirror",
                                      user="softm", password="heringssalat")
            try:
                pgconn.set_client_encoding('UNICODE')
                sys.stdout.write(tablename)
                sys.stdout.flush()
                if options.schema:
                    try:
                        created = create_table(db2conn, pgconn, tablename)
                    except (psycopg2.ProgrammingError, RuntimeError) as exc:
                        sys.stdout.write(" ERROR (%s)\n" % exc)
                        continue
                    if not created:
                        sys.stdout.write(" leer\n")  # "leer" = empty
                        continue
                transfer_data(db2conn, pgconn, tablename, waittime)
                pgconn.commit()
                _swap_in_tmp_table(pgconn, tablename)
            finally:
                # Close on every path, including the early ``continue``s above.
                pgconn.close()
    finally:
        db2conn.close()
if __name__ == "__main__":
    # Start the mirror only when run as a script, not when imported.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment