Gists from Sivabalan Narayanan (nsivabalan)
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "gs://dataproc-staging-us-central1-708638735970-brfkprvv/hudi_trips_cow"
@Test
def testCopyOnWriteStorageAutoCommitDisabled() {
  // Insert Operation
  val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
  val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
  inputDF1.write.format("org.apache.hudi")
    .options(commonOpts)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Overwrite)
    .save(basePath)
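A hedged sketch of how the test could verify the insert, assuming the usual Hudi functional-test fixtures (spark, basePath, commonOpts), JUnit's assertEquals, and the default partitioned layout of the test data generator:
  // Sketch only: read back a snapshot and check that all 100 inserted records are visible.
  val snapshotDF1 = spark.read.format("org.apache.hudi")
    .load(basePath + "/*/*/*/*")
  assertEquals(100, snapshotDF1.count())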
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
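dfFromData5 is written below but never defined in this excerpt; a hypothetical construction from Rows with an explicit schema (column names taken from the write options and the later query, values invented) might be:
// Hypothetical construction of dfFromData5; the exact values the gist used are unknown.
val schema5 = StructType(Seq(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true)))
val data5 = Seq(Row("row_1", "part_0", 0L, "john", "v_0"),
                Row("row_2", "part_0", 0L, "mary", "v_0"))
val dfFromData5 = spark.createDataFrame(spark.sparkContext.parallelize(data5), schema5)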
scala> dfFromData5.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
| option(RECORDKEY_FIELD_OPT_KEY, "rowId").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
| option("hoodie.index.type","SIMPLE").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath)
21/04/01 10:33:17 ERROR BoundedInMemoryExecutor: error producing records
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 50a44ad0..32fab3ad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -257,7 +257,7 @@ public abstract class AbstractHoodieLogRecordScanner {
LOG.error("Got IOException when reading log file", e);
throw new HoodieIOException("IOException when reading log file ", e);
} catch (Exception e) {
- LOG.error("Got exception when reading log file", e);
+ LOG.error("Got exception when reading log file with reader schema " + readerSchema.toString());
scala> spark.sql("select rowId, partitionId, preComb, name, versionId, toBeDeletedStr, intToLong, longToInt, evolvedField from hudi_trips_snapshot").show()
Log schema in HoodieMergeOnReadRDD : {"type":"record","name":"hudi_trips_cow_record","namespace":"hoodie.hudi_trips_cow","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"rowId","type":["string","null"]},{"name":"partitionId","type":["string","null"]},{"name":"preComb","type":["long","null"]},{"name":"name","type":["string","null"]},{"name":"versionId","type":["string","null"]},{"name":"toBeDeletedStr","type":["string","null"]},{"name":"intToLong","type":["int","null"]},{"name":"longToInt","ty
// spark-shell
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
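As with dfFromData5, dfFromData8 is not defined in this excerpt; a hypothetical sketch, reusing field names from the schema logged above (values are guesses, and intToLong is written as a long only to illustrate the evolved type), could be:
// Hypothetical dfFromData8 for the append below; the real data used in the gist is unknown.
val schema8 = StructType(Seq(
  StructField("rowId", StringType, true),
  StructField("partitionId", StringType, true),
  StructField("preComb", LongType, true),
  StructField("name", StringType, true),
  StructField("versionId", StringType, true),
  StructField("toBeDeletedStr", StringType, true),
  StructField("intToLong", LongType, true)))
val data8 = Seq(Row("row_4", "part_0", 5L, "john", "v_3", "toBeDel_4", 4L))
val dfFromData8 = spark.createDataFrame(spark.sparkContext.parallelize(data8), schema8)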
scala> dfFromData8.write.format("hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
| option(RECORDKEY_FIELD_OPT_KEY, "rowId").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
| option("hoodie.index.type","SIMPLE").
| option(TABLE_NAME, tableName).
| mode(Append).
| save(basePath)
21/02/27 16:09:10 ERROR BoundedInMemoryExecutor: error producing records