// output after 1st compaction (i.e., after the first 2 delta commits)
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|2021-12-30 09:54:00|      2|
|     4|               null|      2|
|     5|2016-12-29 09:54:00|      1|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|               null|      2|
+------+-------------------+-------+

// after 1 more ingestion, and after verifying I had both a parquet file and a log file
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|               null|      4|
|     4|               null|      2|
|     5|2016-12-29 09:54:00|      1|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|2024-12-29 09:54:00|      4|
|     9|               null|      4|
+------+-------------------+-------+

// after 2 more ingestions, once the 2nd compaction got triggered (see the file-listing sketch after these outputs)
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|               null|      4|
|     4|               null|      4|
|     5|               null|      4|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|2024-12-29 09:54:00|      4|
|     9|               null|      4|
|    11|2024-12-29 09:54:00|      4|
|    15|               null|      4|
|    10|2024-12-29 09:54:00|      4|
+------+-------------------+-------+
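// A minimal sketch (my addition, not part of the original run) for checking the file layout
// and timeline between ingestions: base files end in .parquet, log files contain ".log.",
// and under .hoodie the completed delta commits are *.deltacommit while a finished
// compaction shows up as a *.commit instant (naming as I understand it for Hudi 0.7.0).
import java.io.File
val tablePath = "/tmp/hudi_trips_cow"   // local dir behind file:///tmp/hudi_trips_cow
val dataFiles = new File(tablePath).listFiles().map(_.getName)
println("base files: " + dataFiles.filter(_.endsWith(".parquet")).mkString(", "))
println("log files:  " + dataFiles.filter(_.contains(".log.")).mkString(", "))
val timeline = new File(tablePath + "/.hoodie").listFiles().map(_.getName)
println("delta commits: " + timeline.filter(_.endsWith(".deltacommit")).mkString(", "))
println("compactions:   " + timeline.filter(_.endsWith(".commit")).mkString(", "))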
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
import java.sql.Timestamp
import spark.implicits._
val df = Seq(
(1, Timestamp.valueOf("2014-01-01 23:00:01"), 0L),
(2, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
(3, null, 1L),
(4, null, 1L),
(5, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
(6, null, 1L)
).toDF("typeId","eventTime","preComb")
df.schema
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.keep.min.commits","2").
mode(Overwrite).
save(basePath)
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("describe hudi_trips_snapshot").show()
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
var df1 = Seq(
(1, null, 2L),
(2, Timestamp.valueOf("2021-12-29 09:54:00"), 2L),
(3, Timestamp.valueOf("2021-12-30 09:54:00"), 2L),
(4, null, 2L),
(7, Timestamp.valueOf("2020-12-29 09:54:00"), 2L),
(8, null, 2L)
).toDF("typeId","eventTime","preComb")
df1.schema
df1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false").
option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
// ingested df1 once again and compaction got triggered; now I see just a parquet file.
var df2 = Seq(
(3, null, 4L),
(8, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(9, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L)
).toDF("typeId","eventTime","preComb")
df2.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false").
option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
// now 1 parquet + 1 log file; read the table to ensure I do see nulls for the corresponding records.
var df3 = Seq(
(4, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(5, null, 4L),
(11, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(15, null, 4L)
).toDF("typeId","eventTime","preComb")
df3.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.combine.before.upsert","false").
option("hoodie.compact.inline","true").
option("hoodie.keep.max.commits","3").
option("hoodie.compact.inline.max.delta.commits","2").
option("hoodie.cleaner.commits.retained","1").
option("hoodie.keep.min.commits","2").
mode(Append).
save(basePath)
// ingested df3 again to ensure compaction got triggered.
// read again to ensure the corresponding records have null for the timestamp column.
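// For reference, a minimal sketch of the snapshot re-read I'd run at this point
// (same read pattern as in the script below; reuses the tripsSnapshotDF1 var declared above):
tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()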
./bin/spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.6,org.apache.hudi:hudi-spark-bundle_2.11:0.7.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
import java.sql.Timestamp
import spark.implicits._
val df = Seq(
(1, Timestamp.valueOf("2014-01-01 23:00:01"),0L),
(2, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
(3, null, 1L),
(4, null, 1L),
(5, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
(6, null, 1L)
).toDF("typeId","eventTime","preComb")
df.schema
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "insert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Overwrite).
save(basePath)
var tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("describe hudi_trips_snapshot").show()
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
var df1 = Seq(
(1, null,2L),
(2, Timestamp.valueOf("2021-12-29 09:54:00"), 2L),
(3, Timestamp.valueOf("2021-12-30 09:54:00"), 2L),
(4, null, 2L),
(7, Timestamp.valueOf("2020-12-29 09:54:00"), 2L),
(8, null, 2L)
).toDF("typeId","eventTime","preComb")
df1.schema
df1.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("describe hudi_trips_snapshot").show()
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
var df2 = Seq(
(3, null, 4L),
(8, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(9, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L)
).toDF("typeId","eventTime","preComb")
df2.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("describe hudi_trips_snapshot").show()
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
var df3 = Seq(
(4, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(5, null, 4L),
(11, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
(15, null, 4L)
).toDF("typeId","eventTime","preComb")
df3.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("describe hudi_trips_snapshot").show()
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
var df4 = Seq(
(3, null, 4L),
(8, Timestamp.valueOf("2024-12-29 09:57:00"), 4L),
(9, null, 4L),
(10, Timestamp.valueOf("2024-12-29 09:55:00"), 4L)
).toDF("typeId","eventTime","preComb")
df4.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.index.type","SIMPLE").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","5").
mode(Append).
save(basePath)
tripsSnapshotDF1 = spark.
read.
format("hudi").
load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("describe hudi_trips_snapshot").show()
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
// Output after each ingestion: N = null, T = valid timestamp, blank = key not present yet. I don't see any issues.
// (A quick query to produce this status is sketched after the table.)
| id | df | df1 | df2 | df3 | df4 |
|----|----|-----|-----|-----|-----|
| 1  | T  | N   | N   | N   | N   |
| 2  | T  | T   | T   | T   | T   |
| 3  | N  | T   | N   | N   | N   |
| 4  | N  | N   | N   | N   | N   |
| 5  | T  | T   | T   | N   | N   |
| 6  | N  | N   | N   | N   | N   |
| 7  |    | T   | T   | T   | T   |
| 8  |    | N   | T   | T   | T   |
| 9  |    |     | N   | N   | N   |
| 10 |    |     | T   | T   | T   |
| 11 |    |     |     | T   | T   |
| 15 |    |     |     | N   | N   |
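If it helps, this is the kind of query I'd use to produce the N/T status per key for the current snapshot (my addition, not part of the original run):
spark.sql("select typeId, case when eventTime is null then 'N' else 'T' end as status from hudi_trips_snapshot order by typeId").show()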
Also, I see you have enabled two conflicting configs:
option("hoodie.datasource.compaction.async.enable", "true").
option("hoodie.compact.inline","true").
Can you enable just hoodie.compact.inline and remove the async one?
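For example, the write options would then look something like this (a sketch that simply mirrors the options already used in this gist, with the async flag dropped):
// Keep only inline compaction; hoodie.datasource.compaction.async.enable is removed.
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(RECORDKEY_FIELD_OPT_KEY, "typeId").
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
option(TABLE_NAME, tableName).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","2").
mode(Append).
save(basePath)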
My final output:
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|               null|      4|
|     4|               null|      4|
|     5|               null|      4|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|2024-12-29 09:57:00|      4|
|     9|               null|      4|
|    10|2024-12-29 09:55:00|      4|
|    11|2024-12-29 09:54:00|      4|
|    15|               null|      4|
+------+-------------------+-------+
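To double-check that the compacted base files themselves carry these nulls (and not just the merged snapshot view), a read-optimized query should also work. This is a sketch assuming the QUERY_TYPE_* constants from DataSourceReadOptions in Hudi 0.7.0:
// Read-optimized view reads only the compacted base files (no log merging),
// so nulls here confirm the compaction output itself.
val roDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
load(basePath + "/*")
roDF.createOrReplaceTempView("hudi_trips_ro")
spark.sql("select typeId, eventTime, preComb from hudi_trips_ro").show()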
// Here is what I did; I was able to reproduce the issue with Spark 2.4 and Hudi 0.7.0.
// The spark-shell command used is shown at the top of the script above.
// Keys 4 and 6 never had timestamps, yet the table somehow infers the value 2014-01-01 23:00:01 for them.
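A quick check I'd run to surface that (a sketch; it assumes the same hudi_trips_snapshot view registered above):
// If the bug reproduces, keys 4 and 6 show 2014-01-01 23:00:01 here instead of null.
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot where typeId in (4, 6)").show()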