Created
October 5, 2019 04:56
-
-
Save kulmam92/f86e9840ef5c765e4f89a333f17cab97 to your computer and use it in GitHub Desktop.
Generate the DDL, including permissions, of every object in a given Snowflake database and write each object to a separate file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import os
import shutil
import subprocess
import time
from datetime import datetime

import snowflake.connector
# Information for Snowflake.
# The account, user and password can be overridden through the environment
# (same variable names SnowSQL uses) so real credentials do not have to be
# written into this file; the literals below remain the defaults.
SNOWSQL_ACCOUNT = os.environ.get("SNOWSQL_ACCOUNT", "XXXXXXXXX.east-us-2.azure")
SNOWSQL_USER = os.environ.get("SNOWSQL_USER", "XXXXXXXX")
SNOWSQL_PASSWORD = os.environ.get("SNOWSQL_PWD", "XXXXXXX")
SNOWSQL_WAREHOUSE = "DEMO_WH"
SNOWSQL_SCHEMA = "PUBLIC"
SNOWSQL_ROLE = "SYSADMIN"

# Parameter
# Database to script out, and the root directory the .sql files are written under.
SNOWSQL_DATABASE = "DEMO_DB"
BASE_PATH = "/dev/snowflake"
# Lists every object to script out, one row per object, with the columns
#   seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script
# ordered by seq so databases come first, then schemas, then the other types.
# The single '{}' placeholder is the database name (it filters the DATABASE row).
# Only the DATABASE and SCHEMA rows carry a ready-made CREATE statement in
# `script`; every other object type returns an empty `script`.
# The '\n' sequences are Python escapes, so the SQL string literals sent to
# Snowflake contain real line breaks.
OBJECT_LIST_QUERY = """\
SELECT seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script
FROM (
select '01' seq, DATABASE_NAME catalog_name, '*' schema_name, 'DATABASE' object_type, DATABASE_NAME object_name, '' ARGUMENT_SIGNATURE
, 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
  'DATABASE ' || D.DATABASE_NAME || ' ' ||
  '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
  CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';' script
from INFORMATION_SCHEMA.DATABASES D
WHERE DATABASE_NAME = '{}'
UNION ALL
select '02' seq, catalog_name, schema_name schema_name, 'SCHEMA' object_type, schema_name object_name, '' ARGUMENT_SIGNATURE
, 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
  'SCHEMA ' || S.SCHEMA_NAME || ' ' ||
  CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
  '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
  CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';' script
from INFORMATION_SCHEMA.SCHEMATA S
UNION ALL
select '03' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'TABLE' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE != 'VIEW'
UNION ALL
select '04' seq, table_catalog catalog_name, TABLE_SCHEMA schema_name, 'VIEW' object_type, TABLE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.VIEWS
WHERE TABLE_SCHEMA != 'INFORMATION_SCHEMA'
UNION ALL
select '06' seq, SEQUENCE_catalog catalog_name, SEQUENCE_SCHEMA schema_name, 'SEQUENCE' object_type, SEQUENCE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.SEQUENCES
UNION ALL
select '07' seq, FILE_FORMAT_catalog catalog_name, FILE_FORMAT_SCHEMA schema_name, 'FILE_FORMAT' object_type, FILE_FORMAT_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.FILE_FORMATS
UNION ALL
select '08' seq, PIPE_catalog catalog_name, PIPE_SCHEMA schema_name, 'PIPE' object_type, PIPE_NAME object_name, '' ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.PIPES
UNION ALL
select '09' seq, FUNCTION_catalog catalog_name, FUNCTION_SCHEMA schema_name, 'FUNCTION' object_type, FUNCTION_NAME object_name, ARGUMENT_SIGNATURE, '' script
from INFORMATION_SCHEMA.FUNCTIONS
UNION ALL
select '10' seq, PROCEDURE_catalog catalog_name, PROCEDURE_SCHEMA schema_name, 'PROCEDURE' object_type, PROCEDURE_NAME object_name, ARGUMENT_SIGNATURE, '' script
from "INFORMATION_SCHEMA"."PROCEDURES"
) T
ORDER BY seq, catalog_name, schema_name, object_name"""
# Builds the CREATE OR REPLACE DATABASE statement for one database.
# '{}' placeholder: database name.
DDL_DATABASE_QUERY = """\
SELECT 'CREATE OR REPLACE' || CASE WHEN D.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
  'DATABASE ' || D.DATABASE_NAME || ' ' ||
  '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
  CASE WHEN D.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || D.COMMENT || '''' END || ';'
FROM INFORMATION_SCHEMA.DATABASES D
WHERE D.DATABASE_NAME = '{}'"""

# Builds one CREATE OR REPLACE SCHEMA statement per row of
# INFORMATION_SCHEMA.SCHEMATA (no placeholders, no filter).
DDL_SCHEMA_QUERY = """\
SELECT 'CREATE OR REPLACE' || CASE WHEN S.IS_TRANSIENT = 'YES' THEN ' TRANSIENT' ELSE '' END || ' ' ||
  'SCHEMA ' || S.SCHEMA_NAME || ' ' ||
  CASE WHEN S.IS_MANAGED_ACCESS = 'YES' THEN '\nWITH MANAGED ACCESS' ELSE '' END || ' ' ||
  '\nDATA_RETENTION_TIME_IN_DAYS = ' || TO_VARCHAR(RETENTION_TIME) ||
  CASE WHEN S.COMMENT IS NULL THEN '' ELSE '\nCOMMENT = ''' || S.COMMENT || '''' END || ';'
FROM INFORMATION_SCHEMA.SCHEMATA S"""

# Fetches the DDL of a single object.
# Placeholders: object type, then the (schema-qualified) object name.
GET_DDL_QUERY = "SELECT GET_DDL('{}','{}') script"
# I wasn't able to find a good way to distinguish role and share from the GRANTEE column.
# Therefore, I assumed that a share will have the postfix "_SHARE".

# One GRANT statement per privilege held on a single object.
# Placeholders: object schema, object name.
OBJECT_PERMISSION_QUERY = """\
SELECT 'GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
  CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
  CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
  ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
  ' ' || A.GRANTEE || ';' GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
WHERE A.OBJECT_SCHEMA = '{}'
AND A.OBJECT_NAME = '{}'"""

# All GRANT statements of the database in one round trip: one row per object
# with columns (OBJECT_NAME, GRANTED). OBJECT_NAME is the object name prefixed
# by catalog and schema when those are not NULL; GRANTED is the object's GRANT
# statements joined by line breaks (the ARRAY_AGG JSON text is flattened by
# the nested REPLACE calls). No placeholders.
ALL_PERMISSION_QUERY = """\
SELECT CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
  CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME OBJECT_NAME,
  REPLACE(REPLACE(REPLACE(TO_VARCHAR(ARRAY_AGG('GRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
  CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
  CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
  ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
  ' ' || A.GRANTEE || ';')),'["',''),'"]',''),';","',';\n') GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
GROUP BY CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
  CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME"""

# DDL of one object followed by its GRANT statements, as separate rows.
# Placeholders, in order: object type, object name (for GET_DDL), a flag that
# must render as 'True' for the GRANT rows to be returned, object schema,
# object name (for the privilege filter).
GET_DDL_WITH_PERMISSION_QUERY = """\
SELECT GET_DDL('{}','{}') script
UNION ALL
SELECT '\nGRANT ' || A.PRIVILEGE_TYPE || ' ON ' || A.OBJECT_TYPE || ' ' ||
  CASE WHEN A.OBJECT_CATALOG IS NULL THEN '' ELSE A.OBJECT_CATALOG || '.' END ||
  CASE WHEN A.OBJECT_SCHEMA IS NULL THEN '' ELSE A.OBJECT_SCHEMA || '.' END || A.OBJECT_NAME ||
  ' TO ' || CASE WHEN UPPER(GRANTEE) LIKE '%_SHARE' THEN 'SHARE' ELSE 'ROLE' END ||
  ' ' || A.GRANTEE || ';' GRANTED
FROM INFORMATION_SCHEMA.OBJECT_PRIVILEGES A
WHERE 'True' = '{}'
AND A.OBJECT_SCHEMA = '{}'
AND A.OBJECT_NAME = '{}'"""
# STREAM - SHOW - GET_DDL
# '{}' placeholder: database name.
STREAM_LIST_QUERY = "SHOW STREAMS IN DATABASE {}"

# TASK - SHOW
# '{}' placeholder: database name.
TASK_LIST_QUERY = "SHOW TASKS IN DATABASE {}"

# Reference only (never executed): SQL sketch of the CREATE TASK text built
# from the SHOW TASKS columns. ScriptOut assembles the same statement in Python.
#   'CREATE OR REPLACE TASK ' || schema_name || '.' || name ||
#   '\n WAREHOUSE = ' || warehouse ||
#   CASE WHEN schedule IS NULL THEN '' ELSE '\n SCHEDULE = ''' || schedule || '''' END ||
#   CASE WHEN predecessor IS NULL THEN '' ELSE ' AFTER ' || predecessor END ||
#   -- Can't find column for the session_parameters
#   CASE WHEN comment IS NULL THEN '' ELSE '\nCOMMENT = ''' || comment || '''' END ||
#   CASE WHEN condition IS NULL THEN '' ELSE '\n WHEN' || condition END ||
#   '\nAS' ||
#   '\n' || definition || ';'
# Root logger setup: timestamped INFO-level messages
# (logging.debug output, such as the GRANT text, stays hidden).
logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')
def _type_only_signature(argument_signature):
    """Reduce a ``(NAME TYPE, ...)`` signature to ``(TYPE, ...)``.

    The object name handed to GET_DDL for a function or procedure needs the
    argument signature with data types only, so argument names are dropped.
    An empty signature ``()`` is returned unchanged.
    """
    if argument_signature == "()":
        return argument_signature
    args = argument_signature.replace("(", "").replace(")", "").split(",")
    # Each argument reads "<name> <type>"; keep the second word.
    return '(' + ', '.join(arg.strip().split()[1] for arg in args) + ')'


def _with_permissions(ddl, perms, key, include_permission):
    """Return *ddl* with the pre-fetched GRANT text for *key* appended, if any."""
    if not include_permission:
        return ddl
    logging.info("get permission")
    grants = perms.get(key)
    if grants:
        logging.debug(grants)
        ddl += '\n\n' + grants
    return ddl


def _write_script(out_file, ddl):
    """Write *ddl* to *out_file*, creating parent directories as needed."""
    os.makedirs(os.path.dirname(out_file), exist_ok=True)
    # Explicit UTF-8 so non-ASCII comments/identifiers do not depend on the
    # platform's default encoding.
    with open(out_file, "w", encoding="utf-8") as f:
        f.write(ddl)


def ScriptOut(database, base_path, includePermission=True):
    """Write the DDL of every object in *database* to separate .sql files.

    Files are laid out as:
      <base_path>/<database>/<database>.sql               (database)
      <base_path>/<database>/SCHEMA/<schema>.sql          (schemas)
      <base_path>/<database>/<OBJECT_TYPE>/<schema>.<object>.sql
                                                          (everything else,
                                                           incl. STREAM, TASK)

    Args:
        database: Name of the Snowflake database to script out.
        base_path: Root directory the files are written under.
        includePermission: When true, all GRANT statements are fetched once
            with ALL_PERMISSION_QUERY and appended to each object's DDL.

    Returns:
        None. A snowflake ProgrammingError is logged and ends the run early
        without being re-raised; the cursor and connection are always closed.
    """
    ctx = snowflake.connector.connect(
        account=SNOWSQL_ACCOUNT,
        warehouse=SNOWSQL_WAREHOUSE,
        database=database,
        user=SNOWSQL_USER,
        password=SNOWSQL_PASSWORD,
        schema=SNOWSQL_SCHEMA,
        role=SNOWSQL_ROLE
    )
    cs = ctx.cursor()
    logging.info("Connected to Snowflake")
    try:
        # Change database
        cs.execute("USE database {}".format(database))

        # Get permission at once: {qualified object name: GRANT statements}
        perms = {}
        if includePermission:
            logging.info("Get permission at once")
            permResults = cs.execute(ALL_PERMISSION_QUERY).fetchall()
            perms = {rec[0]: rec[1] for rec in permResults}

        # Get object list using information_schema
        results = cs.execute(OBJECT_LIST_QUERY.format(database)).fetchall()
        for rec in results:
            # seq, catalog_name, schema_name, object_type, object_name, ARGUMENT_SIGNATURE, script
            logging.info('processing {}, {}'.format(rec[3], rec[4]))
            if rec[3] == 'DATABASE':
                outFile = os.path.join(base_path, rec[1], rec[4] + '.sql')
                ddlStmt = rec[6]
                permKey = rec[4]
            elif rec[3] == 'SCHEMA':
                outFile = os.path.join(base_path, rec[1], rec[3], rec[4] + '.sql')
                ddlStmt = rec[6]
                permKey = rec[1] + '.' + rec[4]
            else:
                objectType = rec[3]
                if objectType in ('FUNCTION', 'PROCEDURE'):
                    # object_type, schema_name.object_name(ARGUMENT_SIGNATURE)
                    objectName = rec[2] + '.' + rec[4] + _type_only_signature(str(rec[5]))
                    # NOTE: the file name omits the signature, so overloads
                    # of the same name are written to the same file.
                    outFile = os.path.join(base_path, rec[1], rec[3], rec[2] + '.' + rec[4] + '.sql')
                else:
                    # object_type, schema_name.object_name
                    objectName = rec[2] + '.' + rec[4]
                    outFile = os.path.join(base_path, rec[1], rec[3], objectName + '.sql')
                rslt = cs.execute(GET_DDL_QUERY.format(objectType, objectName)).fetchone()
                ddlStmt = rslt[0]
                permKey = rec[1] + '.' + objectName
            ddlStmt = _with_permissions(ddlStmt, perms, permKey, includePermission)

            # Write it to a file
            logging.info("write to a file: {}".format(outFile))
            logging.info("directory: {}".format(os.path.dirname(outFile)))
            _write_script(outFile, ddlStmt)

        # STREAM - SHOW - GET_DDL
        # created_on, name, database_name, schema_name
        logging.info("stream")
        stremResults = cs.execute(STREAM_LIST_QUERY.format(database)).fetchall()
        for rec in stremResults:
            objectType = 'STREAM'
            objectName = rec[3] + '.' + rec[1]
            outFile = os.path.join(base_path, rec[2], 'STREAM', objectName + '.sql')
            rslt = cs.execute(GET_DDL_QUERY.format(objectType, objectName)).fetchone()
            ddlStmt = _with_permissions(rslt[0], perms, rec[2] + '.' + objectName, includePermission)
            logging.info("write to a file")
            _write_script(outFile, ddlStmt)

        # TASK - SHOW
        # created_on,name,database_name,schema_name,owner,comment,warehouse,schedule,predecessor,state,definition,condition
        # NOTE(review): the indexes below assume exactly this column order in
        # the SHOW TASKS output - confirm against the Snowflake version in use.
        logging.info("taskResults")
        taskResults = cs.execute(TASK_LIST_QUERY.format(database)).fetchall()
        for rec in taskResults:
            logging.info('processing {}, {}'.format(rec[3], rec[1]))  # schema_name, name
            objectType = 'TASK'
            objectName = rec[3] + '.' + rec[1]
            ddlStmt = 'CREATE OR REPLACE TASK ' + objectName
            if rec[6] is not None:  # warehouse; skipped when NULL instead of raising TypeError
                ddlStmt += '\n' + ' WAREHOUSE = ' + rec[6]
            if rec[7] is not None:  # schedule
                ddlStmt += "\n SCHEDULE = '" + rec[7] + "'"
            if rec[8] is not None:  # predecessor
                ddlStmt += "\n AFTER " + rec[8]
            if rec[5] is not None:  # comment
                # Double embedded single quotes so the generated literal stays valid SQL.
                ddlStmt += "\n COMMENT = '" + rec[5].replace("'", "''") + "'"
            if rec[11] is not None:  # condition
                ddlStmt += "\n WHEN " + rec[11]
            ddlStmt += '\nAS\n' + rec[10] + ';'  # definition
            outFile = os.path.join(base_path, rec[2], objectType, objectName + '.sql')
            ddlStmt = _with_permissions(ddlStmt, perms, rec[2] + '.' + objectName, includePermission)
            logging.info("write to a file")
            _write_script(outFile, ddlStmt)
    except snowflake.connector.errors.ProgrammingError as e:
        # default error message
        logging.error(e)
    finally:
        cs.close()
        ctx.close()
def main():
    """Script out SNOWSQL_DATABASE under BASE_PATH (with permissions) and log the elapsed seconds."""
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments during a long export.
    start_time = time.perf_counter()
    ScriptOut(SNOWSQL_DATABASE, BASE_PATH, True)
    elapsed_time = time.perf_counter() - start_time
    logging.info("Completed after: {}".format(str(elapsed_time)))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is fantastic, by far the most comprehensive and logical DDL generator for Snowflake I've seen. I added logic to script permissions for tables into a new folder called permissions under the table folder, and a boolean flag you can set in the parameters to turn each object's permissions on or off. I've also added parameters to include a list of schemas or exclude a list of parameters.