Skip to content

Instantly share code, notes, and snippets.

@kulmam92
Created October 5, 2019 04:56
Show Gist options
  • Save kulmam92/f86e9840ef5c765e4f89a333f17cab97 to your computer and use it in GitHub Desktop.
Save kulmam92/f86e9840ef5c765e4f89a333f17cab97 to your computer and use it in GitHub Desktop.
Generate the DDL, including permissions, for every object in a given Snowflake database and write each object to a separate file
import os
import shutil
import subprocess
import time
from datetime import datetime
import snowflake.connector
import logging
# Connection settings for Snowflake.
# Every value may be overridden by an environment variable of the same name,
# so real credentials never have to be committed to source control. The
# literals are placeholders/defaults used when the variable is not set.
SNOWSQL_ACCOUNT = os.environ.get("SNOWSQL_ACCOUNT", "XXXXXXXXX.east-us-2.azure")
SNOWSQL_USER = os.environ.get("SNOWSQL_USER", "XXXXXXXX")
SNOWSQL_PASSWORD = os.environ.get("SNOWSQL_PASSWORD", "XXXXXXX")
SNOWSQL_WAREHOUSE = os.environ.get("SNOWSQL_WAREHOUSE", "DEMO_WH")
SNOWSQL_SCHEMA = os.environ.get("SNOWSQL_SCHEMA", "PUBLIC")
SNOWSQL_ROLE = os.environ.get("SNOWSQL_ROLE", "SYSADMIN")
# Run parameters: the database to script out and the root output directory
# (override with SNOWSQL_DATABASE / SNOWSQL_BASE_PATH).
SNOWSQL_DATABASE = os.environ.get("SNOWSQL_DATABASE", "DEMO_DB")
BASE_PATH = os.environ.get("SNOWSQL_BASE_PATH", "/dev/snowflake")
# Inventory query: one row per object to script out, with the columns
#   seq, catalog_name, schema_name, object_type, object_name,
#   ARGUMENT_SIGNATURE, script
# - seq is a two-character sort key per object type ('01' DATABASE, '02' SCHEMA,
#   '03' TABLE, '04' VIEW, '06' SEQUENCE, '07' FILE_FORMAT, '08' PIPE,
#   '09' FUNCTION, '10' PROCEDURE); rows are ordered by seq, catalog, schema, name.
# - DATABASE and SCHEMA rows carry a complete CREATE OR REPLACE statement in
#   `script`; every other object type returns an empty `script`.
# - ARGUMENT_SIGNATURE is only populated for FUNCTION and PROCEDURE rows.
# - The single '{}' placeholder is filled with the database name via str.format.
# - The '\n' sequences are interpreted by Python (the literal is not raw), so the
#   SQL text sent to Snowflake contains real line breaks inside its quoted strings.
# NOTE(review): COMMENT values are concatenated as-is; a comment containing a
# single quote would produce an unbalanced literal in the generated script.
OBJECT_LIST_QUERY ="""\
SELECT seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script
FROM (
select '01' seq, DATABASE_NAME catalog_name, '*' schema_name, 'DATABASE' object_type, DATABASE_NAME object_name, '' ARGUMENT_SIGNATURE
, 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
'DATABASE ' || D.DATABASE_NAME || ' ' ||
'\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';' script
from INFORMATION_SCHEMA.DATABASES D
WHERE DATABASE_NAME = '{}'
UNION ALL
select '02' seq, catalog_name, schema_name schema_name, 'SCHEMA' object_type, schema_name object_name, '' ARGUMENT_SIGNATURE
, 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
'SCHEMA ' || S.SCHEMA_NAME || ' ' ||
CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
'\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';' script
from INFORMATION_SCHEMA.SCHEMATA S
UNION ALL
select '03' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'TABLE' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE != 'VIEW'
UNION ALL
select '04' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'VIEW' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.VIEWS
WHERE TABLE_SCHEMA != 'INFORMATION_SCHEMA'
UNION ALL
select '06' seq, SEQUENCE_catalog catalog_name, SEQUENCE_SCHEMA schema_name, 'SEQUENCE' object_type, SEQUENCE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.SEQUENCES
UNION ALL
select '07' seq, FILE_FORMAT_catalog catalog_name, FILE_FORMAT_SCHEMA schema_name, 'FILE_FORMAT' object_type, FILE_FORMAT_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.FILE_FORMATS
UNION ALL
select '08' seq, PIPE_catalog catalog_name, PIPE_SCHEMA schema_name, 'PIPE' object_type, PIPE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.PIPES
UNION ALL
select '09' seq, FUNCTION_catalog catalog_name, FUNCTION_SCHEMA schema_name, 'FUNCTION' object_type, FUNCTION_NAME object_name, ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.FUNCTIONS
UNION ALL
select '10' seq, PROCEDURE_catalog catalog_name, PROCEDURE_SCHEMA schema_name, 'PROCEDURE' object_type, PROCEDURE_NAME object_name, ARGUMENT_SIGNATURE, '' script
from "INFORMATION_SCHEMA"."PROCEDURES"
) T
ORDER BY seq, catalog_name, schema_name, object_name"""
# Stand-alone query returning the CREATE OR REPLACE DATABASE statement for one
# database; '{}' is the database name. It builds the same text as the
# DATABASE branch of OBJECT_LIST_QUERY.
# NOTE(review): not referenced by the code visible in this file.
DDL_DATABASE_QUERY = """\
SELECT 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
'DATABASE ' || D.DATABASE_NAME || ' ' ||
'\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';'
FROM INFORMATION_SCHEMA.DATABASES D
WHERE D.DATABASE_NAME = '{}'"""
# Stand-alone query returning one CREATE OR REPLACE SCHEMA statement per row of
# INFORMATION_SCHEMA.SCHEMATA (no placeholder, no filter). It builds the same
# text as the SCHEMA branch of OBJECT_LIST_QUERY.
# NOTE(review): not referenced by the code visible in this file.
DDL_SCHEMA_QUERY = """\
SELECT 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
'SCHEMA ' || S.SCHEMA_NAME || ' ' ||
CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
'\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';'
FROM INFORMATION_SCHEMA.SCHEMATA S"""
# Template for fetching one object's DDL. Placeholders, in order:
# object type (e.g. TABLE) and object name (schema.name, plus the type-only
# argument list for functions and procedures).
GET_DDL_QUERY = "SELECT GET_DDL('{}','{}') script"
# The GRANTEE column does not say whether the grantee is a role or a share.
# As a heuristic, every permission query below treats a grantee whose name
# ends in "_SHARE" (case-insensitive) as a SHARE and everything else as a ROLE.
#
# One GRANT statement per privilege row for a single object.
# Placeholders, in order: object schema, object name.
# NOTE(review): not referenced by the code visible in this file.
OBJECT_PERMISSION_QUERY = """\
SELECT 'GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
' ' || A.GRANTEE || ';' GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
WHERE A.OBJECT_SCHEMA = '{}'
AND A.OBJECT_NAME = '{}'"""
# All grants in one round trip: one row per object with two columns,
#   OBJECT_NAME - the name qualified with catalog and schema where those are non-NULL
#   GRANTED     - every GRANT statement for that object as one string
# The statements are collected with ARRAY_AGG; the nested REPLACE calls then
# strip the array's '["' / '"]' wrapper and turn each ';","' separator into
# ';' followed by a line break. No placeholders.
ALL_PERMISSION_QUERY = """\
SELECT CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME OBJECT_NAME,
REPLACE(REPLACE(REPLACE(TO_VARCHAR(ARRAY_AGG('GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
' ' || A.GRANTEE || ';')),'["',''),'"]',''),';","',';\n') GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
GROUP BY CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME"""
# DDL and grants in a single result set: the first row is GET_DDL's output,
# followed by one GRANT statement per privilege row.
# Placeholders, in order: object type, object name (both for GET_DDL), a
# 'True'/other flag that switches the grant rows on or off, object schema,
# object name.
# NOTE(review): not referenced by the code visible in this file.
GET_DDL_WITH_PERMISSION_QUERY = """\
SELECT GET_DDL('{}','{}') script
UNION ALL
SELECT '\nGRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
' ' || A.GRANTEE || ';' GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
WHERE 'True' = '{}'
AND A.OBJECT_SCHEMA = '{}'
AND A.OBJECT_NAME = '{}'"""
# Streams are listed with SHOW and then scripted through GET_DDL.
# '{}' is the database name.
STREAM_LIST_QUERY = "SHOW STREAMS IN DATABASE {}"
# Tasks are listed with SHOW; their DDL is assembled in Python from the SHOW
# columns rather than through GET_DDL. '{}' is the database name.
TASK_LIST_QUERY = "SHOW TASKS IN DATABASE {}"
# The bare string below is never assigned or executed: it is a leftover SQL
# sketch of the task DDL that ScriptOut builds in Python. It is kept for
# reference only. It is not valid SQL as written: it uses '/n' where a line
# break is meant, spells the column "wareshouse", and has no '||' after the
# COMMENT branch.
""" 'CREATE OR REPLACE TASK ' || schema_name || '.' || name ||
'/n WAREHOUSE = ' || wareshouse ||
CASE WHEN schedule IS NULL THEN '' ELSE '/n SCHEDULE = ''' || schedule || '''' END ||
CASE WHEN predecessor IS NULL THEN '' ELSE ' AFTER ' || predecessor END ||
-- Can't find column for the session_parameters
CASE WHEN comment IS NULL THEN '' ELSE '\nCOMMENT = ''' || comment || '''' END || ';'
CASE WHEN condition IS NULL THEN '' ELSE '/n WHEN' || condition END ||
'/nAS' ||
'/n' || definition || ';'
"""
# Root logger set-up: INFO and above, each line prefixed with a
# second-resolution timestamp and a level name padded to 8 characters.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s %(levelname)-8s %(message)s',
)
def _type_only_signature(argument_signature):
    """Reduce '(A NUMBER, B VARCHAR)' to '(NUMBER, VARCHAR)' as GET_DDL expects."""
    inner = str(argument_signature).replace("(", "").replace(")", "")
    types = []
    for arg in inner.split(","):
        tokens = arg.split()
        if not tokens:
            # "()" or stray whitespace: no argument here.
            continue
        # Normal form is "<name> <type>"; fall back to the lone token rather
        # than raising IndexError when only one word is present.
        types.append(tokens[1] if len(tokens) > 1 else tokens[0])
    return "(" + ", ".join(types) + ")"


def _append_permissions(ddl, perms, key):
    """Return ddl followed by the GRANT statements stored under key, if any."""
    logging.info("get permission")
    grants = perms.get(key)
    if not grants:
        return ddl
    logging.debug(grants)
    return ddl + '\n\n' + grants


def _write_script(out_file, ddl):
    """Write ddl to out_file, creating the parent directories as needed."""
    logging.info("write to a file: {}".format(out_file))
    os.makedirs(os.path.dirname(out_file), exist_ok=True)
    # Explicit UTF-8 so object comments with non-ASCII text do not depend on
    # the platform's default encoding.
    with open(out_file, "w", encoding="utf-8") as f:
        f.write(ddl)


def ScriptOut(database, base_path, includePermission=True):
    """Write the DDL of every object in a Snowflake database to .sql files.

    Connects with the module-level SNOWSQL_* settings, lists the objects with
    OBJECT_LIST_QUERY, SHOW STREAMS and SHOW TASKS, and writes one file per
    object:

        <base_path>/<database>/<database>.sql                  (database)
        <base_path>/<database>/<OBJECT_TYPE>/<name>.sql        (schema)
        <base_path>/<database>/<OBJECT_TYPE>/<schema>.<name>.sql  (everything else)

    Args:
        database: Name of the database to script out.
        base_path: Root directory for the generated files.
        includePermission: When true, the GRANT statements returned by
            ALL_PERMISSION_QUERY are appended to each object's DDL.

    Returns:
        None.

    A snowflake.connector ProgrammingError raised while scripting is logged
    and swallowed; the cursor and connection are closed in every case.
    """
    ctx = snowflake.connector.connect(
        account=SNOWSQL_ACCOUNT,
        warehouse=SNOWSQL_WAREHOUSE,
        database=database,
        user=SNOWSQL_USER,
        password=SNOWSQL_PASSWORD,
        schema=SNOWSQL_SCHEMA,
        role=SNOWSQL_ROLE
    )
    cs = ctx.cursor()
    logging.info("Connected to Snowflake")
    try:
        # Change database
        cs.execute("USE database {}".format(database))

        # Fetch every grant once up front and look them up per object below,
        # instead of issuing one permission query per object.
        perms = {}
        if includePermission:
            logging.info("Get permission at once")
            perms = {row[0]: row[1] for row in cs.execute(ALL_PERMISSION_QUERY).fetchall()}

        # Objects listed through information_schema.
        # Columns: seq, catalog_name, schema_name, object_type, object_name,
        #          ARGUMENT_SIGNATURE, script
        results = cs.execute(OBJECT_LIST_QUERY.format(database)).fetchall()
        for rec in results:
            catalog, schema, objectType, name = rec[1], rec[2], rec[3], rec[4]
            logging.info('processing {}, {}'.format(objectType, name))
            if objectType == 'DATABASE':
                # The list query already carries the CREATE statement.
                outFile = os.path.join(base_path, catalog, name + '.sql')
                ddlStmt = rec[6]
                permKey = name
            elif objectType == 'SCHEMA':
                outFile = os.path.join(base_path, catalog, objectType, name + '.sql')
                ddlStmt = rec[6]
                permKey = catalog + '.' + name
            else:
                objectName = schema + '.' + name
                # NOTE: the file name carries no argument list, so overloaded
                # functions/procedures share one file and the last one wins.
                outFile = os.path.join(base_path, catalog, objectType, objectName + '.sql')
                if objectType in ('FUNCTION', 'PROCEDURE'):
                    # GET_DDL needs the argument data types only.
                    objectName += _type_only_signature(rec[5])
                rslt = cs.execute(GET_DDL_QUERY.format(objectType, objectName)).fetchone()
                ddlStmt = rslt[0]
                permKey = catalog + '.' + objectName
            if includePermission:
                ddlStmt = _append_permissions(ddlStmt, perms, permKey)
            _write_script(outFile, ddlStmt)

        # STREAM - SHOW - GET_DDL
        # Columns used: created_on, name, database_name, schema_name
        # NOTE(review): rows are read by position; confirm the indexes against
        # the SHOW STREAMS output of the Snowflake version in use.
        logging.info("stream")
        streamResults = cs.execute(STREAM_LIST_QUERY.format(database)).fetchall()
        for rec in streamResults:
            objectName = rec[3] + '.' + rec[1]
            outFile = os.path.join(base_path, rec[2], 'STREAM', objectName + '.sql')
            rslt = cs.execute(GET_DDL_QUERY.format('STREAM', objectName)).fetchone()
            ddlStmt = rslt[0]
            if includePermission:
                ddlStmt = _append_permissions(ddlStmt, perms, rec[2] + '.' + objectName)
            _write_script(outFile, ddlStmt)

        # TASK - SHOW
        # Columns used: created_on, name, database_name, schema_name, owner,
        # comment, warehouse, schedule, predecessor, state, definition, condition
        # NOTE(review): rows are read by position; confirm the indexes against
        # the SHOW TASKS output of the Snowflake version in use.
        logging.info("taskResults")
        taskResults = cs.execute(TASK_LIST_QUERY.format(database)).fetchall()
        for rec in taskResults:
            logging.info('processing {}, {}'.format(rec[3], rec[1]))  # schema_name, name
            objectType = 'TASK'
            objectName = rec[3] + '.' + rec[1]
            ddlStmt = 'CREATE OR REPLACE TASK ' + objectName + '\n' + ' WAREHOUSE = ' + rec[6]
            if rec[7] is not None:  # schedule
                ddlStmt += "\n SCHEDULE = '" + rec[7] + "'"
            if rec[8] is not None:  # predecessor
                ddlStmt += "\n AFTER " + rec[8]
            if rec[5] is not None:  # comment
                ddlStmt += "\n COMMENT = '" + rec[5] + "'"
            if rec[11] is not None:  # condition
                ddlStmt += "\n WHEN " + rec[11]
            ddlStmt += '\nAS\n' + rec[10] + ';'  # definition
            outFile = os.path.join(base_path, rec[2], objectType, objectName + '.sql')
            if includePermission:
                ddlStmt = _append_permissions(ddlStmt, perms, rec[2] + '.' + objectName)
            _write_script(outFile, ddlStmt)
    except snowflake.connector.errors.ProgrammingError as e:
        # default error message
        logging.error(e)
    finally:
        cs.close()
        ctx.close()
def main():
    """Script out the configured database and log the elapsed seconds."""
    started_at = time.time()
    ScriptOut(SNOWSQL_DATABASE, BASE_PATH, True)
    seconds_taken = time.time() - started_at
    logging.info("Completed after: {}".format(str(seconds_taken)))


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
@tommygeorge
Copy link

This is fantastic, by far the most comprehensive and logical DDL generator for Snowflake I've seen. I added logic to script the permissions for tables into a new folder called permissions under the table folder, plus a boolean flag you can set in the parameters to turn each object's permissions on or off. I've also added parameters to include a list of schemas or exclude a list of parameters.

  1. Why? In my case Snowflake is not fully compatible with other tools and reverse engineering tables DDL fails a lot. As the data architect, this is one of my most common use cases. Quickly referencing many different objects for documentation and testing purposes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment