Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Created April 1, 2021 16:24
Show Gist options
  • Save nsivabalan/91f12109e0fe1ca9749ff5290c946778 to your computer and use it in GitHub Desktop.
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Target Hudi copy-on-write table and its on-disk location.
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"

// Initial schema. Note that "intToDouble" starts life as IntegerType;
// a later write promotes it to DoubleType to exercise schema evolution.
val schema = StructType(Array(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true),
  StructField("intToDouble", IntegerType, true)
))

// Seed rows for the first commit (all in partition "part_0").
val data0 = Seq(
  Row("row_1", "part_0", 0L, "bob", "v_0", 0),
  Row("row_2", "part_0", 0L, "john", "v_0", 0),
  Row("row_3", "part_0", 0L, "tom", "v_0", 0)
)

// val, not var: this DataFrame is never reassigned.
// NOTE(review): Seq -> java.util.List conversion for createDataFrame relies on
// the deprecated JavaConversions import at the top of the file.
val dfFromData0 = spark.createDataFrame(data0, schema)
// First commit: insert the seed rows, overwriting any existing table at basePath.
val initialWriteOpts = Map(
  PRECOMBINE_FIELD_OPT_KEY -> "preComb",
  RECORDKEY_FIELD_OPT_KEY -> "rowId",
  PARTITIONPATH_FIELD_OPT_KEY -> "partitionId",
  "hoodie.index.type" -> "SIMPLE",
  TABLE_NAME -> tableName,
  OPERATION_OPT_KEY -> "insert"
)
dfFromData0.write
  .format("hudi")
  .options(getQuickstartWriteConfigs)
  .options(initialWriteOpts)
  .mode(Overwrite)
  .save(basePath)
// Evolved schema: identical to the original except "intToDouble" is
// promoted from IntegerType to DoubleType — an int -> double widening,
// which Avro/Hudi schema evolution treats as backward compatible.
val schemaEvolved = StructType(Array(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true),
  StructField("intToDouble", DoubleType, true)
))

// Second batch, written with the evolved schema:
//  - row_2 and row_3 update existing keys (higher preComb wins),
//  - row_9 is a brand-new insert carrying the evolved field.
val data1 = Seq(
  Row("row_2", "part_0", 5L, "john", "v_3", 1.0),
  Row("row_3", "part_0", 5L, "maroon", "v_2", 1.0),
  Row("row_9", "part_0", 5L, "michael", "v_2", 1.0)
)

// val, not var: this DataFrame is never reassigned.
val dfFromData1 = spark.createDataFrame(data1, schemaEvolved)
// Second commit: append the evolved-schema batch to the existing table.
// No OPERATION_OPT_KEY is set here, matching the original write exactly.
val evolvedWriteOpts = Map(
  PRECOMBINE_FIELD_OPT_KEY -> "preComb",
  RECORDKEY_FIELD_OPT_KEY -> "rowId",
  PARTITIONPATH_FIELD_OPT_KEY -> "partitionId",
  "hoodie.index.type" -> "SIMPLE",
  TABLE_NAME -> tableName
)
dfFromData1.write
  .format("hudi")
  .options(getQuickstartWriteConfigs)
  .options(evolvedWriteOpts)
  .mode(Append)
  .save(basePath)
// Snapshot-read the table back; the "/*/*" glob walks into the partition
// directories under basePath (the path style used by older Hudi readers).
// val, not var: never reassigned.
val tripsSnapshotDF1 = spark.read
  .format("hudi")
  .load(basePath + "/*/*")

// Register a temp view and show both commits merged: evolved double values
// for row_2/row_3 and the newly inserted row_9 should all be visible.
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select rowId, partitionId, preComb, name, versionId, intToDouble from hudi_trips_snapshot").show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment