hudi schema evolution
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar | |
// spark-shell (assumes the QuickstartUtils imports and the tableName, basePath, dataGen definitions from the setup below)
val inserts = convertToStringList(dataGen.generateInserts(100)) | |
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
option(TABLE_NAME, tableName). | |
mode(Overwrite). | |
save(basePath) | |
simple schema: | |
rowId: string | |
partitionId: string | |
preComb: long | |
name: string | |
versionId: string | |
toBeDeletedStr: string | |
intToLong: int | |
longToInt : long | |
laterAddedField: string | |
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar | |
// spark-shell | |
import org.apache.hudi.QuickstartUtils._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.sql.SaveMode._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
import org.apache.spark.sql.types._ | |
import org.apache.spark.sql.Row | |
val tableName = "hudi_trips_cow" | |
val basePath = "file:///tmp/hudi_trips_cow" | |
val schema = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("toBeDeletedStr", StringType,true), | |
StructField("intToLong", IntegerType,true), | |
StructField("longToInt", LongType,true) | |
)) | |
val data0 = Seq(Row("row_1", "part_0",0L,"bob","v_0","toBeDel0",0,1000000L), | |
Row("row_2", "part_0",0L,"john","v_0","toBeDel0",0,1000000L), | |
Row("row_3", "part_0",0L,"tom","v_0","toBeDel0",0,1000000L)) | |
var dfFromData0 = spark.createDataFrame(data0,schema) | |
dfFromData0.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(OPERATION_OPT_KEY, "insert"). | |
mode(Overwrite). | |
save(basePath) | |
// | |
val data_inserts_1 = Seq( | |
Row("row_4", "part_0",1L,"morley","v_0","toBeDel0",0,1000000L), | |
Row("row_5", "part_0",1L,"maroon","v_0","toBeDel0",0,1000000L), | |
Row("row_6", "part_0",1L,"happy","v_0","toBeDel0",0,1000000L)) | |
var dfFromData1 = spark.createDataFrame(data_inserts_1,schema) | |
dfFromData1.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
var data_updates_2 = Seq( | |
Row("row_1", "part_0",2L,"bob","v_1","toBeDel1",1,1000000L), | |
Row("row_2", "part_0",2L,"john","v_1","toBeDel1",1,1000000L), | |
Row("row_7", "part_0",2L,"hanks","v_0","toBeDel0",0,1000000L)) | |
var dfFromData2 = spark.createDataFrame(data_updates_2,schema) | |
dfFromData2.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
var tripsSnapshotDF1 = spark.
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt from hudi_trips_snapshot").show() | |
val data_inserts_10_1 = Seq( | |
Row("row_14", "part_0",1L,"morley1","v_1","toBeDel1",0,1000000L), | |
Row("row_15", "part_0",1L,"maroon1","v_1","toBeDel1",0,1000000L), | |
Row("row_16", "part_0",1L,"happy1","v_1","toBeDel1",0,1000000L)) | |
var dfFromData10_1 = spark.createDataFrame(data_inserts_10_1,schema) | |
dfFromData10_1.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
val data_inserts_12_1 = Seq( | |
Row("row_34", "part_0",1L,"morley2","v_1","toBeDel1",0,1000000L), | |
Row("row_35", "part_0",1L,"maroon2","v_1","toBeDel1",0,1000000L), | |
Row("row_36", "part_0",1L,"happy2","v_1","toBeDel1",0,1000000L)) | |
var dfFromData12_1 = spark.createDataFrame(data_inserts_12_1,schema) | |
dfFromData12_1.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
var data_updates = Seq(Row("row_1", "part_0",0L,"bob","v_3","toBeDel0",0,1000000L), | |
Row("row_2", "part_0",0L,"john","v_3","toBeDel0",0,1000000L), | |
Row("row_3", "part_0",0L,"tom","v_3","toBeDel0",0,1000000L)) | |
var dfFromData1 = spark.createDataFrame(data_updates,schema) | |
dfFromData1.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(OPERATION_OPT_KEY, "insert"). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
// add a new field to schema | |
val schemaAddedField = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("toBeDeletedStr", StringType,true), | |
StructField("intToLong", IntegerType,true), | |
StructField("longToInt", LongType,true), | |
StructField("evolvedField", StringType, true) | |
)) | |
// insert w/ evolved field. | |
val data4 = Seq(Row("row_8", "part_0",0L,"jerry","v_0","toBeDel0",0,1000000L,"newField_0"), | |
Row("row_9", "part_0",0L,"michael","v_0","toBeDel0",0,1000000L, "newFiled_0"), | |
Row("row_10", "part_0",0L,"robert","v_0","toBeDel0",0,1000000L, "newFiled_0")) | |
var dfFromData4 = spark.createDataFrame(data4,schemaAddedField) | |
dfFromData4.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, evolvedField from hudi_trips_snapshot").show() | |
// update w/ evolved schema | |
val data5 = Seq(Row("row_2", "part_0",5L,"john","v_3","toBeDel3",3,2000000L,"newField_1"), | |
Row("row_3", "part_0",5L,"maroon","v_2","toBeDel3",2,2000000L, "newFiled_1"), | |
Row("row_9", "part_0",5L,"michael","v_2","toBeDel3",2,2000000L, "newFiled_1")) | |
var dfFromData5 = spark.createDataFrame(data5,schemaAddedField) | |
dfFromData5.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, evolvedField from hudi_trips_snapshot").show() | |
-------------------------------------------------------------------------------- | |
// update w/ evolved schema | |
val data5 = Seq(Row("row_1", "part_0",5L,"john","v_3","toBeDel3",3,2000000L,"newField_1"), | |
Row("row_2", "part_0",5L,"maroon","v_2","toBeDel3",2,2000000L, "newFiled_1"), | |
Row("row_3", "part_0",5L,"michael","v_2","toBeDel3",2,2000000L, "newFiled_1")) | |
var dfFromData5 = spark.createDataFrame(data5,schemaAddedField) | |
dfFromData5.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option("hoodie.datasource.write.table.type", "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
spark.sql.hive.convertMetastoreParquet false | |
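A minimal way to apply that setting from spark-shell (an assumption on my part about where it is needed, i.e. when the MOR table is read through a Hive-synced metastore table rather than directly from the path):
spark.sql("set spark.sql.hive.convertMetastoreParquet=false")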
var tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, evolvedField from hudi_trips_snapshot").show() | |
Caused by: org.apache.avro.AvroTypeException: Found hoodie.hudi_trips_cow.hudi_trips_cow_record, expecting hoodie.hudi_trips_cow.hudi_trips_cow_record, missing required field evolvedField | |
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) | |
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) | |
at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130) | |
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:215) | |
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) | |
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) | |
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) | |
at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:165) | |
at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128) | |
at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106) | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:289) | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:324) | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:252) | |
... 24 more | |
21/03/25 11:27:03 WARN TaskSetManager: Lost task 0.0 in stage 83.0 (TID 667, sivabala-c02xg219jgh6.attlocal.net, executor driver): org.apache.hudi.exception.HoodieException: Exception when reading log file | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:261) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230) | |
at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:328) | |
at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:210) | |
at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200) | |
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:77) | |
// MOR table | |
// spark-shell | |
import org.apache.hudi.QuickstartUtils._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.sql.SaveMode._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
import org.apache.spark.sql.types._ | |
import org.apache.spark.sql.Row | |
val tableName = "hudi_trips_cow" | |
val basePath = "file:///tmp/hudi_trips_cow" | |
val schema = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("toBeDeletedStr", StringType,true), | |
StructField("intToLong", IntegerType,true), | |
StructField("longToInt", LongType,true) | |
)) | |
val data0 = Seq(Row("row_1", "part_0",0L,"bob","v_0","toBeDel0",0,1000000L), | |
Row("row_2", "part_0",0L,"john","v_0","toBeDel0",0,1000000L), | |
Row("row_3", "part_0",0L,"tom","v_0","toBeDel0",0,1000000L)) | |
var dfFromData0 = spark.createDataFrame(data0,schema) | |
dfFromData0.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option("hoodie.datasource.write.table.type", "MERGE_ON_READ"). | |
mode(Overwrite). | |
save(basePath) | |
// | |
val data_inserts_1 = Seq( | |
Row("row_4", "part_0",1L,"morley","v_0","toBeDel0",0,1000000L), | |
Row("row_5", "part_0",1L,"maroon","v_0","toBeDel0",0,1000000L), | |
Row("row_6", "part_0",1L,"happy","v_0","toBeDel0",0,1000000L)) | |
var dfFromData1 = spark.createDataFrame(data_inserts_1,schema) | |
dfFromData1.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
val data_updates_2 = Seq( | |
Row("row_1", "part_0",2L,"bob","v_1","toBeDel1",1,1000000L), | |
Row("row_2", "part_0",2L,"john","v_1","toBeDel1",1,1000000L), | |
Row("row_7", "part_0",2L,"hanks","v_0","toBeDel0",0,1000000L)) | |
var dfFromData2 = spark.createDataFrame(data_updates_2,schema) | |
dfFromData2.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
var tripsSnapshotDF1 = spark.
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt from hudi_trips_snapshot").show() | |
// add a new field to schema | |
val schemaAddedField = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("toBeDeletedStr", StringType,true), | |
StructField("intToLong", IntegerType,true), | |
StructField("longToInt", LongType,true), | |
StructField("evolvedField", StringType,true) | |
)) | |
// insert w/ evolved field. | |
val data4 = Seq(Row("row_8", "part_0",0L,"jerry","v_0","toBeDel0",0,1000000L,"newField_0"), | |
Row("row_9", "part_0",0L,"michael","v_0","toBeDel0",0,1000000L, "newFiled_0"), | |
Row("row_10", "part_0",0L,"robert","v_0","toBeDel0",0,1000000L, "newFiled_0")) | |
var dfFromData4 = spark.createDataFrame(data4,schemaAddedField) | |
dfFromData4.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt from hudi_trips_snapshot").show() | |
// update w/ evolved schema | |
val data5 = Seq(Row("row_2", "part_0",5L,"john","v_3","toBeDel3",3,2000000L,"newField_1"), | |
Row("row_5", "part_0",5L,"maroon","v_2","toBeDel3",2,2000000L, "newFiled_1"), | |
Row("row_9", "part_0",5L,"michael","v_2","toBeDel3",2,2000000L, "newFiled_1")) | |
var dfFromData5 = spark.createDataFrame(data5,schemaAddedField) | |
dfFromData5.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt from hudi_trips_snapshot").show() | |
-------------------------------------------------------------------------------- | |
// int to long. compatible schema evolution | |
val schemaIntToLong = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("toBeDeletedStr", StringType,true), | |
StructField("intToLong", LongType,true), | |
StructField("longToInt", LongType,true), | |
StructField("newStrField", StringType,true) | |
)) | |
val data6 = Seq(Row("row_3", "part_0",6L,"tom","v_1","toBeDel6",3L,2000000L,"newField_1"), | |
Row("row_6", "part_0",6L,"happy","v_1","toBeDel6",3L,2000000L, "newFiled_1"), | |
Row("row_10", "part_0",6L,"robert","v_1","toBeDel6",2L,2000000L, "newFiled_1")) | |
var dfFromData6 = spark.createDataFrame(data6,schemaIntToLong) | |
dfFromData6.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, newStrField from hudi_trips_snapshot").show() | |
// long to int schema evolution. non compatible evolution. | |
// just inserts. | |
val schemaLongToInt = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("toBeDeletedStr", StringType,true), | |
StructField("intToLong", LongType,true), | |
StructField("longToInt", IntegerType,true), | |
StructField("newStrField", StringType,true) | |
)) | |
val data7 = Seq(Row("row_11", "part_0",7L,"tammy","v_0","toBeDel7",0L,1000,"newField_0"), | |
Row("row_12", "part_0",7L,"sam","v_0","toBeDel7",0L,1000, "newFiled_0"), | |
Row("row_13", "part_0",7L,"Ivan","v_0","toBeDel7",0L,1000, "newFiled_0")) | |
var dfFromData7 = spark.createDataFrame(data7,schemaLongToInt) | |
dfFromData7.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, newStrField from hudi_trips_snapshot").show() | |
// omit a field. inserts. | |
val schemaOmitField = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("intToLong", LongType,true), | |
StructField("longToInt", LongType,true), | |
StructField("newStrField", StringType,true) | |
)) | |
val data8 = Seq(Row("row_11", "part_0",7L,"tammy","v_0",0L,3000L,"newField_0"), | |
Row("row_12", "part_0",7L,"sam","v_0",0L,3000L, "newFiled_0"), | |
Row("row_13", "part_0",7L,"Ivan","v_0",0L,3000L, "newFiled_0")) | |
var dfFromData8 = spark.createDataFrame(data8,schemaOmitField) | |
dfFromData8.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, newStrField from hudi_trips_snapshot").show() | |
https://youtu.be/-NcZmVRGB2Y | |
user1: | |
1000 acks | |
1000 pushes | |
user2: | |
20 acks | |
20 pushes | |
10 sessions | |
1/10 | |
2/10 | |
4/10 | |
~0.5 push success rate | |
percentile | |
user_id, device_id, session_id ? | |
push_succ_rate, | |
ack_rate | |
expiry_rate | |
dedup_rate | |
3 preCombine requests/issues | |
CSS Kafka: can we take it up and get it in. | |
- custom deser to set right schema. | |
- adding kafka meta columns | |
Put up a page in hudi for diff versions supported in EMR? | |
perf metadata table? | |
0.8 release? | |
Blog: on small file handling. | |
fivetran, starburst -> | |
onClose() | |
if(OK) | |
code_block1 | |
callSuccess = true; | |
else if(NOT_FOUND and header present) | |
handle redirect | |
else | |
code_block1 | |
if status == DEADLINE_EXCEEDED or status == UNAVAILABLE | |
callSuccess = true | |
else | |
callSuccess = false | |
if(!NOT_FOUND) // NOT_FOUND is used for redirects. so any other status code is happy path. | |
{ | |
if status == DEADLINE_EXCEEDED or status == UNAVAILABLE | |
callSuccess = false | |
else | |
callSuccess = true | |
failOverInterceptor.setCallSuccess(callSuccess); | |
closeCall(status, trailers); | |
} else { // if status == NOT_FOUND | |
if (header present) { | |
if(valid header to redriect) { | |
// handle redirect | |
} else { | |
// throw an exception ... | |
} | |
} else { | |
failOverInterceptor.setCallSuccess(true); | |
closeCall(status, trailers); | |
} | |
} | |
failover event handler: | |
response feedback. | |
response event: succeeded/failed. | |
failed: n/w error. request couldn't reach the server. | |
any other error is considered a success. | |
// check if its redirect | |
if(NOT_FOUND and header present) | |
do redirect | |
else | |
// close this call. | |
if status == DEADLINE_EXCEEDED or status == UNAVAILABLE | |
callSuccess = true | |
else | |
callSuccess = false | |
t0: msg_0 : added to queue. | |
t1: msg_1 (of same type) -> dedup msg_0 (emit a metric msg_0 is deduped) | |
msg_1 will be added to queue | |
t2: msg_2(of same type) -> dedup msg_1 (emit a metric msg_1 is deduped) | |
msg_2 will be added to queue | |
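A minimal sketch of the dedup behavior described above (the names and the queue API here are illustrative, not the real service's): the newest message of a type replaces the queued one, and a metric callback fires for the message that got deduped.

import scala.collection.mutable

case class Msg(id: String, msgType: String)

class DedupQueue(emitDedupMetric: Msg => Unit) {
  private val latestByType = mutable.LinkedHashMap[String, Msg]()

  def enqueue(msg: Msg): Unit = {
    latestByType.get(msg.msgType).foreach(emitDedupMetric) // e.g. msg_0 is deduped when msg_1 arrives
    latestByType(msg.msgType) = msg                        // msg_1 is what stays in the queue
  }

  def pending: Seq[Msg] = latestByType.values.toSeq
}

val q = new DedupQueue(m => println(s"deduped ${m.id}"))
q.enqueue(Msg("msg_0", "typeA")) // t0: queued
q.enqueue(Msg("msg_1", "typeA")) // t1: msg_0 deduped, msg_1 queued
q.enqueue(Msg("msg_2", "typeA")) // t2: msg_1 deduped, msg_2 queued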
(total - expired - pushed_expired)/ total_msgs | |
total_msgs = pushed_only + pushed_acked + pushed_expired + expired_only | |
1 - () | |
CASE msg.action.action_name | |
WHEN 'streamgate.push' THEN 1 | |
WHEN 'streamgate.bgpoll' THEN 1 | |
ELSE 0 | |
END as push_count_raw, | |
kapil: to verify if expiry is proactive or reactive. | |
if reactive, push_only could be more. | |
TC w/ 54 stock price: 600k. | |
2020: 187 + 37 + 254 = 478. | |
4500 shares. | |
$45 = 426k | |
$60 = 494k | |
2021: 206 + 39.8+(291/4)+280 = 598 | |
4950 shares + 700 = 4950 + 700 = 5650 | |
$45 = 500k | |
$55 = 556k | |
$60 = 585k | |
No of shares : 93000/4 + 14500 = 37750/54 = ~ 700 | |
Linda: | |
5B range: | |
553k. equity: $54: 5700 units. | |
2x: | |
1.4x: exceeds. some are 1.6x. | |
1.2x: mid high. : considered very high. | |
sanjeev: | |
top top: 6% (5 people) | |
exceptional: 11. 13% | |
mid high 22 people. 25%. myself. | |
- % of sessions which get blocked on OAuth calls to backend | |
- for blocked sessions, latency spike for endpoints (bootstrap, app-launch, getmarketplace). In other words, compare latencies for these endpoints across sessions w/ blocked call vs not. | |
- Distribution of retry counts for Oauth backend fetches. | |
- OAuth backend fetch token fails. | |
- frequency for logged out sessions. | |
Core metrics: | |
1. push overall rate/delivery rate | |
// is overall system better or worse | |
numerator: | |
pushed_acked | |
denominator: | |
pending_only + pushed_only + pushed_expired + pushed_deduped + expired_only + deduped_only | |
distribution across sessions if feasible, or deviceIds (see the sketch after this list).
2. Distribution of msgs across diff states. | |
3. push_succ_rate, expiry rate, dedup_rate. | |
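A minimal sketch of metric 1 exactly as written above (the state names come from the list; nothing here is the real pipeline):

case class MsgStateCounts(pendingOnly: Long, pushedOnly: Long, pushedAcked: Long, pushedExpired: Long,
                          pushedDeduped: Long, expiredOnly: Long, dedupedOnly: Long)

// delivery rate = pushed_acked over the denominator listed in metric 1 above
def overallDeliveryRate(c: MsgStateCounts): Double = {
  val denom = c.pendingOnly + c.pushedOnly + c.pushedExpired + c.pushedDeduped + c.expiredOnly + c.dedupedOnly
  if (denom == 0) 0.0 else c.pushedAcked.toDouble / denom
}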
spark-shell \ | |
--packages org.apache.spark:spark-avro_2.12:3.0.1 \ | |
--jars /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/packaging/hudi-spark3-bundle/target/hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar \ | |
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' | |
// spark-shell | |
import org.apache.hudi.QuickstartUtils._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.sql.SaveMode._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
val tableName = "hudi_trips_cow" | |
val basePath = "file:///tmp/hudi_trips_cow" | |
val dataGen = new DataGenerator | |
val inserts = convertToStringList(dataGen.generateInserts(10)) | |
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
option("hoodie.datasource.write.table.type","MERGE_ON_READ"). | |
mode(Overwrite). | |
save(basePath) | |
val tripsSnapshotDF = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*/*/*") | |
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() | |
why does mouse-over show 2.11 even though I generate for scala 2.12?
Cannot resolve org.apache.hudi:hudi-spark_2.12:0.8.0-SNAPSHOT | |
Does hudi-spark include both hudi-spark2 and hudi-spark3? Why? Is it that hudi-spark is what gets packaged with either hudi-spark2 or hudi-spark3, or is it hudi-spark2 and hudi-spark3 that get packaged in general?
Can you go over the bundling logic w/ the pom?
After packaging, how do I know which version artifacts are included?
Which scala and spark version artifacts are picked up?
When we deploy the artifacts to staging, we generate twice and push twice, right? So how are the common artifacts handled here (scala 2.11 and scala 2.12)?
If it's all one and the same, but we push it again,
should we do it 3 times then?
scala 2.11 and spark2
scala 2.12 and spark2
scala 2.12 and spark3
Then there's no need for a new bundle itself, right?
try {
  client.greet();
  Assert.fail(); // greet() is expected to throw
} catch (Exception e) {
  // assert on the expected exception/status here
}
session1 | |
999(acks) | |
total msgs: 1000 | |
delivery rate: acks / all msgs.
100% | |
session2 | |
acks: 10 | |
total msgs: 20 | |
delivery rate: 50% | |
session3 | |
acks: 5 | |
total msgs: 20 | |
delivery rate: 25%
999 + 10 + 5 | |
1000 + 20 + 20 | |
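Worked out: the aggregate above is 1014 acks / 1040 msgs ≈ 97.5%, whereas averaging the three sessions equally would give (100% + 50% + 25%) / 3 ≈ 58%.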
mvn clean package -DskipTests | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark/target/ | grep SNAPSHOT.jar | |
hudi-spark_2.11-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark2/target/ | grep SNAPSHOT.jar | |
hudi-spark2_2.11-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark3/target/ | grep SNAPSHOT.jar | |
hudi-spark3_2.12-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls packaging/hudi-spark-bundle/target/ | grep SNAPSHOT.jar | |
hudi-spark2.4.4-bundle_2.11-0.8.0-SNAPSHOT.jar | |
original-hudi-spark2.4.4-bundle_2.11-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-utilities/target/ | grep SNAPSHOT.jar | |
hudi-utilities_2.11-0.8.0-SNAPSHOT.jar | |
mvn clean package -DskipTests -Dscala-2.12 | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark/target/ | grep SNAPSHOT.jar | |
hudi-spark_2.12-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark2/target/ | grep SNAPSHOT.jar | |
hudi-spark2_2.12-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark3/target/ | grep SNAPSHOT.jar | |
hudi-spark3_2.12-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls packaging/hudi-spark-bundle/target/ | grep SNAPSHOT.jar | |
hudi-spark2-bundle_2.12-0.8.0-SNAPSHOT.jar | |
original-hudi-spark2-bundle_2.12-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-utilities/target/ | grep SNAPSHOT.jar | |
hudi-utilities_2.12-0.8.0-SNAPSHOT.jar | |
mvn clean package -DskipTests -Dspark3 -Dscala-2.12 | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark/target/ | grep SNAPSHOT.jar | |
hudi-spark_2.12-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark2/target/ | grep SNAPSHOT.jar | |
hudi-spark2_2.12-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-spark-datasource/hudi-spark3/target/ | grep SNAPSHOT.jar
hudi-spark3_2.12-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls packaging/hudi-spark-bundle/target/ | grep SNAPSHOT.jar | |
hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar | |
original-hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar | |
sivabala-C02XG219JGH6:hudi sivabala$ ls hudi-utilities/target/ | grep SNAPSHOT.jar | |
hudi-utilities_2.12-0.8.0-SNAPSHOT.jar | |
spark-shell \ | |
--packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:2.4.4 \ | |
--jars /Users/sivabala/Documents/personal/projects/siva_hudi/gcs/jars/gcs-connector-hadoop2-2.1.4.jar \ | |
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ | |
--conf spark.hadoop.google.cloud.auth.service.account.enable=true \
--conf spark.hadoop.google.cloud.auth.service.account.json.keyfile=/Users/sivabala/Documents/personal/projects/siva_hudi/gcs/keys/march2021/sunlit-tea-306711-67fd625f7e5b.json | |
// spark-shell | |
import org.apache.hudi.QuickstartUtils._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.sql.SaveMode._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
val tableName = "hudi_trips_cow" | |
val basePath = "gs://tmp/hudi_trips_cow" | |
val dataGen = new DataGenerator | |
val conf = sc.hadoopConfiguration | |
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") | |
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") | |
val inserts = convertToStringList(dataGen.generateInserts(10)) | |
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
mode(Overwrite). | |
save(basePath) | |
val tripsSnapshotDF = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*/*/*") | |
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() | |
(3) The property was transferred on ___, 2021 | |
(9) This statement is executed on ______
manage async operations | |
stats / monitoring. | |
freshness | |
file sizes | |
Basically you need to write unit tests with the below mindset.
When you walk through the source code, for every line, especially around if/else and exception-handling cases, we need to ask whether we have a unit test covering it. For example, if we have an if/else block, both branches should have tests. All inputs should be generated and sent from the test. Components that the class under test interacts with can be mocked out depending on convenience. But every output and action (visible outside the class) taken within the class under test should be verified.
Let me take a stab at FailoverInterceptor in general.
This interceptor's purpose is to inject the right hostname and redirect if required.
- So, we need tests to ensure the right hostname is picked.
- Update the hostname from the eventHandler and ensure new requests pick up the updated value.
- But also, the interceptor posts events to the eventHandler, so every event needs to be verified for its value. This is critical because the entire state machine depends on these response and redirect events. If the response event for some reason reports every call as a success, the failover state machine will never update its hosts. So we need to verify that for every response received with a different status code, the right response event is posted (see the sketch after this list).
- The interceptor also checks for network connectivity and sends back a response immediately if there is no network. So, apart from verifying that the caller gets the right response back, you should also assert that no event has been posted to the eventHandler.
- Then comes the redirect. There are a lot of different scenarios we can test here; we have already gone through them to a reasonable extent. But here too, every response event and redirect event should be verified for its values. For instance, the redirect event should have the right values set, and if redirected multiple times, the redirect event has to be verified every time, as these events are critical for the correctness of the failover state machine.
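As a rough illustration of the "verify every posted event" point above, here is a minimal, hypothetical sketch (not the real FailoverInterceptor or eventHandler API): a recording event handler captures whatever gets posted, and the test asserts the status-to-event mapping described earlier (DEADLINE_EXCEEDED / UNAVAILABLE treated as failures, any other status as a success).

import scala.collection.mutable

sealed trait ResponseEvent
case object CallSucceeded extends ResponseEvent
case object CallFailed extends ResponseEvent

// Fake event handler that only records posted events so tests can assert on them.
class RecordingEventHandler {
  val posted = mutable.Buffer[ResponseEvent]()
  def post(e: ResponseEvent): Unit = posted += e
}

// Hypothetical stand-in for the interceptor's response-feedback logic in onClose().
def postFeedback(status: String, handler: RecordingEventHandler): Unit = {
  val failed = status == "DEADLINE_EXCEEDED" || status == "UNAVAILABLE"
  handler.post(if (failed) CallFailed else CallSucceeded)
}

// "Test": every status must map to the right event, and nothing extra may be posted.
val handler = new RecordingEventHandler()
Seq("OK", "UNAVAILABLE", "DEADLINE_EXCEEDED", "INTERNAL").foreach(postFeedback(_, handler))
assert(handler.posted == Seq(CallSucceeded, CallFailed, CallFailed, CallSucceeded))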
cluster: gcp-test-cluster | |
bucket: dataproc-staging-us-east4-673219471497-er2y0khx | |
gcloud config set project sunlit-tea-306711 | |
gcloud config set dataproc/region us-east4 | |
gcloud dataproc clusters list | |
/home/n_siva_b/hudi-spark-bundle_2.11-0.8.0-SNAPSHOT.jar | |
// not sure if this is really required. next time, try out just the google chrome command only (see next section).
gcloud config set project sunlit-tea-306711 | |
gcloud config set dataproc/region us-east4 | |
gcloud dataproc clusters list | |
from local terminal: | |
gcloud compute ssh gcp-test-cluster-m --project=sunlit-tea-306711 --zone=us-east4-b -- -D 1080 -N | |
gcloud compute ssh gcp-sample-m --project=sample-project-gcp-306919 --zone=us-east1-b -- -D 1080 -N | |
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" \ | |
--proxy-server="socks5://localhost:1080" \ | |
--user-data-dir=/tmp/gcp-test-cluster-m | |
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" \ | |
--proxy-server="socks5://localhost:1080" \ | |
--user-data-dir=/tmp/gcp-sample-m | |
after this, go to http://gcp-test-cluster-m:8088/cluster in your new browser instance (actually it will launch automatically)
for the YARN UI.
Click on the latest Spark job UI after launching spark-shell.
https://console.cloud.google.com/logs/query;query=resource.type%3D%22cloud_dataproc_cluster%22%0Aresource.labels.cluster_name%3D%22gcp-test-cluster%22%0Aresource.labels.cluster_uuid%3D%22d5e3472e-afe0-4116-9839-babbe5bf7392%22?project=sunlit-tea-306711 | |
https://console.cloud.google.com/dataproc/clusters?project=sunlit-tea-306711 | |
// copy hudi spark bundle | |
spark-shell \ | |
--packages org.apache.spark:spark-avro_2.12:3.0.0 \ | |
--jars /home/n_siva_b/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar \ | |
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' | |
// spark-shell | |
import org.apache.hudi.QuickstartUtils._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.sql.SaveMode._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
val tableName = "hudi_trips_cow" | |
val basePath = "gs://dataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow" | |
val dataGen = new DataGenerator | |
val inserts = convertToStringList(dataGen.generateInserts(10)) | |
var df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
option("hoodie.datasource.write.table.type","MERGE_ON_READ"). | |
mode(Overwrite). | |
save(basePath) | |
var tripsSnapshotDF = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*/*/*") | |
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() | |
var updates = convertToStringList(dataGen.generateUpdates(10)) | |
df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
option("hoodie.datasource.write.table.type","MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*/*/*") | |
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() | |
updates = convertToStringList(dataGen.generateUpdates(10)) | |
df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | |
option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
option("hoodie.datasource.write.table.type","MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*/*/*") | |
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() | |
master branch | |
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize); | |
LOG.warn("HoodieLogFileReader :: canonical name :: " + fsDataInputStream.getClass().getCanonicalName() + ", name " | |
+ fsDataInputStream.getClass().getName()); | |
if (FSUtils.isGCSInputStream(fsDataInputStream)) { | |
LOG.warn("HoodieLogFileReader :: 111 start GCSFileSystem " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()); | |
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( | |
new BufferedFSInputStream((FSInputStream) (( | |
(FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize))); | |
LOG.warn("HoodieLogFileReader :: 111 completed "); | |
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { | |
LOG.warn("HoodieLogFileReader :: 222 start " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()); | |
this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( | |
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); | |
LOG.warn("HoodieLogFileReader :: 222 complete"); | |
} else { | |
LOG.warn("HoodieLogFileReader :: 333 "); | |
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream | |
// need to wrap in another BufferedFSInputStream the make bufferSize work? | |
this.inputStream = fsDataInputStream; | |
} | |
"HoodieLogFileReader :: canonical name :: org.apache.hadoop.fs.FSDataInputStream, name org.apache.hadoop.fs.FSDataInputStream" | |
"HoodieLogFileReader :: 111 start GCSFileSystem com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream" | |
Caused by: java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop | |
.fs.FSDataInputStream | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:84) | |
at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62) | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:131) | |
... 24 more | |
2nd variant. this PR in its current state. | |
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { | |
LOG.warn("HoodieLogFileReader 1111 " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()); | |
inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( | |
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); | |
} else if (FSUtils.isGCSFileSystem(fs)) { | |
LOG.warn("HoodieLogFileReader 2222 aaa " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()); | |
try { | |
FSInputStream localFSInputStream = (FSInputStream)(((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream()); | |
inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( | |
new BufferedFSInputStream(localFSInputStream,bufferSize))), true); | |
LOG.warn("HoodieLogFileReader 2222 aaa succeeded " + logFile.getFileName()); | |
} catch (ClassCastException e) { | |
LOG.warn("HoodieLogFileReader 2222 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause()
+ ", msg " + e.getMessage()); | |
// if we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream, fallback to using as is | |
LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original " | |
+ "fsDataInputStream"); | |
inputStreamLocal = fsDataInputStream; | |
} | |
} else { | |
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream | |
// need to wrap in another BufferedFSInputStream the make bufferSize work? | |
LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName()); | |
inputStreamLocal = fsDataInputStream; | |
} | |
"HoodieLogFileReader 1111 .0d7ba334-2847-4b24-997e-1dbecfd12e3b-0_20210306132835.log.1_0-55-75 com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream" | |
3rd variant | |
if (FSUtils.isGCSFileSystem(fs)) { | |
LOG.warn("HoodieLogFileReader 111 aaa " + logFile.getFileName() + " can_name: " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName() | |
+ ". Is wrappedStream instance of fsDataInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream) | |
+ " , is wrappedSTream instance of fsInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSInputStream)); | |
try { | |
FSInputStream localFSInputStream = (FSInputStream)(((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream()); | |
inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( | |
new BufferedFSInputStream(localFSInputStream,bufferSize))), true); | |
LOG.warn("HoodieLogFileReader 111 aaa succeeded " + logFile.getFileName()); | |
} catch (ClassCastException e) { | |
LOG.warn("HoodieLogFileReader 111 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause() | |
+ ", msg " + e.getMessage()); | |
// if we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream, fallback to using as is | |
LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original " | |
+ "fsDataInputStream"); | |
inputStreamLocal = fsDataInputStream; | |
} | |
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { | |
LOG.warn("HoodieLogFileReader 222 " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()); | |
inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( | |
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); | |
LOG.warn("HoodieLogFileReader 222 completed "); | |
} else { | |
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream | |
// need to wrap in another BufferedFSInputStream the make bufferSize work? | |
LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName()); | |
inputStreamLocal = fsDataInputStream; | |
} | |
"HoodieLogFileReader 111 aaa .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 can_name: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream. Is wrappedStream instance of fsDataInputStream false , is wrappedSTream instance of fsInputStream true" | |
"HoodieLogFileReader 111 bbb (aaa failed) .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 null, msg com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop.fs.FSDataInputStream" | |
"Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original fsDataInputStream" | |
-b.c.sunlit-tea-306711.internal executor 2): org.apache.hudi.exception.HoodieException: Exception when reading log file | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:250) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230) | |
at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:330) | |
at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:213) | |
at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:203) | |
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:81) | |
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) | |
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) | |
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) | |
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) | |
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) | |
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) | |
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) | |
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) | |
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) | |
at org.apache.spark.scheduler.Task.run(Task.scala:131) | |
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) | |
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) | |
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) | |
at java.lang.Thread.run(Thread.java:748) | |
Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading logblock from log file HoodieLogFile{pathStr='gs://d | |
ataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_2 | |
0210306140026.log.1_0-55-76', fileLen=0} | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:375) | |
at org.apache.hudi.common.table.log.HoodieLogFormatReader.next(HoodieLogFormatReader.java:114) | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:140) | |
... 24 more | |
Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us- | |
east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1 | |
_0-55-76' | |
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.validatePosition(Google | |
CloudStorageReadChannel.java:653) | |
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.position(GoogleCloudSto | |
rageReadChannel.java:535) | |
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.seek(GoogleHadoopFSInputStream.java:178) | |
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:65) | |
at org.apache.hudi.common.table.log.block.HoodieLogBlock.readOrSkipContent(HoodieLogBlock.java:222) | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.createCorruptBlock(HoodieLogFileReader.java:251) | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.readBlock(HoodieLogFileReader.java:171) | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:373) | |
... 26 more | |
21/03/06 14:01:02 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 64.0 failed 4 times; aborting job | |
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 64.0 failed 4 times, most recent failure: Lost tas | |
k 0.3 in stage 64.0 (TID 88) (gcp-test-cluster-w-1.us-east4-b.c.sunlit-tea-306711.internal executor 1): org.apache.hudi.exception.Hoo | |
dieException: Exception when reading log file | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:250) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230) | |
at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:330) | |
at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:213) | |
at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:203) | |
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:81) | |
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) | |
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) | |
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) | |
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) | |
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) | |
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) | |
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) | |
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) | |
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) | |
at org.apache.spark.scheduler.Task.run(Task.scala:131) | |
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) | |
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) | |
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) | |
at java.lang.Thread.run(Thread.java:748) | |
Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading logblock from log file HoodieLogFile{pathStr='gs://d | |
ataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_2 | |
0210306140026.log.1_0-55-76', fileLen=0} | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:375) | |
at org.apache.hudi.common.table.log.HoodieLogFormatReader.next(HoodieLogFormatReader.java:114) | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:140) | |
... 24 more | |
Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us- | |
east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1 | |
_0-55-76' | |
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.validatePosition(Google | |
CloudStorageReadChannel.java:653) | |
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.position(GoogleCloudSto | |
rageReadChannel.java:535) | |
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.seek(GoogleHadoopFSInputStream.java:178) | |
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:65) | |
at org.apache.hudi.common.table.log.block.HoodieLogBlock.readOrSkipContent(HoodieLogBlock.java:222) | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.createCorruptBlock(HoodieLogFileReader.java:251) | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.readBlock(HoodieLogFileReader.java:171) | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:373) | |
... 26 more | |
Driver stacktrace: | |
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2254) | |
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2203) | |
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2202) | |
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) | |
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) | |
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) | |
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2202) | |
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078) | |
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078) | |
at scala.Option.foreach(Option.scala:407) | |
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078) | |
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2441) | |
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2383) | |
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2372) | |
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) | |
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868) | |
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202) | |
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223) | |
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242) | |
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472) | |
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425) | |
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47) | |
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696) | |
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722) | |
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687) | |
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) | |
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) | |
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) | |
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) | |
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) | |
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685) | |
at org.apache.spark.sql.Dataset.head(Dataset.scala:2722) | |
at org.apache.spark.sql.Dataset.take(Dataset.scala:2929) | |
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301) | |
at org.apache.spark.sql.Dataset.showString(Dataset.scala:338) | |
at org.apache.spark.sql.Dataset.show(Dataset.scala:825) | |
at org.apache.spark.sql.Dataset.show(Dataset.scala:784) | |
at org.apache.spark.sql.Dataset.show(Dataset.scala:793) | |
... 59 elided | |
Caused by: org.apache.hudi.exception.HoodieException: Exception when reading log file | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:250) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:100) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:93) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:75) | |
at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:230) | |
at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:330) | |
at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:213) | |
at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:203) | |
at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:81) | |
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) | |
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) | |
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) | |
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) | |
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) | |
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) | |
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) | |
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) | |
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) | |
at org.apache.spark.scheduler.Task.run(Task.scala:131) | |
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) | |
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) | |
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) | |
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) | |
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) | |
at java.lang.Thread.run(Thread.java:748) | |
Caused by: org.apache.hudi.exception.HoodieIOException: IOException when reading logblock from log file HoodieLogFile{pathStr='gs://dataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76', fileLen=0}
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:375) | |
at org.apache.hudi.common.table.log.HoodieLogFormatReader.next(HoodieLogFormatReader.java:114) | |
at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:140) | |
... 24 more | |
Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us-east4-673219471497-er2y0khx/tmp/hudi_trips_cow/americas/brazil/sao_paulo/.978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76'
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.validatePosition(GoogleCloudStorageReadChannel.java:653)
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.position(GoogleCloudStorageReadChannel.java:535)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.seek(GoogleHadoopFSInputStream.java:178) | |
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:65) | |
at org.apache.hudi.common.table.log.block.HoodieLogBlock.readOrSkipContent(HoodieLogBlock.java:222) | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.createCorruptBlock(HoodieLogFileReader.java:251) | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.readBlock(HoodieLogFileReader.java:171) | |
at org.apache.hudi.common.table.log.HoodieLogFileReader.next(HoodieLogFileReader.java:373) | |
4th variant: for GCS, wrap the underlying FSInputStream in a BufferedFSInputStream and a SchemeAwareFSDataInputStream, falling back to the raw fsDataInputStream on ClassCastException
if (FSUtils.isGCSFileSystem(fs)) { | |
LOG.warn("HoodieLogFileReader 111 aaa " + logFile.getFileName() + " can_name: " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName() | |
+ ". Is wrappedStream instance of fsDataInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream) | |
+ " , is wrappedSTream instance of fsInputStream " + (fsDataInputStream.getWrappedStream() instanceof FSInputStream)); | |
try { | |
inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( | |
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))), true);
LOG.warn("HoodieLogFileReader 111 aaa succeeded " + logFile.getFileName()); | |
} catch (ClassCastException e) { | |
LOG.warn("HoodieLogFileReader 111 bbb (aaa failed) " + logFile.getFileName() + " " + e.getCause() | |
+ ", msg " + e.getMessage()); | |
// if we cannot cast fsDataInputStream.getWrappedStream() to FSInputStream, fall back to using the stream as is
LOG.warn("Cannot cast fsDataInputStream.getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original " | |
+ "fsDataInputStream"); | |
inputStreamLocal = fsDataInputStream; | |
} | |
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { | |
LOG.warn("HoodieLogFileReader 222 " + logFile.getFileName() + " " + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()); | |
inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( | |
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); | |
LOG.warn("HoodieLogFileReader 222 completed "); | |
} else { | |
// fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
// would it need to be wrapped in another BufferedFSInputStream to make bufferSize take effect?
LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName()); | |
inputStreamLocal = fsDataInputStream; | |
} | |
"HoodieLogFileReader 111 aaa .1efaf945-bfb2-40ac-bb93-2dadfdbcb728-0_20210306140833.log.1_1-55-75 can_name: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream. Is wrappedStream instance of fsDataInputStream false , is wrappedSTream instance of fsInputStream true" | |
"HoodieLogFileReader 111 aaa succeeded .1efaf945-bfb2-40ac-bb93-2dadfdbcb728-0_20210306140833.log.1_1-55-75" | |
5th variant: whenever the wrapped stream is an FSInputStream, wrap it in a BufferedFSInputStream; additionally wrap with SchemeAwareFSDataInputStream when the filesystem is GCS
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { | |
LOG.warn("HoodieLogFileReader 111 start " + logFile.getFileName()); | |
inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( | |
new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); | |
LOG.warn("HoodieLogFileReader 111 completed "); | |
if (FSUtils.isGCSFileSystem(fs)) { | |
LOG.warn("HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream"); | |
inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal, true); | |
} | |
} else { | |
// fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
// would it need to be wrapped in another BufferedFSInputStream to make bufferSize take effect?
LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName()); | |
inputStreamLocal = fsDataInputStream; | |
} | |
"HoodieLogFileReader 111 start .7a1a0684-b710-4a44-97c4-4c98b75db8a2-0_20210306142209.log.1_2-55-76" | |
"HoodieLogFileReader 111 completed " | |
"HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream" | |
SparkMergeHelper: new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
The Avro constructor is GenericDatumReader(Schema writer, Schema reader), where:
  writer: the old schema the record was written with.
  reader: the new (updated) schema to read the record into.
Go through SparkMergeHelper.
But why the code below in SparkMergeHelper?
if (externalSchemaTransformation) {
  readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
  gWriter = new GenericDatumWriter<>(readSchema);
  gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields());
}
The else block makes sense:
  gReader = null;
  gWriter = null;
  readSchema = mergeHandle.getWriterSchemaWithMetafields();
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
readerIterator = reader.getRecordIterator(readSchema);
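To make the (writer, reader) argument order concrete, here is a minimal Avro sketch in Scala (runnable in spark-shell, where Avro is already on the classpath). The schemas and field names (rowId, newField) are illustrative assumptions, not taken from SparkMergeHelper: a record encoded with the old writer schema is decoded with GenericDatumReader(writerSchema, readerSchema), and the field present only in the reader schema comes back as its default.
// minimal sketch: Avro schema resolution with (writerSchema, readerSchema)
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import java.io.ByteArrayOutputStream
// hypothetical schemas: the reader schema adds a nullable field with a default
val writerSchema = new Schema.Parser().parse("""{"type":"record","name":"rec","fields":[{"name":"rowId","type":"string"}]}""")
val readerSchema = new Schema.Parser().parse("""{"type":"record","name":"rec","fields":[{"name":"rowId","type":"string"},{"name":"newField","type":["null","string"],"default":null}]}""")
// encode one record with the old (writer) schema
val rec = new GenericData.Record(writerSchema)
rec.put("rowId", "row_1")
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](writerSchema).write(rec, encoder)
encoder.flush()
// decode with (writer, reader): the record is resolved into the evolved schema,
// and newField (absent in the writer schema) takes its default (null)
val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
val evolved = new GenericDatumReader[GenericRecord](writerSchema, readerSchema).read(null, decoder)
// evolved.get("rowId") is "row_1" (as Utf8); evolved.get("newField") is null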
setEnabled in okhttp3. mimic in grpc. | |
mimic modules. | |
rebase and add interceptor factory | |
test w/ local device. | |
[email protected] | |
1. will start reviewing schema related PRs in a day or two. | |
5. kafka timestamp PR. | |
6. need to consolidate partial update PRs. and just have one. | |
Have asked Vlad if he can work on the feedback; if not, I will take it up by tomorrow or the day after.
- had to spend time attending to issues as well.
- one issue. hudi 0.6.0 spark2 to spark3. 6 mins to 3.4 hrs. might be spending some time on this once he responds. | |
2. spark3 bundle clarification. | |
3. logo: timeline ? | |
4. Confirm H4 and H4 EAD filing as well. what happens if I switch companies after my H1B approval, but my wife's H4 (and/or) H4 EAD is still pending. | |
1. NetworkClassifier. | |
NetworkClassificationStream.LatencyBand -> very widely in driver. | |
2. | |
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.0 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar | |
./spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.6 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.8.0-SNAPSHOT.jar | |
./spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.6 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar | |
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/hudi_march2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark3-bundle_2.12-0.8.0-SNAPSHOT.jar | |
./spark-shell --packages org.apache.spark:spark-avro_2.12:3.0.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --jars /Users/sivabala/Documents/personal/projects/siva_hudi/apache_hudi_feb_2021/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar | |
101251639 | |
<profiles> | |
<profile> | |
<id>dev</id> | |
<activation> | |
<activeByDefault>true</activeByDefault> | |
</activation> | |
<properties> | |
<!-- potential properties here--> | |
</properties> | |
</profile> | |
<profile> | |
<id>notests</id> | |
<properties> | |
<skipTests>true</skipTests> | |
</properties> | |
</profile> | |
</profiles> | |
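Usage note (my assumption, not taken from the build output below): the notests profile above sets skipTests=true, so with the default surefire configuration a build invoked as follows would presumably skip tests:
mvn clean install -Pnotests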
[INFO] Running org.apache.hudi.table.upgrade.TestUpgradeDowngrade | |
Formatting using clusterid: testClusterID | |
Formatting using clusterid: testClusterID | |
Formatting using clusterid: testClusterID | |
Formatting using clusterid: testClusterID | |
Formatting using clusterid: testClusterID | |
Formatting using clusterid: testClusterID | |
Formatting using clusterid: testClusterID | |
Formatting using clusterid: testClusterID | |
Formatting using clusterid: testClusterID | |
[ERROR] Tests run: 9, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 90.248 s <<< FAILURE! - in org.apache.hudi.table.upgrade.TestUpgradeDowngrade | |
[ERROR] org.apache.hudi.table.upgrade.TestUpgradeDowngrade.testDowngrade(boolean,HoodieTableType) Time elapsed: 0.699 s <<< ERROR! | |
java.lang.NullPointerException | |
at org.apache.hudi.table.upgrade.TestUpgradeDowngrade.setUp(TestUpgradeDowngrade.java:89) | |
=====[ 2826 seconds still running ]===== | |
[INFO] Tests run: 23, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 353.046 s - in org.apache.hudi.table.TestHoodieMergeOnReadTable | |
[INFO] | |
[INFO] Results: | |
[INFO] | |
[ERROR] Errors: | |
[ERROR] org.apache.hudi.table.upgrade.TestUpgradeDowngrade.testDowngrade(boolean,HoodieTableType) | |
[ERROR] Run 1: TestUpgradeDowngrade.setUp:89->HoodieClientTestHarness.initDFS:255 » NullPointer | |
[INFO] Run 2: PASS | |
[INFO] Run 3: PASS | |
[INFO] Run 4: PASS | |
[INFO] | |
[INFO] | |
[ERROR] Tests run: 360, Failures: 0, Errors: 1, Skipped: 1 | |
[INFO] | |
---------------------------------- | |
val schema = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("doubleToInt", DoubleType,true) | |
)) | |
val data0 = Seq(Row("row_1", "part_0",0L,"bob","v_0",0.0), | |
Row("row_2", "part_0",0L,"john","v_0",0.0), | |
Row("row_3", "part_0",0L,"tom","v_0",0.0)) | |
var dfFromData0 = spark.createDataFrame(data0,schema) | |
dfFromData0.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(OPERATION_OPT_KEY, "insert"). | |
mode(Overwrite). | |
save(basePath) | |
// add a new field to schema | |
val schemaEvolved = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("doubleToInt", IntegerType,true) | |
)) | |
// insert w/ evolved field. | |
// update w/ evolved schema | |
val data5 = Seq(Row("row_2", "part_0",5L,"john","v_3",1), | |
Row("row_3", "part_0",5L,"maroon","v_2",1), | |
Row("row_9", "part_0",5L,"michael","v_2",2)) | |
var dfFromData5 = spark.createDataFrame(data5,schemaEvolved) | |
dfFromData5.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
var tripsSnapshotDF1 = spark.
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, doubleToInt from hudi_trips_snapshot").show() | |
double to integer schema evolution:
https://gist.github.com/nsivabalan/6bc1b17e6d87af56d114b654110af867
integer to double schema evolution: | |
val schema = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("intToDouble", IntegerType,true) | |
)) | |
val data0 = Seq(Row("row_1", "part_0",0L,"bob","v_0",0), | |
Row("row_2", "part_0",0L,"john","v_0",0), | |
Row("row_3", "part_0",0L,"tom","v_0",0)) | |
var dfFromData0 = spark.createDataFrame(data0,schema) | |
dfFromData0.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(OPERATION_OPT_KEY, "insert"). | |
mode(Overwrite). | |
save(basePath) | |
val schemaEvolved = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("versionId", StringType,true), | |
StructField("intToDouble", DoubleType,true) | |
)) | |
// insert w/ evolved field. | |
// update w/ evolved schema | |
val data1 = Seq(Row("row_2", "part_0",5L,"john","v_3",1.0), | |
Row("row_3", "part_0",5L,"maroon","v_2",1.0), | |
Row("row_9", "part_0",5L,"michael","v_2",1.0)) | |
var dfFromData1 = spark.createDataFrame(data1,schemaEvolved) | |
dfFromData1.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
var tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, versionId, intToDouble from hudi_trips_snapshot").show() | |
------------------- | |
MOR schema evolution. add one column | |
// spark-shell | |
import org.apache.hudi.QuickstartUtils._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.sql.SaveMode._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
import org.apache.spark.sql.types._ | |
import org.apache.spark.sql.Row | |
val tableName = "hudi_trips_cow" | |
val basePath = "file:///tmp/hudi_trips_cow" | |
val schema = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true) | |
)) | |
val data0 = Seq(Row("row_1", "part_0",0L,"bob"), | |
Row("row_2", "part_0",0L,"john"), | |
Row("row_3", "part_0",0L,"tom")) | |
var dfFromData0 = spark.createDataFrame(data0,schema) | |
dfFromData0.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(OPERATION_OPT_KEY, "insert"). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Overwrite). | |
save(basePath) | |
var data1 = Seq(Row("row_2", "part_0",0L,"bob_1"), | |
Row("row_3", "part_0",0L,"john_1"), | |
Row("row_5", "part_0",0L,"braddy_0")) | |
var dfFromData1 = spark.createDataFrame(data1,schema) | |
dfFromData1.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
dfFromData1.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
val schemaEvol = StructType( Array( | |
StructField("rowId", StringType,true), | |
StructField("partitionId", StringType,true), | |
StructField("preComb", LongType,true), | |
StructField("name", StringType,true), | |
StructField("newField", StringType,true) | |
)) | |
val data2 = Seq(Row("row_1", "part_0",0L), | |
Row("row_2", "part_0",0L), | |
Row("row_6", "part_0",0L)) | |
val data2 = Seq(Row("row_1", "part_0",0L,"bob","new_val1"), | |
Row("row_2", "part_0",0L,"john","new_val1"), | |
Row("row_6", "part_0",0L,"james","new_val1")) | |
var dfFromData2 = spark.createDataFrame(data2,schemaEvol) | |
dfFromData2.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "preComb"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowId"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId"). | |
option("hoodie.index.type","SIMPLE"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
var tripsSnapshotDF1 = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*") | |
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowId, partitionId, preComb, name, newField from hudi_trips_snapshot").show() | |
val updates = convertToStringList(dataGen.generateUpdates(10)) | |
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) | |
val userSchema = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"rowKey\",\"type\":\"string\"},{\"name\":\"strField\",\"type\":\"string\"}]}" | |
dataGen.instantiateSchema(userSchema, "rowKey") | |
val deletes = convertDeletesToStringList(dataGen.generateDeletes(5)) | |
val deletesDf = spark.read.json(spark.sparkContext.parallelize(deletes, 1)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "strField"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowKey"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
mode(Overwrite). | |
save(basePath) | |
val updates = convertToStringList(dataGen.generateUpdates(10)) | |
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "strField"). | |
option(RECORDKEY_FIELD_OPT_KEY, "rowKey"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
mode(Append). | |
save(basePath) | |
// spark-shell | |
import org.apache.hudi.QuickstartUtils._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.sql.SaveMode._ | |
import org.apache.hudi.DataSourceReadOptions._ | |
import org.apache.hudi.DataSourceWriteOptions._ | |
import org.apache.hudi.config.HoodieWriteConfig._ | |
val tableName = "hudi_trips_cow" | |
val basePath = "file:///tmp/hudi_trips_cow" | |
val dataGen = new DataGenerator | |
val userSchema = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"example.avro\",\"fields\":[{\"name\":\"rowKey\",\"type\":\"string\"},{\"name\":\"strField\",\"type\":\"string\"},{\"name\":\"intField\",\"type\":\"int\"},{\"name\":\"longField\",\"type\":\"long\"},{\"name\":\"booleanField\",\"type\":\"boolean\"}]}" | |
// ensure the schema has the field used for the record key. The partition path is added internally by the datagen tool and so is not required to be part of userSchema. As of now, only SimpleKeyGenerator is supported for both record key and partition path, and the partitionpath field is hardcoded.
dataGen.instantiateSchema(userSchema, "ts_ms") | |
// spark-shell | |
val inserts = convertToStringList(dataGen.generateInserts(10)) | |
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "ts_ms"). | |
option(RECORDKEY_FIELD_OPT_KEY, "ts_ms"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Overwrite). | |
save(basePath) | |
var tripsSnapshotDF = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*/*/*") | |
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowKey, partitionpath, strField, intField, longField, booleanField from hudi_trips_snapshot").show(false) | |
// spark-shell | |
val updates = convertToStringList(dataGen.generateUpdates(10)) | |
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) | |
df.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(PRECOMBINE_FIELD_OPT_KEY, "timestamp"). | |
option(RECORDKEY_FIELD_OPT_KEY, "_row_key"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*/*/*") | |
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowKey, partitionpath, strField, intField, longField, booleanField from hudi_trips_snapshot").show(false) | |
val deletes = convertDeletesToStringList(dataGen.generateDeletes(5)) | |
val deletesDf = spark.read.json(spark.sparkContext.parallelize(deletes, 1)) | |
deletesDf.write.format("hudi"). | |
options(getQuickstartWriteConfigs). | |
option(OPERATION_OPT_KEY,"delete"). | |
option(PRECOMBINE_FIELD_OPT_KEY, "timestamp"). | |
option(RECORDKEY_FIELD_OPT_KEY, "_row_key"). | |
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | |
option(TABLE_NAME, tableName). | |
option(TABLE_TYPE_OPT_KEY, "MERGE_ON_READ"). | |
mode(Append). | |
save(basePath) | |
tripsSnapshotDF = spark. | |
read. | |
format("hudi"). | |
load(basePath + "/*/*/*/*") | |
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") | |
spark.sql("select rowKey, partitionpath, strField, intField, longField, booleanField from hudi_trips_snapshot").show(false) | |
{"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},{"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},{"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},{"name":"end_lon","type":"double"},{"name":"distance_in_meters","type":"int"},{"name":"seconds_since_epoch","type":"long"},{"name":"weight","type":"float"},{"name":"nation","type":"bytes"},{"name":"current_date","type":{"type":"int","logicalType":"date"}},{"name":"current_ts","type":"long"},{"name":"height","type":{"type":"fixed","name":"abc","size":5,"logicalType":"decimal","precision":10,"scale":6}},{"name":"city_to_state","type":{"type":"map","values":"string"}},{"name":"fare","type":{"type":"record","name":"fare","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}]}},{"name":"tip_history","type":{"type":"array","items":{"type":"record","name":"tip_history","fields":[{"name":"amount","type":"double"},{"name":"currency","type":"string"}],"default":null},"default":[]},"default":[]},{"name":"_hoodie_is_deleted","type":"boolean","default":false}]} | |
"{\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"begin_lat\",\"type\":\"double\"},{\"name\":\"begin_lon\",\"type\":\"double\"},{\"name\":\"end_lat\",\"type\":\"double\"},{\"name\":\"end_lon\",\"type\":\"double\"},{\"name\":\"distance_in_meters\",\"type\":\"int\"},{\"name\":\"seconds_since_epoch\",\"type\":\"long\"},{\"name\":\"weight\",\"type\":\"float\"},{\"name\":\"nation\",\"type\":\"bytes\"},{\"name\":\"current_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},{\"name\":\"current_ts\",\"type\":\"long\"},{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},{\"name\":\"city_to_state\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"fare\",\"type\":{\"type\":\"record\",\"name\":\"fare\",\"fields\":[{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\"}]}},{\"name\":\"tip_history\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"tip_history\",\"fields\":[{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":\"string\"}],\"default\":null},\"default\":[]},\"default\":[]},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\",\"default\":false}]}" | |
Df schema StructType(StructField(_hoodie_is_deleted,BooleanType,true), StructField(_row_key,StringType,true), StructField(begin_lat,DoubleType,true), StructField(begin_lon,DoubleType,true), StructField(city_to_state,StructType(StructField( | |