Skip to content

Instantly share code, notes, and snippets.

@elliottcordo
Last active August 29, 2015 14:07
Show Gist options
  • Save elliottcordo/76512e3f5c921cb657ac to your computer and use it in GitHub Desktop.
Save elliottcordo/76512e3f5c921cb657ac to your computer and use it in GitHub Desktop.
sql alchemy schema migration
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy import create_engine, MetaData, Table, schema, Text, Index, select, func
to_db="mysql+pymysql://root@localhost/test"
#to_db="postgres://admin:your_redshift_cluster/pwd"
from_db="mysql+pymysql://root@localhost/test"
def make_session(connection_string):
engine = create_engine(connection_string, echo=False, convert_unicode=True)
Session = sessionmaker(bind=engine)
return Session(), engine
source, sengine = make_session(from_db)
smeta = MetaData(bind=sengine)
destination, dengine = make_session(to_db)
smeta.reflect(sengine, only=['dnb_customer','dnb_sold_to','dnb_ship_to'])
for t in smeta.sorted_tables:
table = Table(t,smeta,autoload=True)
#print select([table, func.current_date()]).execute()
print table.name
#make some changes to target schema
table.name = 'tmp_' + table.name
table.schema = 'ab'
etl_date = schema.Column('etl_date',Text)
table.append_column(etl_date)
print table.schema+'.'+table.name
table.indexes.clear()
table.foreign_keys.clear()
#print select([table, func.current_date()]).execute()
table.drop(dengine, checkfirst=True)
table.create(dengine, checkfirst=True)
destination.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment