import org.apache.spark.sql.SaveMode._

// Sample batch with an intentional duplicate record key (key1) within typeId = 1.
val df = Seq(
  (1, "key1", "abc"),
  (1, "key1", "def"),
  (2, "key2", "ghi"),
  (2, "key3", "jkl")
).toDF("typeId", "recordKey", "str")

// Bulk-insert into a fresh Hudi table, dropping duplicate record keys on insert.
df.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.datasource.write.precombine.field", "typeId").
  option("hoodie.datasource.write.partitionpath.field", "typeId").
  option("hoodie.datasource.write.recordkey.field", "recordKey").
  option("hoodie.datasource.write.operation", "bulk_insert").
  option("hoodie.table.name", "hudi_tbl").
  option("hoodie.datasource.write.insert.drop.duplicates", "true").
  option("hoodie.datasource.write.row.writer.enable", "false").
  mode(Overwrite).
  save("/tmp/hudi_tbl_trial/")

val hudiDF = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")
hudiDF.createOrReplaceTempView("hudi_sql_tbl")
spark.sql("select typeId, recordKey, str from hudi_sql_tbl").show(false)
// Output: the duplicate (1, key1) pair was deduped down to a single row.
+------+---------+---+
|typeId|recordKey|str|
+------+---------+---+
|2     |key3     |jkl|
|1     |key1     |def|
|2     |key2     |ghi|
+------+---------+---+
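To double-check where each surviving row landed, one can also query Hudi's built-in metadata columns on the same temp view. This is a quick sanity check that is not part of the original snippet; _hoodie_record_key and _hoodie_partition_path are standard Hudi meta fields populated on every record.

// Inspect the record key and partition path Hudi stored for each row.
spark.sql("select _hoodie_record_key, _hoodie_partition_path, str from hudi_sql_tbl").show(false)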
// Second batch: new data for partition 2, a duplicate record key (key4), and a new partition 3.
val df1 = Seq(
  (2, "key2", "stu"),
  (2, "key4", "xyz"),
  (2, "key4", "mno"),
  (3, "key5", "pqr")
).toDF("typeId", "recordKey", "str")

// insert_overwrite replaces only the partitions touched by the incoming batch;
// combine.before.insert dedupes duplicate record keys within the batch first.
df1.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.datasource.write.precombine.field", "typeId").
  option("hoodie.datasource.write.partitionpath.field", "typeId").
  option("hoodie.datasource.write.recordkey.field", "recordKey").
  option("hoodie.combine.before.insert", "true").
  option("hoodie.combine.before.upsert", "true").
  option("hoodie.datasource.write.operation", "insert_overwrite").
  option("hoodie.table.name", "hudi_tbl").
  mode(Append).
  save("/tmp/hudi_tbl_trial/")

val hudiDF1 = spark.read.format("hudi").load("/tmp/hudi_tbl_trial/")
hudiDF1.createOrReplaceTempView("hudi_sql_tbl1")
spark.sql("select typeId, recordKey, str from hudi_sql_tbl1").show(false)
// Output
+------+---------+---+
|typeId|recordKey|str|
+------+---------+---+
|2     |key2     |stu|
|2     |key4     |mno|
|3     |key5     |pqr|
|1     |key1     |def|
+------+---------+---+
(2, key4) had two entries in the incoming batch, so it was deduped to a single row.
The existing records in partition 2 were overwritten by the new batch.
Partition 1 was never overwritten, and partition 3 is a brand-new partition.
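As a sanity check on those claims, a per-partition count over the temp view registered above should show one row for typeId 1, two rows for typeId 2, and one row for typeId 3 (this query is an addition to the original snippet, reusing the hudi_sql_tbl1 view):

// Count surviving rows per partition after the insert_overwrite.
spark.sql("select typeId, count(*) as cnt from hudi_sql_tbl1 group by typeId order by typeId").show(false)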