Last active: February 10, 2021 13:20
Save nsivabalan/7250b794788516f1aec35650c2632364 to your computer and use it in GitHub Desktop.
MOR read-optimized query
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.11:0.7.0,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
import java.io.File
import java.nio.file.Paths
import java.sql.Timestamp

import org.apache.commons.io.FileUtils
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
import org.apache.log4j.Level
import org.apache.spark.sql._
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
// --- Table identity and key-field constants --------------------------------
val hudiFormat: String = "org.apache.hudi"
val hudiOutputDir: String = "file:///tmp/hudi_trips_cow"
val tableName: String = "hudi_trips_cow"
val primaryKey: String = "id"               // Hudi record-key field
val precombineKey: String = "__timestamp"   // picks the latest record among duplicates in a batch
val partitionKey: String = "__process_date" // Hudi partition-path field
val operationKey: String = "__op"           // CDC-style op flag ("I"/"D"); null for plain rows

// Schema of the test records. Kept as List[StructField] so existing callers
// can keep wrapping it with StructType(schema).
val schema: List[StructField] = List(
  StructField(primaryKey, DataTypes.LongType, nullable = false),
  StructField("name", DataTypes.StringType, nullable = false),
  StructField("desc", DataTypes.StringType, nullable = true),
  StructField(partitionKey, DataTypes.StringType, nullable = true),
  StructField(precombineKey, DataTypes.TimestampType, nullable = false),
  StructField(operationKey, DataTypes.StringType, nullable = true)
)

// First batch of records. `val` (not `var`): never reassigned below.
val data: Seq[Row] = Seq(
  Row(1L, "Bob", "Manager II", "1970-01-01", Timestamp.valueOf("1970-01-01 00:00:00"), null),
  Row(2L, "John", "Engineer I", "1970-01-01", Timestamp.valueOf("1970-01-01 00:00:00"), null),
  Row(3L, "Michael", null, "2020-01-04", Timestamp.valueOf("2020-01-04 00:00:00"), "D"),
  Row(4L, "William", "Manager I", "1998-04-13", Timestamp.valueOf("1998-04-13 00:00:00"), "I"),
  Row(5L, "Fred", "Engineer III", "2020-01-01", Timestamp.valueOf("2020-01-01 00:00:00"), "I")
)

// Test DataFrame built from the in-memory rows.
val df: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
// --- Hudi write configuration ----------------------------------------------
// MOR (merge-on-read) table keyed on `primaryKey`, partitioned by
// `partitionKey`; `precombineKey` resolves duplicates within a batch.
val hudiOptions = Map[String, String](
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
  // Table-specific configs
  HoodieWriteConfig.TABLE_NAME -> tableName,
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> primaryKey,
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> precombineKey,
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> partitionKey
)

df.printSchema()

// Reuse the single output-path constant instead of repeating the literal.
val hudiOutputTable = hudiOutputDir

// Initial write: Overwrite recreates the table from scratch.
df.write.format(hudiFormat).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiOutputTable)

// Read-optimized query on the MOR table: reads only base files (not any
// pending log files), so it shows the state as of the last compacted commit.
val tripsSnapshotDF = spark.read
  .format(hudiFormat)
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(hudiOutputTable + "/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, id, __op from hudi_trips_snapshot order by _hoodie_record_key").show(false)
// --- Second batch: updates to existing keys plus one new insert ------------
// id=2 and id=3 arrive with op "D"; id=6 is brand new.
val data1 = Seq(
  Row(2L, "John", "Engineer I", "1970-01-01", Timestamp.valueOf("1970-01-01 00:00:00"), "D"),
  Row(3L, "Michael", "Engineer II", "2020-01-04", Timestamp.valueOf("2020-01-04 00:00:00"), "D"),
  Row(6L, "Smith", "Manager I", "1998-04-13", Timestamp.valueOf("1998-04-13 00:00:00"), "I")
)
val df1 = spark.createDataFrame(spark.sparkContext.parallelize(data1), StructType(schema))
df1.show(false)

// Append (upsert) onto the MOR table: on a MOR table the updates land in log
// files until compaction runs.
df1.write.format(hudiFormat).options(hudiOptions).mode(SaveMode.Append).save(hudiOutputTable)

// Read-optimized query again: until compaction this still reflects only the
// base files from the first commit, so the second batch may not be visible.
val tripsSnapshotDF1 = spark.read
  .format(hudiFormat)
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(hudiOutputTable + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, id, __op from hudi_trips_snapshot order by _hoodie_record_key").show(false)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.