Shallow clone all tables into per-branch schemas in a Databricks environment
""" | |
Use-case: automate clones in Databricks for dev testing similar to Snowflake's zero-copy clone. | |
This doesn't use the Catalog API as that may rely on cred passthrough | |
- https://docs.microsoft.com/en-us/azure/databricks/security/credential-passthrough/adls-passthrough | |
`py4j.security.Py4JSecurityException: method ... is not whitelisted on class org.apache.spark.sql.catalog.Table` | |
""" | |
prefixes = ["layer_a", "layer_b"] # prefix matching on schemas to copy over for layers | |
def get_git_branch() -> str: | |
"""Generated for this example""" | |
import uuid; return uuid.uuid4().hex.upper()[0:7] # would come from git | |
def copy_template(source: str, dest: str) -> str: | |
return f"CREATE TABLE IF NOT EXISTS {dest} SHALLOW CLONE {source};" | |
# `show tables` doesn't show all tables in all schemas, at least for me, hence the need to `show databases` first | |
schemas = [row["databaseName"] for row in spark.sql("show databases").collect()] | |
schemas_to_copy = list(filter(lambda schema: schema.lower().startswith(tuple(prefixes)), schemas)) | |
git_branch = get_git_branch() | |
for schema in schemas_to_copy: | |
rows = spark.sql(f"show tables in {schema}").collect() | |
mappings = [(f"{schema}.{row['tableName']}", f"{git_branch}_{schema}.{row['tableName']}") for row in rows] | |
copy_statements = [copy_template(source, dest) for (source, dest) in mappings] | |
for s in copy_statements: | |
print(s) # or `spark.sql(s)` to execute | |
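
For completeness, a minimal teardown sketch under the same naming convention (the `drop_branch_schemas` helper below is hypothetical, not part of the original gist): once dev testing on a branch is done, every `{git_branch}_*` schema can be dropped with `CASCADE`. Because the clones are shallow, they only reference the source tables' data files, so dropping them should not touch the original data.

def drop_branch_schemas(git_branch: str) -> None:
    """Hypothetical cleanup helper: drop every schema created for a given branch."""
    schemas = [row["databaseName"] for row in spark.sql("show databases").collect()]
    branch_schemas = [s for s in schemas if s.lower().startswith(git_branch.lower() + "_")]
    for schema in branch_schemas:
        # CASCADE drops the cloned tables along with the schema
        print(f"DROP SCHEMA IF EXISTS {schema} CASCADE;")  # or `spark.sql(...)` to execute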