Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save MarkPryceMaherMSFT/350a4e73439dcbef7153bef69f9fc31a to your computer and use it in GitHub Desktop.
Save MarkPryceMaherMSFT/350a4e73439dcbef7153bef69f9fc31a to your computer and use it in GitHub Desktop.
Script to generate different Delta tables with various feature combinations enabled.
def create_new_table(tablename, columnmapping, delete_vectors, partitions,
                     columnmappingbyid, liquidclustering, optimize, vacuum,
                     writeStatsAsStruct, writeStatsAsJson):
    """Create a demo Delta table from the ``publicholidays`` sample data with a
    chosen combination of Delta features enabled, then run inserts and deletes
    against it so the table accumulates multiple commits.

    The generated table name encodes the enabled features as suffixes, so each
    feature combination yields a distinct table.

    Parameters
    ----------
    tablename : str
        Ignored — the base name is always ``"PARQUET_2_0"``. Kept in the
        signature for backward compatibility with existing callers.
    columnmapping, columnmappingbyid : bool
        Enable ``delta.columnMapping.mode`` = ``'name'`` / ``'id'``
        respectively. Mutually exclusive: if both are True, the function
        returns without doing anything.
    delete_vectors : bool
        Enable ``delta.enableDeletionVectors``.
    partitions : bool
        Partition the table by ``date``. Mutually exclusive with
        ``liquidclustering``: if both are True, the function returns.
    liquidclustering : bool
        Cluster the table by ``countryOrRegion``.
    optimize, vacuum : bool
        Run ``OPTIMIZE`` / ``VACUUM`` on the table after creation.
    writeStatsAsStruct : bool
        Enable ``delta.checkpoint.writeStatsAsStruct``.
    writeStatsAsJson : bool
        When False, set ``delta.checkpoint.writeStatsAsJson = 'false'``.

    NOTE(review): relies on the notebook globals ``spark`` and ``display``.
    """
    # Base sample table name; feature suffixes are appended below.
    tablename = "PARQUET_2_0"
    if columnmapping and not columnmappingbyid:
        tablename += "_columnmapping_by_name"
    if delete_vectors:
        tablename += "_delete_vectors"
    if partitions:
        tablename += "_partitions"
    if not columnmapping and columnmappingbyid:
        tablename += "_columnmapping_by_id"
    if liquidclustering:
        tablename += "_liquid_clustering"
    if optimize:
        tablename += "_optimize"
    # BUG FIX: the original gated this suffix on `liquidclustering`
    # (copy/paste error), so vacuumed tables were never labelled.
    if vacuum:
        tablename += "_vacuum"
    if writeStatsAsStruct:
        tablename += "_writeStatsAsStruct"
    if not writeStatsAsJson:
        tablename += "_writeStatsAsJson"

    # Disallowed feature combinations: bail out BEFORE touching any table.
    # (The original dropped the table first, which could destroy an existing
    # table even when nothing would be created to replace it.)
    if columnmapping and columnmappingbyid:
        return
    if partitions and liquidclustering:
        return

    spark.sql(f"drop table if exists {tablename}")

    try:
        tblprop = "'delta.minReaderVersion' = '2', 'delta.minWriterVersion' = '5' "
        partitionstring = ""
        clusteringstring = ""
        if partitions:
            partitionstring = " PARTITIONED BY (date) "
        if liquidclustering:
            clusteringstring = " CLUSTER BY (countryOrRegion) "
        if columnmapping and not columnmappingbyid:
            tblprop += ", 'delta.columnMapping.mode' = 'name' "
        if not columnmapping and columnmappingbyid:
            tblprop += ", 'delta.columnMapping.mode' = 'id' "
        if delete_vectors:
            tblprop += ", 'delta.enableDeletionVectors' = 'true' "
        if writeStatsAsStruct:
            tblprop += ", 'delta.checkpoint.writeStatsAsStruct' = 'true'"
        if not writeStatsAsJson:
            # BUG FIX: the original assigned to a misspelled `stblprop`,
            # so this property was silently never applied.
            tblprop += ", 'delta.checkpoint.writeStatsAsJson' = 'false'"
        sSql = (
            f"create table {tablename} {partitionstring} {clusteringstring} "
            f"TBLPROPERTIES ( {tblprop} ) as SELECT * FROM publicholidays "
            f"WHERE countryOrRegion = 'Sweden' limit 10"
        )
        spark.sql(sSql)
        if optimize:
            spark.sql(f"OPTIMIZE {tablename};")
        if vacuum:
            spark.sql(f"VACUUM {tablename} ;")
    except BaseException as exception:
        # Best-effort reporting in notebook output; creation failures for a
        # given feature mix are expected and should not abort the caller.
        display(tablename)
        display(f"Exception Name: {type(exception).__name__}")
        display(f"Exception Desc: {exception}")

    # Insert additional batches so the table accumulates several files and
    # commits. ('Austria' appears twice on purpose, matching the original.)
    countries = (
        "Norway", "Colombia", "Japan", "Argentina", "Finland",
        "United Kingdom", "Hungary", "Croatia", "Austria", "Austria",
        "Portugal", "South Africa", "Poland", "Czech", "Denmark",
    )
    for country in countries:
        spark.sql(
            f"insert into {tablename} SELECT * FROM publicholidays "
            f"WHERE countryOrRegion = '{country}' limit 10"
        )
    # Delete two countries to exercise the delete path (and deletion
    # vectors, when enabled).
    spark.sql(f"delete from {tablename} WHERE countryOrRegion = 'Austria'")
    spark.sql(f"delete from {tablename} WHERE countryOrRegion = 'Denmark'")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment