// Hudi MERGE_ON_READ quickstart: write a few commits, savepoint via hudi-cli,
// write more commits, then roll back to the savepoint. Run in spark-shell.
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions.lit
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

// Commit 1: bootstrap a MERGE_ON_READ table with 10 inserts (metadata table disabled)
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
  option("hoodie.metadata.enable", "false").
  mode(Overwrite).
  save(basePath)
// Commit 2: upsert 10 updates, tagging rider = "updates1" so the batch is identifiable
val updates = convertToStringList(dataGen.generateUpdates(10))
val df0 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df0.withColumn("rider", lit("updates1")).write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.metadata.enable", "false").
  mode(Append).
  save(basePath)
// Commit 3: another update batch, tagged rider = "updates2"
val updates1 = convertToStringList(dataGen.generateUpdates(10))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates1, 2))
df1.withColumn("rider", lit("updates2")).write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.metadata.enable", "false").
  mode(Append).
  save(basePath)
// Snapshot-read the table and check which update batch each record carries
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
// load(basePath) relies on the "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rider, count(*) from hudi_trips_snapshot group by 1").show()
spark.sql("select rider, partitionpath, uuid, fare from hudi_trips_snapshot").show(false)
// Create a savepoint at this commit via hudi-cli (see the hudi-cli snippet below)

// Commit 4: update batch tagged rider = "updates4", written after the savepoint
val updates3 = convertToStringList(dataGen.generateUpdates(10))
val df3 = spark.read.json(spark.sparkContext.parallelize(updates3, 2))
df3.withColumn("rider", lit("updates4")).write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.metadata.enable", "false").
  mode(Append).
  save(basePath)
// Commit 5: update batch tagged rider = "updates5", also after the savepoint
val updates4 = convertToStringList(dataGen.generateUpdates(10))
val df4 = spark.read.json(spark.sparkContext.parallelize(updates4, 2))
df4.withColumn("rider", lit("updates5")).write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.metadata.enable", "false").
  mode(Append).
  save(basePath)
// Roll back to the savepoint via hudi-cli (see the hudi-cli snippet below),
// then re-read: the post-savepoint batches ("updates4", "updates5") should be gone.
// Re-declaring tripsSnapshotDF is fine in spark-shell.
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
// load(basePath) relies on the "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rider, count(*) from hudi_trips_snapshot group by 1").show()
spark.sql("select rider, partitionpath, uuid, fare from hudi_trips_snapshot").show(false)
// hudi-cli session: create a savepoint, then roll back to it.
// The --commit / --savepoint instants below are from the author's run;
// take yours from the `commits show` output.

// savepoint
connect --path /tmp/hudi_trips_cow
commits show
set --conf SPARK_HOME=/Users/nsb/Documents/personal/tools/spark-2.4.7-bin-hadoop2.7
savepoint create --commit 20220105222853592 --sparkMaster local[2]

// rollback
refresh
savepoint rollback --savepoint 20220106085108487 --sparkMaster local[2]
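A hedged follow-up, not in the original gist: the same two commands shown above can be re-run after the rollback to confirm that the post-savepoint commits have been removed from the timeline.

// verify: timeline should now end at the savepointed commit
refresh
commits show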
// Commit 6 onward: more updates with inline compaction enabled, so a compaction
// runs once 3 delta commits have accumulated on the MOR table
val updates5 = convertToStringList(dataGen.generateUpdates(10))
val df5 = spark.read.json(spark.sparkContext.parallelize(updates5, 2))
df5.withColumn("rider", lit("updates6")).write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option("hoodie.metadata.enable", "false").
  option("hoodie.compact.inline", "true").
  option("hoodie.compact.inline.max.delta.commits", "3").
  mode(Append).
  save(basePath)
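A minimal hedged sketch, not in the original gist: once inline compaction has run, a read-optimized query can check the result by reading only the compacted base files. QUERY_TYPE_OPT_KEY and QUERY_TYPE_READ_OPTIMIZED_OPT_VAL come from org.apache.hudi.DataSourceReadOptions, already imported above; tripsRODF and hudi_trips_ro are illustrative names.

// Read-optimized view: serves base files only (no log-file merging), so after
// compaction it should reflect the latest rider tags without the merge cost
val tripsRODF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
  load(basePath)
tripsRODF.createOrReplaceTempView("hudi_trips_ro")
spark.sql("select rider, count(*) from hudi_trips_ro group by 1").show()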