// output after the 1st compaction (i.e. after the first 2 delta commits)
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|2021-12-30 09:54:00|      2|
|     4|               null|      2|
|     5|2016-12-29 09:54:00|      1|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|               null|      2|
+------+-------------------+-------+
// after 1 more ingestion, once I ensured the file group had both a parquet and a log file
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|               null|      4|
|     4|               null|      2|
|     5|2016-12-29 09:54:00|      1|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|2024-12-29 09:54:00|      4|
|     9|               null|      4|
+------+-------------------+-------+
// after 2 more ingestions, after the 2nd compaction got triggered
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|               null|      4|
|     4|               null|      4|
|     5|               null|      4|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|2024-12-29 09:54:00|      4|
|     9|               null|      4|
|    11|2024-12-29 09:54:00|      4|
|    15|               null|      4|
|    10|2024-12-29 09:54:00|      4|
+------+-------------------+-------+
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
import java.sql.Timestamp
import spark.implicits._
val df = Seq(
  (1, Timestamp.valueOf("2014-01-01 23:00:01"), 0L),
  (2, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
  (3, null, 1L),
  (4, null, 1L),
  (5, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
  (6, null, 1L)
).toDF("typeId", "eventTime", "preComb")
df.schema
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.keep.min.commits","2").
mode(Overwrite).
save(basePath)
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("describe hudi_trips_snapshot").show()
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
var df1 = Seq(
  (1, null, 2L),
  (2, Timestamp.valueOf("2021-12-29 09:54:00"), 2L),
  (3, Timestamp.valueOf("2021-12-30 09:54:00"), 2L),
  (4, null, 2L),
  (7, Timestamp.valueOf("2020-12-29 09:54:00"), 2L),
  (8, null, 2L)
).toDF("typeId", "eventTime", "preComb")
df1.schema
df1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false"). option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
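// (not in the original gist) a minimal sketch: re-load the table and re-register the temp view
// after each write, so the spark.sql queries above reflect the newly added commits.
// tripsSnapshotDF2 is just a new name introduced here.
var tripsSnapshotDF2 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()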
// ingested df1 once again, which triggered compaction. now I see just a parquet file.
var df2 = Seq(
  (3, null, 4L),
  (8, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
  (9, null, 4L),
  (10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L)
).toDF("typeId", "eventTime", "preComb")
df2.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false"). option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
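// (not in the original gist) a hedged sketch for checking the file layout mentioned in the
// comments: list the data files under basePath (expect one base parquet plus a .log file before
// compaction, and just a parquet after it) and the .hoodie timeline instants, via the Hadoop FS API.
import org.apache.hadoop.fs.Path
val fs = new Path(basePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path(basePath)).map(_.getPath.getName).sorted.foreach(println)
fs.listStatus(new Path(basePath, ".hoodie")).map(_.getPath.getName).sorted.foreach(println)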
// now there is 1 parquet + 1 log file. read the table to ensure I do see nulls for the corresponding records
var df3 = Seq(
  (4, null, 4L),
  (10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
  (5, null, 4L),
  (11, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
  (15, null, 4L)
).toDF("typeId", "eventTime", "preComb")
df3.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false"). option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
// ingested df3 again so that compaction got triggered.
// read again to ensure the corresponding records have null for the timestamp column.
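// (not in the original gist) a sketch for cross-checking the compaction result: query the
// read-optimized view (base parquet files only) alongside the snapshot view (parquet + log merged).
// Assumes the QUERY_TYPE_* constants from DataSourceReadOptions, which is already imported above.
val roDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
load(basePath + "/*")
roDF.createOrReplaceTempView("hudi_trips_ro")
spark.sql("select typeId, eventTime, preComb from hudi_trips_ro").show()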
// my final output

 spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|               null|      4|
|     4|               null|      4|
|     5|               null|      4|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|2024-12-29 09:57:00|      4|
|     9|               null|      4|
|    10|2024-12-29 09:55:00|      4|
|    11|2024-12-29 09:54:00|      4|
|    15|               null|      4|
+------+-------------------+-------+
