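// spark-shell snippet: create a Hudi MERGE_ON_READ table with the GLOBAL_BLOOM
// index and BLOOM_INDEX_UPDATE_PARTITION_PATH enabled, then query it through the
// read-optimized view before and after an update batch.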
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.DataSourceReadOptions
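
// Table location and a batch of 10 sample insert records from the quickstart data generator.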
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
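
// Initial insert. GLOBAL_BLOOM indexes records across all partitions, and
// BLOOM_INDEX_UPDATE_PARTITION_PATH=true moves a record to the incoming
// partition path if an update arrives with a different one.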
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(HoodieIndexConfig.INDEX_TYPE_PROP, "GLOBAL_BLOOM").
  option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true").
  option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
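
// Query the table with the read-optimized view, which reads only the base
// (columnar) files of the MERGE_ON_READ table.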
var tripsSnapshotDF = spark.
  read.
  format("hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
  load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, fare from hudi_trips_snapshot order by _hoodie_record_key").show(false)
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
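// Note: the GLOBAL_BLOOM / update-partition-path options are not repeated for
// this write; index settings are supplied as write configs, so unless they are
// set again this upsert falls back to the writer's default index type.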
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)
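
// Re-run the read-optimized query. It reflects only data in compacted base files,
// so updates still sitting in MOR log files will not show up until compaction runs.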
tripsSnapshotDF = spark.
  read.
  format("hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
  load(basePath + "/*/*/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, fare from hudi_trips_snapshot order by _hoodie_record_key").show(false)