Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Last active February 10, 2021 13:20
Show Gist options
  • Save nsivabalan/7250b794788516f1aec35650c2632364 to your computer and use it in GitHub Desktop.
MOR read optimized query
spark-shell --packages org.apache.hudi:hudi-spark-bundle_2.11:0.7.0,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
import java.io.File
import java.nio.file.Paths
import java.sql.Timestamp
import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
import org.apache.log4j.Level
import org.apache.spark.sql._
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
// Identifiers for the demo table plus the column-name constants reused below.
// NOTE(review): hudiFormat and hudiOutputDir are not referenced anywhere later
// in this script (writes use the "hudi" short name and hudiOutputTable, which
// points at the same path); kept for compatibility.
val hudiFormat: String = "org.apache.hudi"
val hudiOutputDir = "file:///tmp/hudi_trips_cow"
val tableName = "hudi_trips_cow"
val primaryKey: String = "id"
val precombineKey: String = "__timestamp"
val partitionKey: String = "__process_date"
val operationKey: String = "__op"

// Explicit schema for the demo rows: record key, name, optional description,
// partition date string, precombine timestamp, and an optional op flag.
val schema: List[StructField] =
  List(
    StructField(primaryKey, DataTypes.LongType, nullable = false),
    StructField("name", DataTypes.StringType, nullable = false),
    StructField("desc", DataTypes.StringType, nullable = true),
    StructField(partitionKey, DataTypes.StringType, nullable = true),
    StructField(precombineKey, DataTypes.TimestampType, nullable = false),
    StructField(operationKey, DataTypes.StringType, nullable = true)
  )
// Initial batch of five demo rows; column order matches `schema`:
// (id, name, desc, __process_date, __timestamp, __op).
// Fix: these were declared `var` but are never reassigned — use `val`.
val data: Seq[Row] = Seq(
  Row(1L, "Bob", "Manager II", "1970-01-01", Timestamp.valueOf("1970-01-01 00:00:00"), null),
  Row(2L, "John", "Engineer I", "1970-01-01", Timestamp.valueOf("1970-01-01 00:00:00"), null),
  Row(3L, "Michael", null, "2020-01-04", Timestamp.valueOf("2020-01-04 00:00:00"), "D"),
  Row(4L, "William", "Manager I", "1998-04-13", Timestamp.valueOf("1998-04-13 00:00:00"), "I"),
  Row(5L, "Fred", "Engineer III", "2020-01-01", Timestamp.valueOf("2020-01-01 00:00:00"), "I")
)
// Test DataFrame built from the rows with the explicit schema.
val df: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
//Hudi Configs
// Hudi write configuration: a Merge-On-Read table keyed on the record-key
// field, deduplicated on the precombine field, partitioned on the
// partition-path field.
val hudiOptions: Map[String, String] =
  Map(
    DataSourceWriteOptions.TABLE_TYPE_OPT_KEY          -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
    // Table-specific configs
    HoodieWriteConfig.TABLE_NAME                       -> tableName,
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY     -> primaryKey,
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY    -> precombineKey,
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> partitionKey
  )
df.printSchema()

// Destination path — same location as hudiOutputDir declared earlier.
val hudiOutputTable = "file:///tmp/hudi_trips_cow"

// First commit: write the initial batch, replacing anything at the path.
df.write.format("hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiOutputTable)

// Query the table with Hudi's read-optimized query type and inspect the
// Hudi metadata columns alongside id/__op.
// Fix: was declared `var` but is never reassigned — use `val`.
val tripsSnapshotDF = spark.read
  .format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(hudiOutputTable + "/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, id, __op from hudi_trips_snapshot order by _hoodie_record_key").show(false)
// Second batch: rows for existing keys 2 and 3 (op flag "D") plus a brand-new
// key 6 (op flag "I"); same column order as `schema`.
val data1: Seq[Row] =
  Seq(
    Row(2L, "John", "Engineer I", "1970-01-01", Timestamp.valueOf("1970-01-01 00:00:00"), "D"),
    Row(3L, "Michael", "Engineer II", "2020-01-04", Timestamp.valueOf("2020-01-04 00:00:00"), "D"),
    Row(6L, "Smith", "Manager I", "1998-04-13", Timestamp.valueOf("1998-04-13 00:00:00"), "I")
  )
val df1: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data1), StructType(schema))
df1.show(false)

// Second commit: append the batch into the existing Hudi table.
df1.write
  .format("hudi")
  .options(hudiOptions)
  .mode(SaveMode.Append)
  .save(hudiOutputTable)

// Re-run the read-optimized query and refresh the temp view to see what the
// second commit looks like through this query type.
val tripsSnapshotDF1 = spark.read
  .format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(hudiOutputTable + "/*/*")
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, id, __op from hudi_trips_snapshot order by _hoodie_record_key").show(false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment