In [2]:
// Look up the session's current value of spark.sql.shuffle.partitions;
// as the last expression of the cell, its value is what the notebook displays.
spark.conf.get("spark.sql.shuffle.partitions")

Waiting for a Spark session to start...

500

|Table Type|Partitioned|Has Sort Order|Output File Count|
|----------|-----------|--------------|-----------------|
|Hive|No|N/A|4|
|Hive|Yes|N/A|4 + 4|
|Iceberg|No|No|4|
|Iceberg|Yes|No|4 + 4|
|Iceberg|No|Yes|500|
|Iceberg|Yes|Yes|250 + 251|

In [3]:
// Current time in epoch seconds; used below as a suffix in every table name.
val epoch = java.time.Instant.now.getEpochSecond
// Target database: the BD_USER environment variable when set, else "jzhuge".
val db = sys.env.get("BD_USER").getOrElse("jzhuge")
// Source data: spark.range(100000) exposed as a single `id` column, sorted by id.
val df = spark.range(100000L).toDF("id").orderBy("id")

epoch = 1633558130
db = jzhuge
df = [id: bigint]


[id: bigint]

# Hive Table

In [4]:
// Unpartitioned Hive (hive_parquet) table: recreate it from scratch, then
// overwrite it with df coalesced down to 4 partitions.
val hive_table = s"${db}.wso_244_hive_${epoch}"
// DDL statements run one after another, in the order listed.
Seq(
  s"DROP TABLE IF EXISTS ${hive_table}",
  s"CREATE TABLE ${hive_table}(id bigint) USING hive_parquet"
).foreach(ddl => spark.sql(ddl))
df.coalesce(4).write.byName.mode("overwrite").insertInto(hive_table)



hive_table = jzhuge.wso_244_hive_1633558130


jzhuge.wso_244_hive_1633558130

# Hive Table - Partitioned

In [5]:
// Hive (hive_parquet) table partitioned by `part`: recreate it from scratch,
// then overwrite it with df plus a derived part = id % 2 column, coalesced
// down to 4 partitions.
val hive_part_table = s"${db}.wso_244_hive_part_${epoch}"
// DDL statements run one after another, in the order listed.
Seq(
  s"DROP TABLE IF EXISTS ${hive_part_table}",
  s"CREATE TABLE ${hive_part_table}(id bigint, part bigint) USING hive_parquet PARTITIONED BY (part)"
).foreach(ddl => spark.sql(ddl))
df.withColumn("part", col("id") % 2).coalesce(4)
  .write.byName.mode("overwrite")
  .insertInto(hive_part_table)



hive_part_table = jzhuge.wso_244_hive_part_1633558130


jzhuge.wso_244_hive_part_1633558130

# Iceberg Table - No Sort Order

In [6]:
// Unpartitioned table created without a USING clause (the Iceberg case in this
// notebook) and with no sort order: recreate it, then overwrite it with df
// coalesced down to 4 partitions.
val iceberg_no_sort_order_table = s"${db}.wso_244_nso_${epoch}"
// DDL statements run one after another, in the order listed.
Seq(
  s"DROP TABLE IF EXISTS ${iceberg_no_sort_order_table}",
  s"CREATE TABLE ${iceberg_no_sort_order_table}(id bigint)"
).foreach(ddl => spark.sql(ddl))
df.coalesce(4).write.byName.mode("overwrite").insertInto(iceberg_no_sort_order_table)

[Stage 2:> (0 + 4) / 4]

iceberg_no_sort_order_table = jzhuge.wso_244_nso_1633558130


jzhuge.wso_244_nso_1633558130

# Iceberg Table - No Sort Order - Partitioned

In [7]:
// Table partitioned by `part`, created without a USING clause (the Iceberg
// case in this notebook) and with no sort order: recreate it, then overwrite
// it with df plus a derived part = id % 2 column, coalesced down to 4 partitions.
val iceberg_no_sort_order_part_table = s"${db}.wso_244_nso_part_${epoch}"
// DDL statements run one after another, in the order listed.
Seq(
  s"DROP TABLE IF EXISTS ${iceberg_no_sort_order_part_table}",
  s"CREATE TABLE ${iceberg_no_sort_order_part_table}(id bigint, part bigint) PARTITIONED BY (part)"
).foreach(ddl => spark.sql(ddl))
df.withColumn("part", col("id") % 2).coalesce(4)
  .write.byName.mode("overwrite")
  .insertInto(iceberg_no_sort_order_part_table)

iceberg_no_sort_order_part_table = jzhuge.wso_244_nso_part_1633558130


jzhuge.wso_244_nso_part_1633558130

# Iceberg Table - Sort Order

In [8]:
// Unpartitioned table created without a USING clause (the Iceberg case in this
// notebook) that is then altered to WRITE LOCALLY ORDERED BY (id) before the
// insert. The data written is df coalesced down to 4 partitions.
val iceberg_sort_order_table = s"${db}.wso_244_so_${epoch}"
// DDL statements run one after another, in the order listed; the ALTER must
// follow the CREATE so the write ordering is in place before the insert.
Seq(
  s"DROP TABLE IF EXISTS ${iceberg_sort_order_table}",
  s"CREATE TABLE ${iceberg_sort_order_table}(id bigint)",
  s"ALTER TABLE ${iceberg_sort_order_table} WRITE LOCALLY ORDERED BY (id)"
).foreach(ddl => spark.sql(ddl))
df.coalesce(4).write.byName.mode("overwrite").insertInto(iceberg_sort_order_table)



iceberg_sort_order_table = jzhuge.wso_244_so_1633558130


jzhuge.wso_244_so_1633558130

# Iceberg Table - Sort Order - Partitioned

In [9]:
// Table partitioned by `part`, created without a USING clause (the Iceberg
// case in this notebook), that is then altered to WRITE LOCALLY ORDERED BY
// (part, id) before the insert. The data written is df plus a derived
// part = id % 2 column, coalesced down to 4 partitions.
val iceberg_sort_order_part_table = s"${db}.wso_244_so_part_${epoch}"
// DDL statements run one after another, in the order listed; the ALTER must
// follow the CREATE so the write ordering is in place before the insert.
Seq(
  s"DROP TABLE IF EXISTS ${iceberg_sort_order_part_table}",
  s"CREATE TABLE ${iceberg_sort_order_part_table}(id bigint, part bigint) PARTITIONED BY (part)",
  s"ALTER TABLE ${iceberg_sort_order_part_table} WRITE LOCALLY ORDERED BY (part, id)"
).foreach(ddl => spark.sql(ddl))
df.withColumn("part", col("id") % 2).coalesce(4)
  .write.byName.mode("overwrite")
  .insertInto(iceberg_sort_order_part_table)



iceberg_sort_order_part_table = jzhuge.wso_244_so_part_1633558130


jzhuge.wso_244_so_part_1633558130