// output after 1st compaction (i.e. after the first 2 delta commits)
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|2021-12-30 09:54:00|      2|
|     4|               null|      2|
|     5|2016-12-29 09:54:00|      1|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|               null|      2|
+------+-------------------+-------+
// after 1 more ingestion, with a parquet file and a log file both present
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|               null|      4|
|     4|               null|      2|
|     5|2016-12-29 09:54:00|      1|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|2024-12-29 09:54:00|      4|
|     9|               null|      4|
+------+-------------------+-------+
// after 2 more ingestions, once compaction got triggered a 2nd time
scala> spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
+------+-------------------+-------+
|typeId|          eventTime|preComb|
+------+-------------------+-------+
|     1|               null|      2|
|     2|2021-12-29 09:54:00|      2|
|     3|               null|      4|
|     4|               null|      4|
|     5|               null|      4|
|     6|               null|      1|
|     7|2020-12-29 09:54:00|      2|
|     8|2024-12-29 09:54:00|      4|
|     9|               null|      4|
|    11|2024-12-29 09:54:00|      4|
|    15|               null|      4|
|    10|2024-12-29 09:54:00|      4|
+------+-------------------+-------+
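
// Sanity check (my own addition, not from the gist): after the final
// ingestion + compaction, every record whose latest upsert carried a null
// eventTime should still read back as null. Assumes the script below has
// been run end to end and the hudi_trips_snapshot view refreshed.
val nullIds = spark.sql("select typeId from hudi_trips_snapshot where eventTime is null").
  collect().map(_.getInt(0)).toSet
assert(nullIds == Set(1, 3, 4, 5, 6, 9, 15))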
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
import spark.implicits._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val df = Seq(
  (1, Timestamp.valueOf("2014-01-01 23:00:01"), 0L),
  (2, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
  (3, null, 1L),
  (4, null, 1L),
  (5, Timestamp.valueOf("2016-12-29 09:54:00"), 1L),
  (6, null, 1L)
).toDF("typeId", "eventTime", "preComb")
df.schema
| df.write.format("hudi"). | |
| options(getQuickstartWriteConfigs). | |
| option(RECORDKEY_FIELD_OPT_KEY, "typeId"). | |
| option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
| option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator"). | |
| option("hoodie.index.type","SIMPLE"). | |
| option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
| option(TABLE_NAME, tableName). | |
| option(OPERATION_OPT_KEY, "insert"). | |
| option("hoodie.compact.inline","true"). | |
| option("hoodie.keep.max.commits","3"). | |
| option("hoodie.cleaner.commits.retained","1"). | |
| option("hoodie.compact.inline.max.delta.commits","2"). | |
| option("hoodie.keep.min.commits","2"). | |
| mode(Overwrite). | |
| save(basePath) | |
var tripsSnapshotDF1 = spark.
  read.
  format("hudi").
  load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("describe hudi_trips_snapshot").show()
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()
var df1 = Seq(
  (1, null, 2L),
  (2, Timestamp.valueOf("2021-12-29 09:54:00"), 2L),
  (3, Timestamp.valueOf("2021-12-30 09:54:00"), 2L),
  (4, null, 2L),
  (7, Timestamp.valueOf("2020-12-29 09:54:00"), 2L),
  (8, null, 2L)
).toDF("typeId", "eventTime", "preComb")
df1.schema
| df1.write.format("hudi"). | |
| options(getQuickstartWriteConfigs). | |
| option(RECORDKEY_FIELD_OPT_KEY, "typeId"). | |
| option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
| option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator"). | |
| option("hoodie.index.type","SIMPLE"). | |
| option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
| option(TABLE_NAME, tableName). | |
| option(OPERATION_OPT_KEY, "upsert"). | |
| option("hoodie.combine.before.upsert","false"). option("hoodie.compact.inline","true"). | |
| option("hoodie.keep.max.commits","3"). | |
| option("hoodie.compact.inline.max.delta.commits","2"). | |
| option("hoodie.cleaner.commits.retained","1"). | |
| option("hoodie.keep.min.commits","2"). | |
| mode(Append). | |
| save(basePath) | |
// ingested df1 once again, and compaction got triggered. now I see just a parquet file (no log file).
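// That repeat write isn't shown in the gist; a sketch of it follows, via a
// small helper (my own, not part of the gist) that repeats the same upsert
// configuration used throughout this session:
import org.apache.spark.sql.DataFrame
def reingest(df: DataFrame): Unit = {
  df.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(RECORDKEY_FIELD_OPT_KEY, "typeId").
    option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
    option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
    option("hoodie.index.type", "SIMPLE").
    option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
    option(TABLE_NAME, tableName).
    option(OPERATION_OPT_KEY, "upsert").
    option("hoodie.combine.before.upsert", "false").
    option("hoodie.compact.inline", "true").
    option("hoodie.keep.max.commits", "3").
    option("hoodie.compact.inline.max.delta.commits", "2").
    option("hoodie.cleaner.commits.retained", "1").
    option("hoodie.keep.min.commits", "2").
    mode(Append).
    save(basePath)
}
reingest(df1) // repeat ingestion; per the note above, compaction ran at this point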
var df2 = Seq(
  (3, null, 4L),
  (8, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
  (9, null, 4L),
  (10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L)
).toDF("typeId", "eventTime", "preComb")
| df2.write.format("hudi"). | |
| options(getQuickstartWriteConfigs). | |
| option(RECORDKEY_FIELD_OPT_KEY, "typeId"). | |
| option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
| option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator"). | |
| option("hoodie.index.type","SIMPLE"). | |
| option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
| option(TABLE_NAME, tableName). | |
| option(OPERATION_OPT_KEY, "upsert"). | |
| option("hoodie.combine.before.upsert","false"). option("hoodie.compact.inline","true"). | |
| option("hoodie.keep.max.commits","3"). | |
| option("hoodie.compact.inline.max.delta.commits","2"). | |
| option("hoodie.cleaner.commits.retained","1"). | |
| option("hoodie.keep.min.commits","2"). | |
| mode(Append). | |
| save(basePath) | |
// now, 1 parquet + 1 log file. read the table to ensure I do see nulls for the corresponding records
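// The read itself isn't shown in the gist; a sketch consistent with the
// 2nd output block at the top of this gist:
tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()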
var df3 = Seq(
  (4, null, 4L),
  (10, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
  (5, null, 4L),
  (11, Timestamp.valueOf("2024-12-29 09:54:00"), 4L),
  (15, null, 4L)
).toDF("typeId", "eventTime", "preComb")
| df3.write.format("hudi"). | |
| options(getQuickstartWriteConfigs). | |
| option(RECORDKEY_FIELD_OPT_KEY, "typeId"). | |
| option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
| option(KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator"). | |
| option("hoodie.index.type","SIMPLE"). | |
| option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
| option(TABLE_NAME, tableName). | |
| option(OPERATION_OPT_KEY, "upsert"). | |
| option("hoodie.combine.before.upsert","false"). option("hoodie.compact.inline","true"). | |
| option("hoodie.keep.max.commits","3"). | |
| option("hoodie.compact.inline.max.delta.commits","2"). | |
| option("hoodie.cleaner.commits.retained","1"). | |
| option("hoodie.keep.min.commits","2"). | |
| mode(Append). | |
| save(basePath) | |
// ingested df3 again to ensure compaction got triggered.
// again read to ensure the corresponding records have null for the timestamp col.
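// Sketch of those elided steps, reusing the helper defined above:
reingest(df3) // repeat ingestion, after which compaction got triggered (per the note above)
tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select typeId, eventTime, preComb from hudi_trips_snapshot").show()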