@nsivabalan
Created September 29, 2021 12:15
// spark-shell session demonstrating Hudi's insert dedup and insert_overwrite behavior.
import org.apache.spark.sql.SaveMode._
import spark.implicits._

// First batch: (1, "key1") appears twice, so one copy should be dropped on write.
val df = Seq(
  (1, "key1", "abc"),
  (1, "key1", "def"),
  (2, "key2", "ghi"),
  (2, "key3", "jkl")
).toDF("typeId", "recordKey", "str")
df.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.datasource.write.precombine.field", "typeId").
  option("hoodie.datasource.write.partitionpath.field", "typeId").
  option("hoodie.datasource.write.recordkey.field", "recordKey").
  option("hoodie.datasource.write.operation", "bulk_insert").
  option("hoodie.table.name", "hudi_tbl").
  // Drop duplicate keys from the incoming batch.
  option("hoodie.datasource.write.insert.drop.duplicates", "true").
  // Disable the bulk_insert row-writer path.
  option("hoodie.datasource.write.row.writer.enable", "false").
  mode(Overwrite).
  save("/tmp/hudi_tbl_trial/")
// Read the table back and verify the deduped contents.
val hudiDF = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")
hudiDF.createOrReplaceTempView("hudi_sql_tbl")
spark.sql("select typeId, recordKey, str from hudi_sql_tbl").show(false)
// Output
+------+---------+---+
|typeId|recordKey|str|
+------+---------+---+
|2     |key3     |jkl|
|1     |key1     |def|
|2     |key2     |ghi|
+------+---------+---+
// (1, key1) appeared twice in the input batch; drop.duplicates kept a single copy.
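// Side check: Hudi stamps standard metadata columns onto every record, so the
// generated record keys and partition paths can be inspected from the same view.
spark.sql("select _hoodie_partition_path, _hoodie_record_key, str from hudi_sql_tbl").show(false)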
// Second batch: (2, "key4") appears twice; only partitions 2 and 3 are targeted.
val df1 = Seq(
  (2, "key2", "stu"),
  (2, "key4", "xyz"),
  (2, "key4", "mno"),
  (3, "key5", "pqr")
).toDF("typeId", "recordKey", "str")
df1.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.datasource.write.precombine.field", "typeId").
  option("hoodie.datasource.write.partitionpath.field", "typeId").
  option("hoodie.datasource.write.recordkey.field", "recordKey").
  // Combine incoming records sharing the same key before writing.
  option("hoodie.combine.before.insert", "true").
  option("hoodie.combine.before.upsert", "true").
  // insert_overwrite replaces only the partitions present in this batch.
  option("hoodie.datasource.write.operation", "insert_overwrite").
  option("hoodie.table.name", "hudi_tbl").
  mode(Append).
  save("/tmp/hudi_tbl_trial/")
// Read back again to see the effect of the partition overwrite.
val hudiDF1 = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")
hudiDF1.createOrReplaceTempView("hudi_sql_tbl1")
spark.sql("select typeId, recordKey, str from hudi_sql_tbl1").show(false)
// Output
+------+---------+---+
|typeId|recordKey|str|
+------+---------+---+
|2     |key2     |stu|
|2     |key4     |mno|
|3     |key5     |pqr|
|1     |key1     |def|
+------+---------+---+
(2, key4) had two entries in the incoming batch, so it was deduped down to a single record.
Existing records in partition 2 were overwritten by insert_overwrite.
Partition 1 was never touched, and partition 3 is brand new.
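One quick sanity check of the partition-level effect, reusing the hudi_sql_tbl1 view registered above: partition 2 should hold only the two rewritten keys, partition 1 its single untouched record, and partition 3 the new one.
spark.sql("select typeId, count(*) as cnt from hudi_sql_tbl1 group by typeId order by typeId").show(false)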