Script to generate Delta tables with different feature combinations
def create_new_table(tablename, columnmapping, delete_vectors, partitions,
                     columnmappingbyid, liquidclustering, optimize, vacuum,
                     writeStatsAsStruct, writeStatsAsJson):
    """Create a Delta table from the publicholidays sample table, enabling the
    Delta features selected by the boolean flags. The table name is rebuilt
    from the flags below, so the tablename argument is only a placeholder."""
    # Build a descriptive table name from the selected features.
    tablename = "PARQUET_2_0"
    if columnmapping and not columnmappingbyid:
        tablename = tablename + "_columnmapping_by_name"
    if delete_vectors:
        tablename = tablename + "_delete_vectors"
    if partitions:
        tablename = tablename + "_partitions"
    if not columnmapping and columnmappingbyid:
        tablename = tablename + "_columnmapping_by_id"
    if liquidclustering:
        tablename = tablename + "_liquid_clustering"
    if optimize:
        tablename = tablename + "_optimize"
    if vacuum:
        tablename = tablename + "_vacuum"
    if writeStatsAsStruct:
        tablename = tablename + "_writeStatsAsStruct"
    if not writeStatsAsJson:
        tablename = tablename + "_writeStatsAsJson"
    #display(tablename)

    # These combinations are not allowed; bail out before dropping anything,
    # otherwise an invalid combination could drop a table that a valid
    # combination created earlier under the same name.
    if columnmapping and columnmappingbyid:
        return
    if partitions and liquidclustering:
        return

    spark.sql(f"drop table if exists {tablename}")
    try:
        # Base reader/writer protocol versions; feature-specific properties
        # are appended below.
        tblprop = "'delta.minReaderVersion' = '2', 'delta.minWriterVersion' = '5' "
        partitionstring = ""
        clusteringstring = ""
        if partitions:
            partitionstring = " PARTITIONED BY (date) "
        if liquidclustering:
            clusteringstring = " CLUSTER BY (countryOrRegion) "
        if columnmapping and not columnmappingbyid:
            tblprop = tblprop + ", 'delta.columnMapping.mode' = 'name' "
        if not columnmapping and columnmappingbyid:
            tblprop = tblprop + ", 'delta.columnMapping.mode' = 'id' "
        if delete_vectors:
            tblprop = tblprop + ", 'delta.enableDeletionVectors' = 'true' "
        if writeStatsAsStruct:
            tblprop = tblprop + ", 'delta.checkpoint.writeStatsAsStruct' = 'true'"
        if not writeStatsAsJson:
            tblprop = tblprop + ", 'delta.checkpoint.writeStatsAsJson' = 'false'"
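        # For illustration: with columnmapping=True and delete_vectors=True
        # (all other flags False, writeStatsAsJson=True), the generated DDL is:
        #   create table PARQUET_2_0_columnmapping_by_name_delete_vectors
        #   TBLPROPERTIES ( 'delta.minReaderVersion' = '2',
        #                   'delta.minWriterVersion' = '5',
        #                   'delta.columnMapping.mode' = 'name',
        #                   'delta.enableDeletionVectors' = 'true' )
        #   as SELECT * FROM publicholidays
        #   WHERE countryOrRegion = 'Sweden' limit 10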
sSql = f"create table {tablename} {partitionstring} {clusteringstring} TBLPROPERTIES ( {tblprop} ) as SELECT * FROM publicholidays WHERE countryOrRegion = 'Sweden' limit 10" | |
#display(sSql) | |
spark.sql(sSql) | |
if optimize==True: | |
spark.sql(f"OPTIMIZE {tablename};") | |
if vacuum==True: | |
spark.sql(f"VACUUM {tablename} ;") | |
except BaseException as exception: | |
display(tablename) | |
display(f"Exception Name: {type(exception).__name__}") | |
display(f"Exception Desc: {exception}") | |
#spark.sql(f"drop table if exists {tablename}") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Norway' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Colombia' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Japan' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Argentina' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Finland' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'United Kingdom' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Hungary' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Croatia' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Austria' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Austria' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Portugal' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'South Africa' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Poland' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Czech' limit 10") | |
spark.sql(f"insert into {tablename} SELECT * FROM publicholidays WHERE countryOrRegion = 'Denmark' limit 10") | |
spark.sql(f"delete from {tablename} WHERE countryOrRegion = 'Austria'") | |
spark.sql(f"delete from {tablename} WHERE countryOrRegion = 'Denmark'") | |