Last active
March 22, 2022 10:04
-
-
Save mebaysan/c32470d8abc222d68db313d8c9a3c178 to your computer and use it in GitHub Desktop.
Python script to create a database map of an MSSQL Server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
!pip install pymssql | |
!pip install pandas | |
!pip install numpy | |
import pandas as pd | |
import numpy as np | |
import pymssql | |
def create_server_map(CONN):
    """Crawl an MSSQL server and return its full database/schema/table/column map.

    Parameters
    ----------
    CONN : dict
        Connection settings with keys 'server', 'port', 'user', 'password'.

    Returns
    -------
    tuple of pandas.DataFrame
        (DATABASES, SCHEMAS, TABLES, COLUMNS). Databases the login cannot
        open are reported on stdout and skipped (best-effort crawl).
    """
    # Base connection is only needed to enumerate databases; close it right after.
    con = pymssql.connect(
        server=CONN['server'],
        port=CONN['port'],
        user=CONN['user'],
        password=CONN['password'])
    try:
        DATABASES = pd.read_sql(con=con, sql="SELECT name, database_id, create_date FROM sys.databases")
    finally:
        con.close()
    # Accumulators for every schema / table / column found on the server.
    SCHEMAS = pd.DataFrame({'database_id': [], 'schema_name': [], 'schema_id': []})
    TABLES = pd.DataFrame({'database_id': [], 'schema_id': [], 'table_name': []})
    COLUMNS = pd.DataFrame({'database_id': [], 'schema_id': [], 'table_name': [], 'column_id': [], 'column_name': [], 'column_type': []})
    for index, row in DATABASES.iterrows():
        # Connection failure (no permission, offline DB) is expected for some
        # databases: report and move on rather than aborting the whole crawl.
        try:
            dummy_con = pymssql.connect(server=CONN['server'], port=CONN['port'], user=CONN['user'], password=CONN['password'], database=row['name'])
        except pymssql.Error as exc:
            print('FAILED CONNECTION for SCHEMAS: ', row['name'], exc)
            continue
        print('SUCCESS CONNECTION for SCHEMAS: ', row['name'])
        try:
            # SCHEMAS
            dummy_schemas = pd.read_sql(con=dummy_con, sql="select * from sys.schemas")
            dummy_SCHEMAS = pd.DataFrame({'database_id': row['database_id'], 'schema_name': dummy_schemas['name'], 'schema_id': dummy_schemas['schema_id']})
            SCHEMAS = pd.concat([SCHEMAS, dummy_SCHEMAS])
            # TABLES (user tables only: type 'U')
            dummy_tables = pd.read_sql(con=dummy_con, sql="select * from sys.all_objects where type_desc like '%TABLE%' and type = 'U' order by name asc")
            dummy_TABLES = pd.DataFrame({'database_id': row['database_id'], 'schema_id': dummy_tables['schema_id'], 'table_name': dummy_tables['name']})
            TABLES = pd.concat([TABLES, dummy_TABLES])
            # COLUMNS: every column of every user table, with its declared type.
            dummy_columns = pd.read_sql(con=dummy_con, sql="""select tab.schema_id ,schema_name(tab.schema_id) as schema_name,
    tab.name as table_name,
    col.column_id,
    col.name as column_name,
    t.name as data_type,
    col.max_length,
    col.precision
from sys.tables as tab
    inner join sys.columns as col
        on tab.object_id = col.object_id
    left join sys.types as t
    on col.user_type_id = t.user_type_id
order by schema_name,
    table_name, column_id""")
            dummy_COLUMNS = pd.DataFrame({'database_id': row['database_id'], 'schema_id': dummy_columns['schema_id'], 'table_name': dummy_columns['table_name'], 'column_id': dummy_columns['column_id'],
                                          'column_name': dummy_columns['column_name'], 'column_type': dummy_columns['data_type']})
            COLUMNS = pd.concat([COLUMNS, dummy_COLUMNS])
            print('SUCCESS CONCATENATION for : ', row['name'])
        except Exception as exc:
            # Best-effort: a query failure in one database must not stop the crawl,
            # but the reason is now printed instead of silently swallowed.
            print('FAILED CONNECTION for SCHEMAS: ', row['name'], exc)
        finally:
            dummy_con.close()
    return DATABASES, SCHEMAS, TABLES, COLUMNS
def merge_server_map(DATABASES, SCHEMAS, TABLES, COLUMNS):
    """Flatten the four server-map frames into one row-per-column table.

    Relies on pandas' default merge behavior: each step joins on the columns
    the two frames share (database_id, then schema_id, then table_name).
    Returns a single DataFrame with one row per column on the server.
    """
    merged = DATABASES
    for frame in (SCHEMAS, TABLES, COLUMNS):
        merged = pd.merge(merged, frame)
    return merged
# Connection settings: fill in server/user/password before running.
# 1433 is the default MSSQL port. NOTE(review): password is stored in
# plain text here — consider reading it from the environment.
CONN = {'server':'','port':'1433','user':'','password':''}
# Crawl the server into four raw frames (databases, schemas, tables, columns).
DATABASES, SCHEMAS,TABLES,COLUMNS = create_server_map(CONN)
# Flatten into one row-per-column map of the whole server.
data = merge_server_map(DATABASES, SCHEMAS,TABLES,COLUMNS)
# NEW VERSION | |
def create_server_map_new(CONN):
    """Crawl an MSSQL server via INFORMATION_SCHEMA and return one flat map.

    Simpler successor to create_server_map: a single query per database
    yields database, schema, table (including views), column and type.

    Parameters
    ----------
    CONN : dict
        Connection settings with keys 'server', 'port', 'user', 'password'.

    Returns
    -------
    pandas.DataFrame
        Columns DATABASE, SCHEMA, TABLE_NAME, TABLE_TYPE, COLUMN_NAME,
        COLUMN_TYPE; one row per column. Databases the login cannot open
        are reported on stdout and skipped.
    """
    # Base connection is only needed to enumerate databases; close it right after.
    con = pymssql.connect(
        server=CONN['server'],
        port=CONN['port'],
        user=CONN['user'],
        password=CONN['password'])
    try:
        DATABASES = pd.read_sql(con=con, sql="SELECT name, database_id, create_date FROM sys.databases")
    finally:
        con.close()
    # Accumulator for every column of every table/view on the server.
    MAP = pd.DataFrame({
        'DATABASE': [], 'SCHEMA': [], 'TABLE_NAME': [],
        'TABLE_TYPE': [], 'COLUMN_NAME': [], 'COLUMN_TYPE': []
    })
    # Join on BOTH schema and table name: joining on TABLE_NAME alone (the
    # original query) duplicates rows whenever two schemas contain a table
    # with the same name.
    MAP_QUERY = """
    select
        c.TABLE_CATALOG, c.TABLE_SCHEMA, c.COLUMN_NAME, c.DATA_TYPE, t.TABLE_NAME, t.TABLE_TYPE
    from INFORMATION_SCHEMA.COLUMNS c
    left join INFORMATION_SCHEMA.TABLES t
        on c.TABLE_SCHEMA = t.TABLE_SCHEMA and c.TABLE_NAME = t.TABLE_NAME
    """
    for index, row in DATABASES.iterrows():
        # Connection failure (no permission, offline DB) is expected for some
        # databases: report and move on rather than aborting the whole crawl.
        try:
            dummy_con = pymssql.connect(server=CONN['server'], port=CONN['port'], user=CONN['user'], password=CONN['password'], database=row['name'])
        except pymssql.Error as exc:
            print('FAILED CONNECTION for SCHEMAS: ', row['name'], exc)
            continue
        print('SUCCESS CONNECTION for SCHEMAS: ', row['name'])
        try:
            dummy_map = pd.read_sql(con=dummy_con, sql=MAP_QUERY)
            dummy_MAP = pd.DataFrame({
                'DATABASE': dummy_map['TABLE_CATALOG'],
                'SCHEMA': dummy_map['TABLE_SCHEMA'],
                'TABLE_NAME': dummy_map['TABLE_NAME'],
                'TABLE_TYPE': dummy_map['TABLE_TYPE'],
                'COLUMN_NAME': dummy_map['COLUMN_NAME'],
                'COLUMN_TYPE': dummy_map['DATA_TYPE']
            })
            MAP = pd.concat([MAP, dummy_MAP])
            print('SUCCESS CONCATENATION for : ', row['name'])
        except Exception as exc:
            # Best-effort: a query failure in one database must not stop the
            # crawl, but the reason is now printed instead of silently swallowed.
            print('FAILED CONNECTION for SCHEMAS: ', row['name'], exc)
        finally:
            dummy_con.close()
    return MAP
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment