Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Created April 28, 2021 14:42
Show Gist options
  • Save nsivabalan/03736cda20c10781957b83a89e2f6650 to your computer and use it in GitHub Desktop.
Save nsivabalan/03736cda20c10781957b83a89e2f6650 to your computer and use it in GitHub Desktop.
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
// Sample-record generator from QuickstartUtils; it produces both the insert
// batch and the later update batch used by this script.
val dataGen = new DataGenerator()
// Name handed to the Hudi writer through the TABLE_NAME option.
val tableName: String = "hudi_trips_cow"
// Storage location the table is saved to and later loaded from.
val basePath: String = "gs://dataproc-staging-us-central1-708638735970-brfkprvv/hudi_trips_cow"
// Run from spark-shell: build 500000 generated insert records, load them as a
// DataFrame over 10 RDD slices, and write them out as the initial table contents.
val inserts = convertToStringList(dataGen.generateInserts(500000))
val insertsRdd = spark.sparkContext.parallelize(inserts, 10)
val df = spark.read.json(insertsRdd)
// Writer settings gathered in one place and applied with a single options(...) call.
val insertWriteOptions = Map(
  "hoodie.insert.shuffle.parallelism" -> "10",
  "hoodie.upsert.shuffle.parallelism" -> "10",
  PRECOMBINE_FIELD_OPT_KEY -> "ts",
  RECORDKEY_FIELD_OPT_KEY -> "uuid",
  PARTITIONPATH_FIELD_OPT_KEY -> "partitionpath",
  TABLE_NAME -> tableName,
  TABLE_TYPE_OPT_KEY -> "MERGE_ON_READ"
)
// First write uses SaveMode Overwrite, so the table at basePath starts from this batch.
df.write.format("hudi").options(insertWriteOptions).mode(Overwrite).save(basePath)
// Read the table back through a glob that reaches four directory levels below basePath.
// Alternative: load(basePath) can be used when the table follows the
// "/partitionKey=partitionValue" folder structure, which Spark discovers automatically.
val snapshotPathGlob = basePath + "/*/*/*/*"
// Declared as var because the script re-reads the table into this name after the update write.
var tripsSnapshotDF = spark.read.format("hudi").load(snapshotPathGlob)
// Expose the DataFrame to Spark SQL, then print trips whose fare exceeds 20.0.
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
val snapshotFareQuery = "select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0"
spark.sql(snapshotFareQuery).show()
// Second batch: build 500000 generated update records from the same generator
// and append them to the table written earlier at basePath.
val updates = convertToStringList(dataGen.generateUpdates(500000))
// Bound to its own name rather than re-declaring `df`: a second `val df` in the
// same scope only works when spark-shell evaluates each statement separately, and
// fails with "df is already defined" when the script is compiled as one unit
// (for example in :paste mode).
val updatesDF = spark.read.json(spark.sparkContext.parallelize(updates, 10))
// Same writer options as the initial load; only the save mode differs (Append).
updatesDF.write.format("hudi").
  option("hoodie.insert.shuffle.parallelism", "10").
  option("hoodie.upsert.shuffle.parallelism", "10").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
  mode(Append).
  save(basePath)
// Re-read the table after the second write, reusing the existing tripsSnapshotDF variable.
// Alternative: load(basePath) can be used when the table follows the
// "/partitionKey=partitionValue" folder structure, which Spark discovers automatically.
tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
// Re-register the view so the query below runs against the freshly loaded data.
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
val refreshedFareQuery = "select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0"
spark.sql(refreshedFareQuery).show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment