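// Spark-shell walkthrough: Hudi savepoint and rollback on a MERGE_ON_READ table
// (despite the "cow" in the table name/path). Flow: one insert plus two upserts,
// savepoint via hudi-cli, two more upserts, rollback via hudi-cli, then a final
// upsert with inline compaction enabled.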
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.functions.lit // needed for the withColumn("rider", lit(...)) calls below
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.metadata.enable","false").
mode(Overwrite).
save(basePath)
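// First commit: 10 generated trips written into a fresh table
// (Overwrite mode; the metadata table is disabled throughout this run).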
val updates = convertToStringList(dataGen.generateUpdates(10))
val df0 = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df0.withColumn("rider", lit("updates1")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.metadata.enable","false").
mode(Append).
save(basePath)
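// On a MOR table each of these upserts lands as a delta commit (log files
// appended to existing file groups) rather than rewriting base files.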
val updates1 = convertToStringList(dataGen.generateUpdates(10))
val df1 = spark.read.json(spark.sparkContext.parallelize(updates1, 2))
df1.withColumn("rider", lit("updates2")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.metadata.enable","false").
mode(Append).
save(basePath)
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
// load(basePath) uses the "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rider, count(*) from hudi_trips_snapshot group by 1").show()
spark.sql("select rider, partitionpath, uuid, fare from hudi_trips_snapshot").show(false)
// savepoint via hudi-cli at this point (the actual CLI commands are at the end of this gist)
val updates3 = convertToStringList(dataGen.generateUpdates(10))
val df3 = spark.read.json(spark.sparkContext.parallelize(updates3, 2))
df3.withColumn("rider", lit("updates4")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.metadata.enable","false").
mode(Append).
save(basePath)
val updates4 = convertToStringList(dataGen.generateUpdates(10))
val df4 = spark.read.json(spark.sparkContext.parallelize(updates4, 2))
df4.withColumn("rider", lit("updates5")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.metadata.enable","false").
mode(Append).
save(basePath)
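// The two delta commits above land after the savepoint taken via hudi-cli;
// they are the commits the rollback below will undo.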
// rollback via hudi-cli at this point (the actual CLI commands are at the end of this gist)
val tripsSnapshotDF = spark.
read.
format("hudi").
load(basePath)
// load(basePath) uses the "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rider, count(*) from hudi_trips_snapshot group by 1").show()
spark.sql("select rider, partitionpath, uuid, fare from hudi_trips_snapshot").show(false)
// savepoint (run the following in the hudi-cli shell, not in spark-shell)
connect --path /tmp/hudi_trips_cow
commits show
set --conf SPARK_HOME=/Users/nsb/Documents/personal/tools/spark-2.4.7-bin-hadoop2.7
// pick the commit timestamp to savepoint from the "commits show" output
savepoint create --commit 20220105222853592 --sparkMaster local[2]
// rollback to the savepoint
refresh
savepoint rollback --savepoint 20220106085108487 --sparkMaster local[2]
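// Once the rollback completes, commits newer than the savepoint are undone;
// re-running the snapshot query above should no longer show the
// "updates4"/"updates5" riders.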
val updates5 = convertToStringList(dataGen.generateUpdates(10))
val df5 = spark.read.json(spark.sparkContext.parallelize(updates5, 2))
df5.withColumn("rider", lit("updates6")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
option("hoodie.metadata.enable","false").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","3").
mode(Append).
save(basePath)
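// With hoodie.compact.inline=true and hoodie.compact.inline.max.delta.commits=3,
// this write should also trigger an inline compaction once three delta commits
// have accumulated since the last compaction, merging log files into new base files.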