import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
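
// Table setup: table name, local base path, and the quickstart data generator.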
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
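
// Initial insert: force rider to "rider1" and write a non-partitioned MERGE_ON_READ table,
// using OverwriteNonDefaultsWithLatestAvroPayload so later updates only overwrite
// non-default (non-null) fields.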
df.withColumn("rider", lit("rider1")).write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.NonpartitionedKeyGenerator"). | |
option(PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload"). | |
option(TABLE_NAME, tableName). | |
option("hoodie.datasource.write.table.type","MERGE_ON_READ"). | |
mode(Overwrite). | |
save(basePath) | |
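
// Snapshot-read the table and confirm all 10 rows show rider = "rider1".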
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
// load(basePath) relies on the "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select uuid, partitionpath, rider, fare from hudi_trips_snapshot").show(false)
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.withColumn("rider", lit("rider2")).write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
  option(PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
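
// Update with a null field: set rider to null and append. With
// OverwriteNonDefaultsWithLatestAvroPayload, fields that are null (the Avro default) in the
// incoming record are expected to keep the stored value, so rider should remain "rider2".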
import org.apache.spark.sql.types.StringType

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
val df2 = df.withColumn("rider", lit(null).cast(StringType))
df2.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
  option(PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
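
// Re-read the snapshot: if the payload merged as expected, rider still shows "rider2", not null.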
val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
// load(basePath) relies on the "/partitionKey=partitionValue" folder structure for Spark auto partition discovery
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select uuid, partitionpath, rider, fare from hudi_trips_snapshot").show(false)