Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Last active December 16, 2021 08:55
Show Gist options
  • Save nsivabalan/3577c0c89918886555cc843ccaafe8e2 to your computer and use it in GitHub Desktop.
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Hudi table name and its on-disk base location for this demo run.
val tableName = "hudi_trips_cow1"
val basePath = "/tmp/hudi_trips_cow1"

// Explicit schema for the demo records; every field is nullable.
// "preComb" drives Hudi's preCombine logic, "rowId" is the record key,
// and "partitionId" is the partition-path column.
val schema = StructType(Seq(
  StructField("rowId", StringType, nullable = true),
  StructField("partitionId", StringType, nullable = true),
  StructField("preComb", LongType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("versionId", StringType, nullable = true),
  StructField("toBeDeletedStr", StringType, nullable = true),
  StructField("intToLong", IntegerType, nullable = true),
  StructField("longToInt", LongType, nullable = true)
))

// Three seed rows, all in partition 2021-01-01, with preCombine value 0.
val data0 = Seq(
  Row("row_1", "2021-01-01", 0L, "bob", "v_0", "toBeDel0", 0, 1000000L),
  Row("row_2", "2021-01-01", 0L, "john", "v_0", "toBeDel0", 0, 1000000L),
  Row("row_3", "2021-01-01", 0L, "tom", "v_0", "toBeDel0", 0, 1000000L)
)
// Build a DataFrame from the seed rows and bulk-load it into a fresh
// Hudi copy-on-write table. `val` instead of `var`: the reference is
// never reassigned in this script.
val dfFromData0 = spark.createDataFrame(data0, schema)

// Overwrite mode wipes any previous table contents at basePath.
// Trailing dots keep the chain paste-friendly in spark-shell.
dfFromData0.write.format("hudi").
  options(getQuickstartWriteConfigs).
  // Tie-breaker field when two records share the same key.
  option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
  option(RECORDKEY_FIELD_OPT_KEY, "rowId").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
  // Emit partition dirs as "partitionId=<value>" (Hive layout).
  option("hoodie.datasource.write.hive_style_partitioning","true").
  option(TABLE_NAME, tableName).
  option("hoodie.index.type","SIMPLE").
  option(OPERATION_OPT_KEY, "insert").
  // Sync the table to the Hive metastore as testdb.testtable1.
  option("hoodie.datasource.hive_sync.mode","hms").
  option("hoodie.datasource.hive_sync.database","testdb").
  option("hoodie.datasource.hive_sync.table","testtable1").
  option("hoodie.datasource.hive_sync.partition_fields","partitionId").
  option("hoodie.datasource.hive_sync.enable","true").
  // Partition columns are dropped from data files; reconstructed from paths.
  option("hoodie.datasource.write.drop.partition.columns","true").
  mode(Overwrite).
  save(basePath)
// Read the table back via the Hudi datasource and verify its contents
// through Spark SQL.
val tripsSnapshotDF = spark.read.format("hudi").load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_tbl")
spark.sql("desc hudi_tbl").show()
spark.sql("select * from hudi_tbl limit 3").show()
// Bug fix: the Hive sync above registers the table as testdb.testtable1
// (hive_sync.database = "testdb", hive_sync.table = "testtable1");
// the original queried "testdb.testtable", which does not exist.
spark.sql("desc testdb.testtable1").show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment