Skip to content

Instantly share code, notes, and snippets.

@mebaysan
Last active March 22, 2022 10:04
Show Gist options
  • Save mebaysan/c32470d8abc222d68db313d8c9a3c178 to your computer and use it in GitHub Desktop.
Save mebaysan/c32470d8abc222d68db313d8c9a3c178 to your computer and use it in GitHub Desktop.
Python Script to Create Database Map of MsSQL Server
!pip install pymssql
!pip install pandas
!pip install numpy
import pandas as pd
import numpy as np
import pymssql
def create_server_map(CONN):
# create base connection to get all databases on server
con = pymssql.connect(
server=CONN['server'],
port=CONN['port'],
user=CONN['user'],
password=CONN['password'])
# const to store all databases on server
DATABASES = pd.read_sql(con=con,sql="SELECT name, database_id, create_date FROM sys.databases")
# const to store all schemas on server
SCHEMAS = pd.DataFrame({'database_id':[],'schema_name':[],'schema_id':[]})
# const to store all tables on server
TABLES = pd.DataFrame({'database_id':[],'schema_id':[],'table_name':[]})
# const to store all columns on server
COLUMNS = pd.DataFrame({'database_id':[],'schema_id':[],'table_name':[],'column_id':[],'column_name':[],'column_type':[]})
for index, row in DATABASES.iterrows():
try:
# CREATE CONNECTION
dummy_con = pymssql.connect(server=CONN['server'],port=CONN['port'],user=CONN['user'],password=CONN['password'],database=row['name'])
print('SUCCESS CONNECTION for SCHEMAS: ', row['name'])
# SCHEMAS
dummy_schemas = pd.read_sql(con=dummy_con,sql="select * from sys.schemas")
dummy_SCHEMAS = pd.DataFrame({'database_id':row['database_id'],'schema_name':dummy_schemas['name'],'schema_id':dummy_schemas['schema_id']})
SCHEMAS=pd.concat([SCHEMAS,dummy_SCHEMAS])
# TABLES
dummy_tables = pd.read_sql(con=dummy_con,sql="select * from sys.all_objects where type_desc like '%TABLE%' and type = 'U' order by name asc")
dummy_TABLES = pd.DataFrame({'database_id':row['database_id'],'schema_id':dummy_tables['schema_id'],'table_name':dummy_tables['name']})
TABLES = pd.concat([TABLES,dummy_TABLES])
# COLUMNS
dummy_columns = pd.read_sql(con=dummy_con,sql="""select tab.schema_id ,schema_name(tab.schema_id) as schema_name,
tab.name as table_name,
col.column_id,
col.name as column_name,
t.name as data_type,
col.max_length,
col.precision
from sys.tables as tab
inner join sys.columns as col
on tab.object_id = col.object_id
left join sys.types as t
on col.user_type_id = t.user_type_id
order by schema_name,
table_name, column_id""")
dummy_COLUMNS = pd.DataFrame({'database_id':row['database_id'],'schema_id':dummy_columns['schema_id'],'table_name':dummy_columns['table_name'],'column_id':dummy_columns['column_id'],
'column_name':dummy_columns['column_name'],'column_type':dummy_columns['data_type']})
COLUMNS = pd.concat([COLUMNS,dummy_COLUMNS])
print('SUCCESS CONCATENATION for : ', row['name'])
except:
print('FAILED CONNECTION for SCHEMAS: ', row['name'])
return DATABASES, SCHEMAS,TABLES,COLUMNS
def merge_server_map(DATABASES, SCHEMAS,TABLES,COLUMNS):
db_sc = pd.merge(DATABASES,SCHEMAS)
db_sc_tb = pd.merge(db_sc,TABLES)
data = pd.merge(db_sc_tb,COLUMNS)
return data
CONN = {'server':'','port':'1433','user':'','password':''}
DATABASES, SCHEMAS,TABLES,COLUMNS = create_server_map(CONN)
data = merge_server_map(DATABASES, SCHEMAS,TABLES,COLUMNS)
# NEW VERSION
def create_server_map_new(CONN):
# create base connection to get all databases on server
con = pymssql.connect(
server=CONN['server'],
port=CONN['port'],
user=CONN['user'],
password=CONN['password'])
# const to store all databases on server
DATABASES = pd.read_sql(con=con,sql="SELECT name, database_id, create_date FROM sys.databases")
# const to store all columns and tables (included views) on server
MAP = pd.DataFrame({
'DATABASE':[],'SCHEMA':[],'TABLE_NAME':[],
'TABLE_TYPE':[],'COLUMN_NAME':[],'COLUMN_TYPE':[]
})
MAP_QUERY = """
select
c.TABLE_CATALOG, c.TABLE_SCHEMA, c.COLUMN_NAME, c.DATA_TYPE, t.TABLE_NAME, t.TABLE_TYPE
from INFORMATION_SCHEMA.COLUMNS c
left join INFORMATION_SCHEMA.TABLES t on c.TABLE_NAME = t.TABLE_NAME
"""
for index, row in DATABASES.iterrows():
try:
# CREATE CONNECTION
dummy_con = pymssql.connect(server=CONN['server'],port=CONN['port'],user=CONN['user'],password=CONN['password'],database=row['name'])
print('SUCCESS CONNECTION for SCHEMAS: ', row['name'])
# MAP
dummy_map = pd.read_sql(con=dummy_con, sql=MAP_QUERY)
dummy_MAP = pd.DataFrame({
'DATABASE':dummy_map['TABLE_CATALOG'],
'SCHEMA':dummy_map['TABLE_SCHEMA'],
'TABLE_NAME':dummy_map['TABLE_NAME'],
'TABLE_TYPE':dummy_map['TABLE_TYPE'],
'COLUMN_NAME':dummy_map['COLUMN_NAME'],
'COLUMN_TYPE':dummy_map['DATA_TYPE']
})
MAP=pd.concat([MAP,dummy_MAP])
print('SUCCESS CONCATENATION for : ', row['name'])
except:
print('FAILED CONNECTION for SCHEMAS: ', row['name'])
return MAP
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment