// spark-shell script: writes a Hudi MOR table with a GLOBAL_BLOOM index across
// two batches, then reads it back via the read-optimized view.
import java.io.File
import java.nio.file.Paths
import java.sql.Timestamp
import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
import org.apache.log4j.Level
import org.apache.spark.sql._
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
val hudiFormat: String = "org.apache.hudi"
val hudiOutputDir = "file:///tmp/hudi_trips_cow"
val tableName = "hudi_trips_cow"
val primaryKey: String = "id"
val precombineKey: String = "__timestamp"
val partitionKey: String = "__process_date"
val operationKey: String = "__op"
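// The java.io.File / commons-io FileUtils imports above are unused as written;
// the likely intent was clearing the output directory between runs. A minimal
// sketch, assuming a local filesystem path (the Overwrite write below makes
// this optional):
val localDir = new File(hudiOutputDir.stripPrefix("file://"))
if (localDir.exists()) FileUtils.deleteDirectory(localDir)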
val schema: List[StructField] = List(
  StructField(primaryKey, DataTypes.LongType, false),
  StructField("name", DataTypes.StringType, false),
  StructField("desc", DataTypes.StringType, true),
  StructField(partitionKey, DataTypes.StringType, true),
  StructField(precombineKey, DataTypes.TimestampType, false),
  StructField(operationKey, DataTypes.StringType, true)
)
var data: Seq[Row] = Seq(
  Row(1L, "Bob", "Manager II", "1970-01-01", Timestamp.valueOf("1970-01-01 00:00:00"), null),
  Row(2L, "John", "Engineer I", "1970-01-01", Timestamp.valueOf("1970-01-01 00:00:00"), null),
  Row(3L, "Michael", null, "1970-01-01", Timestamp.valueOf("1970-01-01 00:00:00"), null),
  Row(3L, "Michael", null, "2020-01-04", Timestamp.valueOf("2020-01-04 00:00:00"), "D"),
  Row(4L, "William", "Manager I", "1998-04-13", Timestamp.valueOf("1998-04-13 00:00:00"), "I"),
  Row(5L, "Fred", "Engineer III", "2020-01-01", Timestamp.valueOf("2020-01-01 00:00:00"), "I")
)
// Test DataFrame
var df: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
// Hudi configs
val hudiOptions = Map[String, String](
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
  DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[OverwriteWithLatestAvroPayload].getName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
  HoodieCompactionConfig.PAYLOAD_CLASS_PROP -> classOf[OverwriteWithLatestAvroPayload].getName,
  // Global bloom index: record keys are matched across all partitions, and with
  // update-partition-path enabled, an update whose partition value changes
  // moves the record to the new partition.
  HoodieIndexConfig.INDEX_TYPE_PROP -> HoodieIndex.IndexType.GLOBAL_BLOOM.toString,
  HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH -> String.valueOf(true),
  // Table-specific configs
  HoodieWriteConfig.TABLE_NAME -> tableName,
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> primaryKey,
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> precombineKey,
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> partitionKey
)
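// MultiPartKeysValueExtractor is imported above but never used; if the intent
// was to also sync the table to a Hive metastore, options along these lines
// would use it (a sketch, assuming a reachable metastore; not required for the
// local-filesystem test below, and not passed to the writes):
val hiveSyncOptions = Map[String, String](
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> tableName,
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> partitionKey,
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)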
df.printSchema()
// First batch: overwrite any existing table at hudiOutputDir
df.write.format(hudiFormat).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiOutputDir)
// Read-optimized view: for a MOR table this reads only the compacted base files
var tripsSnapshotDF = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(hudiOutputDir + "/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, id, __op from hudi_trips_snapshot order by _hoodie_record_key").show(false)
// Second batch: an "__op" = "D" row for id 2, a new insert (id 6), and an
// update for id 4 that changes its partition value
data = Seq(
  Row(2L, "John", "Engineer I", "2020-01-04", Timestamp.valueOf("2020-01-04 00:00:00"), "D"),
  Row(6L, "Smith", "Manager I", "2020-01-04", Timestamp.valueOf("2020-01-04 00:00:00"), "I"),
  Row(4L, "Will", "Manager II", "2020-08-01", Timestamp.valueOf("2020-08-01 00:00:00"), "U")
)
df = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
df.write.format(hudiFormat).options(hudiOptions).mode(SaveMode.Append).save(hudiOutputDir)
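// Note that OverwriteWithLatestAvroPayload simply keeps the latest record per
// key, so the "__op" = "D" row above is upserted as ordinary data rather than
// deleting id 2. To issue a real delete, one could write the marked rows with
// Hudi's delete operation instead; a sketch as a helper (defined only, not
// invoked, so the flow below is unchanged):
def deleteMarkedRows(batch: DataFrame): Unit = {
  batch.filter(s"$operationKey = 'D'").
    write.format(hudiFormat).
    options(hudiOptions).
    option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL).
    mode(SaveMode.Append).
    save(hudiOutputDir)
}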
tripsSnapshotDF = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(hudiOutputDir + "/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, id, __op from hudi_trips_snapshot order by _hoodie_record_key").show(false)
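// The read-optimized view above reflects only base files; on a MOR table a
// snapshot query also merges the pending log files, so the second batch is
// visible before compaction. A sketch following the same pattern:
val tripsRtDF = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).load(hudiOutputDir + "/*/*")
tripsRtDF.createOrReplaceTempView("hudi_trips_rt")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, id, __op from hudi_trips_rt order by _hoodie_record_key").show(false)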