Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Created December 22, 2021 08:50
Show Gist options
Save nsivabalan/fa5d592d8ee08eb51c31835c7a51023c to your computer and use it in GitHub Desktop.
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
// Step 1: create a MERGE_ON_READ, non-partitioned Hudi table from 10 generated
// trip inserts, tagging every row with rider = "rider1".
//
// Option keys are written as their raw config strings instead of the
// DataSourceWriteOptions *_OPT_KEY / HoodieWriteConfig.TABLE_NAME aliases
// (deprecated since Hudi 0.9). The strings are exactly the keys those aliases
// resolve to, so they work on old and new Hudi versions alike.
//
// NOTE: the name/path say "cow" but the table is written as MERGE_ON_READ;
// they are kept as-is so the paths match the Hudi quickstart.
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
// mode(Overwrite) replaces whatever already exists at basePath.
df.withColumn("rider", lit("rider1")).write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload").
option("hoodie.table.name", tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
mode(Overwrite).
save(basePath)
// Step 2: read the table back and expose it as the `hudi_trips_snapshot` view.
// The table was written with NonpartitionedKeyGenerator, so loading basePath
// directly is enough; no partition glob or partition discovery is involved.
val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
// `partitionpath` is selected here as an ordinary data column from the
// generated records. After step 1 every row should show rider = "rider1".
spark.sql("select uuid, partitionpath, rider, fare from hudi_trips_snapshot").show(false)
// Step 3: write a batch of generated updates with rider = "rider2".
// Append mode uses Hudi's default write operation (upsert), keyed on `uuid`
// and de-duplicated by `ts`.
// The options mirror the initial insert, including the MERGE_ON_READ table
// type, so this write does not depend on Hudi inferring the type from the
// existing table.
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.withColumn("rider", lit("rider2")).write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload").
option("hoodie.table.name", tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
mode(Append).
save(basePath)
import org.apache.spark.sql.types.StringType
// Step 4: write another batch of generated updates whose `rider` is null.
// lit(null) alone has NullType; the cast keeps `rider` a string column so the
// incoming schema still matches the table's.
// NOTE(review): with OverwriteNonDefaultsWithLatestAvroPayload, null/default
// fields in the incoming record are presumably meant to leave the stored
// `rider` untouched. Confirm against the payload's documentation when reading
// the result in step 5.
// The options mirror the initial insert, including the MERGE_ON_READ table type.
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
val df2 = df.withColumn("rider", lit(null).cast(StringType))
df2.write.format("hudi").
options(getQuickstartWriteConfigs).
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.recordkey.field", "uuid").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
option("hoodie.datasource.write.payload.class", "org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload").
option("hoodie.table.name", tableName).
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
mode(Append).
save(basePath)
// Step 5: re-read the table after the null-rider batch and refresh the
// `hudi_trips_snapshot` view. The table is non-partitioned (written with
// NonpartitionedKeyGenerator), so basePath is loaded directly.
val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
// Inspect `rider` for rows touched by step 4 to see whether the null values
// overwrote the previously stored riders.
spark.sql("select uuid, partitionpath, rider, fare from hudi_trips_snapshot").show(false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment