Created
August 3, 2012 04:35
-
-
Save gree2/3244413 to your computer and use it in GitHub Desktop.
Python: CmpMssqlDb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
import pymssql | |
# Connection settings for the two databases being compared. The
# pymssql.connect() calls in this file index them as:
# [0] host, [1] user, [2] password, [3] database.
# NOTE(review): the credentials are hard-coded in the source.
db1Conn = ['127.0.0.1', 'sa', 'sa','AuditDataCenter']
db2Conn = ['127.0.0.1', 'sa', 'sa','AuditDataCenter1']
# Query executed by MssqlTables(): selects the INFORMATION_SCHEMA.TABLES rows
# whose TABLE_TYPE is 'BASE TABLE'.
sql = "SELECT * from INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'"
def MssqlTables(dbConn):
    """Return the sorted list of table names reported by the module-level query.

    dbConn is a sequence indexed as [0] host, [1] user, [2] password,
    [3] database; those values are handed to ``pymssql.connect``.

    The module-level ``sql`` statement is executed and the third column of
    every result row (index 2) is collected as the table name.
    NOTE(review): with ``SELECT *`` on INFORMATION_SCHEMA.TABLES index 2 is
    presumably TABLE_NAME -- confirm against the server's column order.
    """
    conn = pymssql.connect(host=dbConn[0], user=dbConn[1], password=dbConn[2], database=dbConn[3])
    try:
        cur = conn.cursor()
        try:
            cur.execute(sql)
            tables = [row[2] for row in cur.fetchall()]
        finally:
            # Release the cursor even when the query fails.
            cur.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()
    tables.sort()
    return tables
def MssqlTables2Dict(tables):
    """Map every table name in ``tables`` to itself for quick membership tests."""
    return dict((name, name) for name in tables)
def MssqlTablesCmp():
    """Print a numbered line for each table of db1Conn that db2Conn lacks."""
    source_tables = MssqlTables(db1Conn)
    target_lookup = MssqlTables2Dict(MssqlTables(db2Conn))
    missing = [name for name in source_tables if name not in target_lookup]
    # Numbering starts at 1 and only counts the missing tables.
    for position, name in enumerate(missing):
        print(str(position + 1) + " no table " + name)
def MssqlTableSchema(dbConn, table):
    """Return the column descriptions of ``table`` as a sorted list of lists.

    dbConn is a sequence indexed as [0] host, [1] user, [2] password,
    [3] database; those values are handed to ``pymssql.connect``.

    Each entry of the result is
    ``[name, type, size, default, nullable, ident]`` taken from
    INFORMATION_SCHEMA.COLUMNS. The final ``sort()`` compares the lists
    element by element, so the ORDER BY in the query does not determine the
    returned order.
    """
    # The table name is bound as a query parameter rather than spliced into
    # the SQL text, so a name containing a quote cannot break the statement.
    query = """SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH AS CSIZE,
    COLUMN_DEFAULT, IS_NULLABLE, (columnproperty(object_id(TABLE_NAME), COLUMN_NAME, 'IsIdentity')) AS [IDENT]
    FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = %s ORDER BY ORDINAL_POSITION ASC"""
    conn = pymssql.connect(host=dbConn[0], user=dbConn[1], password=dbConn[2], database=dbConn[3])
    try:
        cur = conn.cursor()
        try:
            cur.execute(query, (table,))
            # Name, Type, Size, Default, Nullable, Ident
            cols = [list(row[:6]) for row in cur.fetchall()]
        finally:
            # Release the cursor even when the query fails.
            cur.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()
    cols.sort()
    return cols
def MssqlColumns2Dict(cols):
    """Index column descriptions by their first element (the column name)."""
    return dict((entry[0], entry) for entry in cols)
def MssqlTableColumnsCmp(table, cols1, cols2):
    """Print every column of ``cols1`` whose name does not occur in ``cols2``.

    cols1 and cols2 are sequences of column descriptions whose first three
    elements are name, type and size. One tab-separated line
    ``table, name, type, size`` is printed per missing column; nothing is
    printed when no column is missing. Returns None.
    """
    # Only the names of cols2 are needed, and only for membership tests.
    names_in_cols2 = set(col[0] for col in cols2)
    for col in cols1:
        if col[0] not in names_in_cols2:
            print("\t".join([table, str(col[0]), str(col[1]), str(col[2])]))
def MssqlTablesColumnsCmp():
    """Compare the columns of every table that exists in both databases.

    Tables of db1Conn that db2Conn does not have are skipped here. For the
    shared ones, both schemas are fetched and passed to MssqlTableColumnsCmp.
    """
    source_tables = MssqlTables(db1Conn)
    target_lookup = MssqlTables2Dict(MssqlTables(db2Conn))
    for name in source_tables:
        if name not in target_lookup:
            continue
        source_cols = MssqlTableSchema(db1Conn, name)
        target_cols = MssqlTableSchema(db2Conn, name)
        MssqlTableColumnsCmp(name, source_cols, target_cols)
def MssqlDbCmp():
    """Run the table-level comparison, then the column-level comparison."""
    for compare in (MssqlTablesCmp, MssqlTablesColumnsCmp):
        compare()
# Run the full comparison. There is no __main__ guard, so this also executes
# whenever the module is imported.
MssqlDbCmp()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment