Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Created February 9, 2021 14:26
Show Gist options
  • Save nsivabalan/1aaf0bfca044b0413f951d941bb8d801 to your computer and use it in GitHub Desktop.
import java.io.File
import java.nio.file.Paths
import java.sql.Timestamp
import org.apache.commons.io.FileUtils
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
import org.apache.log4j.Level
import org.apache.spark.sql._
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
// Hudi datasource identifier and the target table's name/location.
val hudiFormat: String = "org.apache.hudi"
val hudiOutputDir = "file:///tmp/hudi_trips_cow"
val tableName = "hudi_trips_cow"

// Column roles consumed by the Hudi writer configuration below.
val primaryKey: String = "id"
val precombineKey: String = "__timestamp"
val partitionKey: String = "__process_date"
val operationKey: String = "__op"

// Record schema as (name, type, nullable) triples, folded into StructFields.
val schema: List[StructField] = List(
  (primaryKey, DataTypes.LongType, false),
  ("name", DataTypes.StringType, false),
  ("desc", DataTypes.StringType, true),
  (partitionKey, DataTypes.StringType, true),
  (precombineKey, DataTypes.TimestampType, false),
  (operationKey, DataTypes.StringType, true)
).map { case (fieldName, fieldType, isNullable) =>
  StructField(fieldName, fieldType, isNullable)
}
// Every test record's event time is midnight of its partition date.
def midnight(date: String): Timestamp = Timestamp.valueOf(date + " 00:00:00")

// Initial batch; __op is null for the three oldest rows, and id 3 appears
// twice (two partitions) to exercise the precombine/global-index path.
var data: Seq[Row] = Seq(
  Row(1L, "Bob", "Manager II", "1970-01-01", midnight("1970-01-01"), null),
  Row(2L, "John", "Engineer I", "1970-01-01", midnight("1970-01-01"), null),
  Row(3L, "Michael", null, "1970-01-01", midnight("1970-01-01"), null),
  Row(3L, "Michael", null, "2020-01-04", midnight("2020-01-04"), "D"),
  Row(4L, "William", "Manager I", "1998-04-13", midnight("1998-04-13"), "I"),
  Row(5L, "Fred", "Engineer III", "2020-01-01", midnight("2020-01-01"), "I")
)

// Test DataFrame built from the in-memory rows and the schema above.
var df: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
//Hudi Configs
val hudiOptions = Map[String, String] (
// Write as a Merge-On-Read table (log files + periodic compaction).
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
// Merge strategy: keep the record with the latest precombine value.
// NOTE(review): the payload class is set both on the datasource write path
// and on the compaction config below — presumably so compaction uses the
// same merge semantics; confirm against the Hudi version in use.
DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[OverwriteWithLatestAvroPayload].getName,
// ComplexKeyGenerator builds record/partition keys from (possibly multiple) fields.
DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
HoodieCompactionConfig.PAYLOAD_CLASS_PROP -> classOf[OverwriteWithLatestAvroPayload].getName,
// GLOBAL_BLOOM enforces key uniqueness across partitions, and with
// BLOOM_INDEX_UPDATE_PARTITION_PATH an update whose partition value changed
// moves the record to the new partition.
HoodieIndexConfig.INDEX_TYPE_PROP -> HoodieIndex.IndexType.GLOBAL_BLOOM.toString,
HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH -> String.valueOf(true),
//Table-specific configs
HoodieWriteConfig.TABLE_NAME -> tableName,
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> primaryKey,
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> precombineKey,
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> partitionKey
)
df.printSchema()

// Fix: reuse the output location declared above instead of repeating the
// hard-coded literal "file:///tmp/hudi_trips_cow" a second time — the two
// copies could silently drift apart.
val hudiOutputTable = hudiOutputDir

// First batch: overwrite any previous table contents at the target path.
df.write.format(hudiFormat).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiOutputTable)

// Read-optimized query over all partitions (two-level partition glob).
// Uses the same datasource identifier (hudiFormat) as the write path for
// consistency instead of the "hudi" short-name literal.
var tripsSnapshotDF = spark.read.format(hudiFormat).option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(hudiOutputTable + "/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

// Show Hudi metadata columns plus id/__op, ordered by record key.
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, id, __op from hudi_trips_snapshot order by _hoodie_record_key").show(false)
// Event time for the second batch is midnight of the given date.
def eventTime(date: String): Timestamp = Timestamp.valueOf(date + " 00:00:00")

// Second batch: a "D" row for id 2, an "I" row for new id 6, and a "U" row
// for id 4 whose partition date has changed.
data = Seq(
  Row(2L, "John", "Engineer I", "2020-01-04", eventTime("2020-01-04"), "D"),
  Row(6L, "Smith", "Manager I", "2020-01-04", eventTime("2020-01-04"), "I"),
  Row(4L, "Will", "Manager II", "2020-08-01", eventTime("2020-08-01"), "U")
)
df = spark.createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))

// Upsert the batch into the existing table.
df.write.format(hudiFormat).options(hudiOptions).mode(SaveMode.Append).save(hudiOutputTable)

// Re-run the read-optimized query and show the merged table state.
tripsSnapshotDF = spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(hudiOutputTable + "/*/*")
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, id, __op from hudi_trips_snapshot order by _hoodie_record_key").show(false)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment