// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"

// Schema for the test records: record key, partition path, precombine field, and a payload value.
val schema = StructType(Array(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("value", LongType, true)
))
// Batch 0: insert three records (preComb = 0, value = 0).
val data0 = Seq(
  Row("row_1", "part_0", 0L, 0L),
  Row("row_2", "part_0", 0L, 0L),
  Row("row_3", "part_0", 0L, 0L))
val dfFromData0 = spark.createDataFrame(data0, schema)
// Write batch 0: create a MERGE_ON_READ table using the SIMPLE index; Overwrite recreates the table.
dfFromData0.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY.key(), "preComb").
  option(RECORDKEY_FIELD_OPT_KEY.key(), "rowId").
  option(PARTITIONPATH_FIELD_OPT_KEY.key(), "partitionId").
  option("hoodie.index.type", "SIMPLE").
  option(TABLE_NAME.key(), tableName).
  option(TABLE_TYPE_OPT_KEY.key(), "MERGE_ON_READ").
  option(OPERATION_OPT_KEY.key(), "upsert").
  mode(Overwrite).
  save(basePath)
// Snapshot query: expect three rows, each with preComb = 0 and value = 0.
val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*")
tripsSnapshotDF.createOrReplaceTempView("tmp_tbl")
spark.sql("select rowId, preComb, value from tmp_tbl").show(false)
// Batch 1: update the three existing rows and insert three new ones (preComb = 1, value = 1).
val data1 = Seq(
  Row("row_1", "part_0", 1L, 1L),
  Row("row_2", "part_0", 1L, 1L),
  Row("row_3", "part_0", 1L, 1L),
  Row("row_4", "part_0", 1L, 1L),
  Row("row_5", "part_0", 1L, 1L),
  Row("row_6", "part_0", 1L, 1L))
val dfFromData1 = spark.createDataFrame(data1, schema)
dfFromData1.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY.key(), "preComb").
  option(RECORDKEY_FIELD_OPT_KEY.key(), "rowId").
  option(PARTITIONPATH_FIELD_OPT_KEY.key(), "partitionId").
  option("hoodie.index.type", "SIMPLE").
  option(TABLE_NAME.key(), tableName).
  option(TABLE_TYPE_OPT_KEY.key(), "MERGE_ON_READ").
  option(OPERATION_OPT_KEY.key(), "upsert").
  mode(Append).
  save(basePath)
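// Optional verification (a sketch): re-run the snapshot query. All six rows should
// now show preComb = 1 and value = 1, since upsert keeps the record with the
// larger precombine value.
spark.read.format("hudi").load(basePath + "/*").createOrReplaceTempView("tmp_tbl")
spark.sql("select rowId, preComb, value from tmp_tbl order by rowId").show(false)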
// Batch 2: update all six rows and insert three more (preComb = 2, value = 2).
val data2 = Seq(
  Row("row_1", "part_0", 2L, 2L),
  Row("row_2", "part_0", 2L, 2L),
  Row("row_3", "part_0", 2L, 2L),
  Row("row_4", "part_0", 2L, 2L),
  Row("row_5", "part_0", 2L, 2L),
  Row("row_6", "part_0", 2L, 2L),
  Row("row_7", "part_0", 2L, 2L),
  Row("row_8", "part_0", 2L, 2L),
  Row("row_9", "part_0", 2L, 2L))
val dfFromData2 = spark.createDataFrame(data2, schema)
dfFromData2.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY.key(), "preComb").
  option(RECORDKEY_FIELD_OPT_KEY.key(), "rowId").
  option(PARTITIONPATH_FIELD_OPT_KEY.key(), "partitionId").
  option("hoodie.index.type", "SIMPLE").
  option(TABLE_NAME.key(), tableName).
  option(TABLE_TYPE_OPT_KEY.key(), "MERGE_ON_READ").
  option(OPERATION_OPT_KEY.key(), "upsert").
  mode(Append).
  save(basePath)
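// Optional verification (a sketch): the snapshot should now return nine rows,
// all with preComb = 2 and value = 2.
spark.read.format("hudi").load(basePath + "/*").createOrReplaceTempView("tmp_tbl")
spark.sql("select rowId, preComb, value from tmp_tbl order by rowId").show(false)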
// Incremental query: find the first commit time of the table (see the instants under
// /tmp/hudi_trips_cow/.hoodie) and set it as the begin instant; the value below is a sample.
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY.key(), QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY.key(), "20210721221211").
  load(basePath + "/*")
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
spark.sql("select rowId, preComb, value from hudi_trips_incremental").show(false)