@nsivabalan
Last active May 14, 2021 16:10
// output after the 1st compaction (i.e. after the first 2 delta commits)
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId| eventTime|preComb|
+------+-------------------+-------+
| 1| null| 2|
| 2|2021-12-29 09:54:00| 2|
| 3|2021-12-30 09:54:00| 2|
| 4| null| 2|
| 5|2016-12-29 09:54:00| 1|
| 6| null| 1|
| 7|2020-12-29 09:54:00| 2|
| 8| null| 2|
+------+-------------------+-------+
// after 1 more ingestion (df2), ensuring the file group had both a parquet base file and a log file (see the file-listing sketch after these outputs)
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId| eventTime|preComb|
+------+-------------------+-------+
| 1| null| 2|
| 2|2021-12-29 09:54:00| 2|
| 3| null| 4|
| 4| null| 2|
| 5|2016-12-29 09:54:00| 1|
| 6| null| 1|
| 7|2020-12-29 09:54:00| 2|
| 8|2024-12-29 09:54:00| 4|
| 9| null| 4|
+------+-------------------+-------+
// after 2 more ingestions, once the 2nd compaction got triggered
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId| eventTime|preComb|
+------+-------------------+-------+
| 1| null| 2|
| 2|2021-12-29 09:54:00| 2|
| 3| null| 4|
| 4| null| 4|
| 5| null| 4|
| 6| null| 1|
| 7|2020-12-29 09:54:00| 2|
| 8|2024-12-29 09:54:00| 4|
| 9| null| 4|
| 11|2024-12-29 09:54:00| 4|
| 15| null| 4|
| 10|2024-12-29 09:54:00| 4|
+------+-------------------+-------+
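
// A minimal sketch (not in the original gist) of how the file layout was checked between steps:
// list the data files under the non-partitioned base path (/tmp/hudi_trips_cow, i.e. basePath in
// the script below) and count parquet base files vs. log files.
import java.io.File
val dataFiles = new File("/tmp/hudi_trips_cow").listFiles.filter(_.isFile).map(_.getName)
println(s"parquet base files: ${dataFiles.count(_.endsWith(".parquet"))}, log files: ${dataFiles.count(_.contains(".log."))}")
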
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
import java.sql.Timestamp
import spark.implicits._
val df = Seq(
(1, Timestamp.valueOf("2014-01-01 23:00:01"),0L),
(2, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
(3, null, 1L),
(4, null, 1L),
(5, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
(6, null, 1L)
).toDF("typeId","eventTime","preComb")
df.schema
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.keep.min.commits","2").
mode(Overwrite).
save(basePath)
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("describe hudi_trips_snapshot").show()
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
var df1 = Seq(
(1, null,2L),
(2, Timestamp.valueOf("2021-12-29 09:54:00"), 2L),
(3, Timestamp.valueOf("2021-12-30 09:54:00"), 2L),
(4, null, 2L),
(7, Timestamp.valueOf("2020-12-29 09:54:00"), 2L),
(8, null, 2L)
).toDF("typeId","eventTime","preComb")
df1.schema
df1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false"). option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
// ingested df1 once again (a sketch of that repeated write is below), which triggered compaction; at this point I see just a parquet base file and no log file.
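
// A minimal sketch (assumed; the repeated write is not shown in the original gist) of re-ingesting
// df1 as referred to above: the same upsert re-run so that enough delta commits accumulate for
// inline compaction (hoodie.compact.inline.max.delta.commits = 2) to kick in.
df1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false").
option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
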
var df2 = Seq(
(3, null, 4L),
(8, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(9, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L)
).toDF("typeId","eventTime","preComb")
df2.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false"). option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
// now the file group has 1 parquet base file + 1 log file; read the table (sketch below) to ensure the corresponding records still show null.
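
// A minimal sketch (assumed, not shown in the original gist) of the read-back described above;
// it verifies that records whose latest eventTime is null still show null after the log file is
// merged on read.
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
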
var df3 = Seq(
(4, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(5, null, 4L),
(11, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(15, null, 4L)
).toDF("typeId","eventTime","preComb")
df3.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false"). option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
// ingested df3 once again to ensure compaction got triggered (a sketch of the repeated write and read is below).
// read again to ensure the corresponding records still have null in the timestamp column.
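
// A minimal sketch (assumed; not shown in the original gist) of the repeated df3 write and the
// follow-up read referred to in the two comments above.
df3.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false").
option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)

tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
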
@eldhosepaul7

// here is what I did; I was able to reproduce the issue with Spark 2.4 and Hudi 0.7.0

// spark-shell command

 spark-shell --master local --jars hudi-spark-bundle_2.11-0.7.0.jar,spark-avro_2.11-2.4.4.jar --conf spark.driver.extraClassPath=spark-avro_2.11-2.4.4.jar --conf spark.executor.extraClassPath=spark-avro_2.11-2.4.4.jar --conf "spark.sql.hive.convertMetastoreParquet=false" --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// keys 4 and 6 never had timestamps, yet the output somehow ends up with the value 2014-01-01 23:00:01 for them (see the filter check after the output below)

scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala>

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala>

scala>

scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow

scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow

scala>

scala>

scala> import java.sql.Timestamp
import java.sql.Timestamp

scala> import spark.implicits._
import spark.implicits._

scala>

scala> val df = Seq(
     |   (1, Timestamp.valueOf("2014-01-01 23:00:01"),0L),
     |   (2, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
     |   (3, null, 1L),
     |   (4, null, 1L),
     |   (5, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
     |   (6, null, 1L)
     |   ). toDF("typeId","eventTime","preComb")
df: org.apache.spark.sql.DataFrame = [typeId: int, eventTime: timestamp ... 1 more field]

scala>

scala> df.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(typeId,IntegerType,false), StructField(eventTime,TimestampType,true), StructField(preComb,LongType,false))

scala>

scala> df.write.format("hudi").
     |         options(getQuickstartWriteConfigs).
     |         option(RECORDKEY_FIELD_OPT_KEY, "typeId").
     |         option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
     |         option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
     |         option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
     |         option(TABLE_NAME, tableName).
     |         option(OPERATION_OPT_KEY, "upsert").
     |         option("hoodie.datasource.compaction.async.enable", "true").
     |         option("hoodie.compact.inline","true").
     |         option("hoodie.keep.max.commits","13").
     |         option("hoodie.cleaner.commits.retained","5").
     |         option("hoodie.compact.inline.max.delta.commits","5").
     |         option("hoodie.keep.min.commits","12").
     |         mode("append").
     |         save(basePath)

scala> var df2 = Seq(
     |   (3, null, 4L),
     |   (8, Timestamp.valueOf("2024-12-29 09:55:00"), 4L),
     |   (9, null, 4L),
     |   (10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L)
     |   ). toDF("typeId","eventTime","preComb")
df2: org.apache.spark.sql.DataFrame = [typeId: int, eventTime: timestamp ... 1 more field]

scala> df2.write.format("hudi").
     |         options(getQuickstartWriteConfigs).
     |         option(RECORDKEY_FIELD_OPT_KEY, "typeId").
     |         option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
     |         option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
     |         option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
     |         option(TABLE_NAME, tableName).
     |         option(OPERATION_OPT_KEY, "upsert").
     |         option("hoodie.datasource.compaction.async.enable", "true").
     |         option("hoodie.compact.inline","true").
     |         option("hoodie.keep.max.commits","13").
     |         option("hoodie.cleaner.commits.retained","5").
     |         option("hoodie.compact.inline.max.delta.commits","5").
     |         option("hoodie.keep.min.commits","12").
     |         mode("append").
     |         save(basePath)

scala> var df3 = Seq(
     |   (3, null, 4L),
     |   (8, Timestamp.valueOf("2024-12-29 09:56:00"), 4L),
     |   (9, null, 4L),
     |   (10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L)
     |   ). toDF("typeId","eventTime","preComb")
df3: org.apache.spark.sql.DataFrame = [typeId: int, eventTime: timestamp ... 1 more field]

scala> df3.write.format("hudi").
     |         options(getQuickstartWriteConfigs).
     |         option(RECORDKEY_FIELD_OPT_KEY, "typeId").
     |         option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
     |         option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
     |         option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
     |         option(TABLE_NAME, tableName).
     |         option(OPERATION_OPT_KEY, "upsert").
     |         option("hoodie.datasource.compaction.async.enable", "true").
     |         option("hoodie.compact.inline","true").
     |         option("hoodie.keep.max.commits","13").
     |         option("hoodie.cleaner.commits.retained","5").
     |         option("hoodie.compact.inline.max.delta.commits","5").
     |         option("hoodie.keep.min.commits","12").
     |         mode("append").
     |         save(basePath)

scala> var df4 = Seq(
     |   (3, null, 4L),
     |   (8, Timestamp.valueOf("2024-12-29 09:57:00"), 4L),
     |   (9, null, 4L),
     |   (10, Timestamp.valueOf("2024-12-29 09:55:00"), 4L)
     |   ). toDF("typeId","eventTime","preComb")
df4: org.apache.spark.sql.DataFrame = [typeId: int, eventTime: timestamp ... 1 more field]

scala> df4.write.format("hudi").
     |         options(getQuickstartWriteConfigs).
     |         option(RECORDKEY_FIELD_OPT_KEY, "typeId").
     |         option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
     |         option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
     |         option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
     |         option(TABLE_NAME, tableName).
     |         option(OPERATION_OPT_KEY, "upsert").
     |         option("hoodie.datasource.compaction.async.enable", "true").
     |         option("hoodie.compact.inline","true").
     |         option("hoodie.keep.max.commits","13").
     |         option("hoodie.cleaner.commits.retained","5").
     |         option("hoodie.compact.inline.max.delta.commits","5").
     |         option("hoodie.keep.min.commits","12").
     |         mode("append").
     |         save(basePath)

scala> val out = spark.read.format("hudi").load(basePath)
out: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 6 more fields]

scala> out.show()
+-------------------+--------------------+------------------+----------------------+--------------------+------+-------------------+-------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|typeId|          eventTime|preComb|
+-------------------+--------------------+------------------+----------------------+--------------------+------+-------------------+-------+
|     20210513234449|  20210513234449_0_1|                 1|                      |b4c53dec-5658-48e...|     1|2014-01-01 23:00:01|      0|
|     20210513234714|  20210513234714_0_6|                 3|                      |b4c53dec-5658-48e...|     3|               null|      4|
|     20210513234449|  20210513234449_0_3|                 5|                      |b4c53dec-5658-48e...|     5|2016-12-29 09:54:00|      1|
|     20210513234449|  20210513234449_0_4|                 2|                      |b4c53dec-5658-48e...|     2|2016-12-29 09:54:00|      1|
|     20210513234449|  20210513234449_0_5|                 4|                      |b4c53dec-5658-48e...|     4|2014-01-01 23:00:01|      1|
|     20210513234449|  20210513234449_0_6|                 6|                      |b4c53dec-5658-48e...|     6|2014-01-01 23:00:01|      1|
|     20210513234714|  20210513234714_0_8|                 8|                      |b4c53dec-5658-48e...|     8|2024-12-29 09:57:00|      4|
|     20210513234714|  20210513234714_0_7|                 9|                      |b4c53dec-5658-48e...|     9|               null|      4|
|     20210513234714|  20210513234714_0_5|                10|                      |b4c53dec-5658-48e...|    10|2024-12-29 09:55:00|      4|
+-------------------+--------------------+------------------+----------------------+--------------------+------+-------------------+-------+
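
// A hypothetical follow-up check (not in the original transcript): isolate keys 4 and 6, which
// were always ingested with a null eventTime, to confirm they now carry 2014-01-01 23:00:01.
out.filter($"typeId".isin(4, 6)).select("typeId", "eventTime", "preComb").show()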

@nsivabalan
Author

nsivabalan commented May 14, 2021

./bin/spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.6,org.apache.hudi:hudi-spark-bundle_2.11:0.7.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"

import java.sql.Timestamp
import spark.implicits._

val df = Seq(
(1, Timestamp.valueOf("2014-01-01 23:00:01"),0L),
(2, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
(3, null, 1L),
(4, null, 1L),
(5, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
(6, null, 1L)
).toDF("typeId","eventTime","preComb")

df.schema

df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Overwrite).
save(basePath)

var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")

tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("describe hudi_trips_snapshot").show()

spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()

var df1 = Seq(
(1, null,2L),
(2, Timestamp.valueOf("2021-12-29 09:54:00"), 2L),
(3, Timestamp.valueOf("2021-12-30 09:54:00"), 2L),
(4, null, 2L),
(7, Timestamp.valueOf("2020-12-29 09:54:00"), 2L),
(8, null, 2L)
).toDF("typeId","eventTime","preComb")

df1.schema

df1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Append).
save(basePath)

tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")

tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("describe hudi_trips_snapshot").show()

spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()

var df2 = Seq(
(3, null, 4L),
(8, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(9, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L)
).toDF("typeId","eventTime","preComb")

df2.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Append).
save(basePath)

tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")

tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("describe hudi_trips_snapshot").show()

spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()

var df3 = Seq(
(4, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(5, null, 4L),
(11, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(15, null, 4L)
).toDF("typeId","eventTime","preComb")

df3.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Append).
save(basePath)

tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")

tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("describe hudi_trips_snapshot").show()

spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()

var df4 = Seq(
(3, null, 4L),
(8, Timestamp.valueOf("2024-12-29 09:57:00"), 4L),
(9, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:55:00"), 4L)
).toDF("typeId","eventTime","preComb")

df4.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Append).
save(basePath)

tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")

tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("describe hudi_trips_snapshot").show()

spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()

// eventTime value for each key after each ingestion (N = null, T = valid timestamp, - = key not yet ingested). I don't see any issues.

id   df   df1  df2  df3  df4
1    T    N    N    N    N
2    T    T    T    T    T
3    N    T    N    N    N
4    N    N    N    N    N
5    T    T    T    N    N
6    N    N    N    N    N
7    -    T    T    T    T
8    -    N    T    T    T
9    -    -    N    N    N
10   -    -    T    T    T
11   -    -    -    T    T
15   -    -    -    N    N

Also, I see you have enabled two contradicting configs:

option("hoodie.datasource.compaction.async.enable", "true").
option("hoodie.compact.inline","true").

Can you enable just compact.inline and remove the async one?
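
For reference, a minimal sketch (assumed, not part of the original comment) of the last write with the async flag dropped, so that inline compaction alone schedules and runs compaction; everything else is unchanged from your script.

df4.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
// "hoodie.datasource.compaction.async.enable" removed; only inline compaction is configured
option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","13").
option("hoodie.cleaner.commits.retained","5").
option("hoodie.compact.inline.max.delta.commits","5").
option("hoodie.keep.min.commits","12").
mode("append").
save(basePath)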

@nsivabalan
Author

My final output:

 spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|               null|      4|
|     4|               null|      4|
|     5|               null|      4|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|2024-12-29 09:57:00|      4|
|     9|               null|      4|
|    10|2024-12-29 09:55:00|      4|
|    11|2024-12-29 09:54:00|      4|
|    15|               null|      4|
+------+-------------------+-------+
