Created
February 9, 2021 14:24
-
-
Save nsivabalan/a0b8345f3675c6db431045c511052cfb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hudi.QuickstartUtils._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.sql.SaveMode._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
import org.apache.hudi.config.HoodieIndexConfig | |
import org.apache.hudi.DataSourceReadOptions | |
// --- Table setup and initial insert batch -----------------------------------
// Quickstart data generator for synthetic trip records.
val dataGen = new DataGenerator

// Hudi table identity and its storage location on the local filesystem.
val basePath = "file:///tmp/hudi_trips_cow"
val tableName = "hudi_trips_cow"

// Fabricate 10 insert records (JSON strings) and lift them into a DataFrame;
// 2 partitions is plenty for a dataset this small.
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
// Bootstrap the table with the insert batch. Key choices:
//   * MERGE_ON_READ table type (updates land in log files until compaction),
//   * GLOBAL_BLOOM index with update-partition-path enabled, so an update
//     whose partition value changed is moved to the new partition,
//   * Overwrite mode, replacing any table left over from a previous run.
df.write.
  format("hudi").
  options(getQuickstartWriteConfigs).
  options(Map(
    PRECOMBINE_FIELD_OPT_KEY                         -> "ts",
    RECORDKEY_FIELD_OPT_KEY                          -> "uuid",
    PARTITIONPATH_FIELD_OPT_KEY                      -> "partitionpath",
    HoodieIndexConfig.INDEX_TYPE_PROP                -> "GLOBAL_BLOOM",
    HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH -> "true",
    TABLE_TYPE_OPT_KEY                               -> "MERGE_ON_READ",
    TABLE_NAME                                       -> tableName
  )).
  mode(Overwrite).
  save(basePath)
// Load the table with a read-optimized query (base files only — for a
// MERGE_ON_READ table this skips not-yet-compacted log data) and expose the
// result to Spark SQL. `var` because the script reloads after the upsert.
var tripsSnapshotDF = spark.read.
  format("hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
         DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
  load(basePath + "/*/*/*/*") // partition glob under the table base path

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

// Show Hudi's metadata columns alongside `fare`, ordered by record key.
val query = "select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, fare from hudi_trips_snapshot order by _hoodie_record_key"
spark.sql(query).show(false)
// --- Upsert pass ------------------------------------------------------------
// Generate 10 updates against previously inserted record keys and append them.
// FIX: the original declared `val df` a second time here, which collides with
// the `df` defined for the insert batch whenever the script is compiled or
// pasted as one unit (e.g. spark-shell :paste); it only worked when entered
// line-by-line. A distinct local name keeps the whole script valid.
val updates = convertToStringList(dataGen.generateUpdates(10))
val updatesDf = spark.read.json(spark.sparkContext.parallelize(updates, 2))

updatesDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append). // Append: records are upserted into the existing table
  save(basePath)
// Reload after the upsert with the same read-optimized query. NOTE(review):
// for a MERGE_ON_READ table the fresh updates sit in log files, so this
// read-optimized view may still show pre-update values until compaction —
// presumably the point this gist demonstrates; confirm against the discussion.
tripsSnapshotDF = spark.read.
  format("hudi").
  option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
         DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
  load(basePath + "/*/*/*/*")

// Refresh the temp view and re-run the same inspection query.
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
val requery = "select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, fare from hudi_trips_snapshot order by _hoodie_record_key"
spark.sql(requery).show(false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.