Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Created April 16, 2021 22:19
Show Gist options
  • Save nsivabalan/479320351bec95ba3e0c0dfa5abb2b06 to your computer and use it in GitHub Desktop.
Save nsivabalan/479320351bec95ba3e0c0dfa5abb2b06 to your computer and use it in GitHub Desktop.
/**
 * Inserts 100 generated records, then upserts a single modified row with
 * `HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP` set to "false", and verifies that the
 * snapshot still holds 100 rows and that the modified row carries the updated value.
 *
 * Relies on fixture members defined outside this method (`spark`, `fs`, `basePath`,
 * `commonOpts`, `dataGen`, `verificationCol`, `updatedVerificationVal`).
 *
 * NOTE(review): this expects the upsert to be visible to snapshot reads even though
 * auto-commit is disabled on the write — confirm that is the intended contract.
 */
@Test
def testCopyOnWriteStorageAutoCommitDisabled(): Unit = {
  val snapshotGlob = basePath + "/*/*/*/*"

  // Insert operation: seed the table with 100 records.
  val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
  val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
  inputDF1.write.format("org.apache.hudi")
    .options(commonOpts)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Overwrite)
    .save(basePath)
  assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))

  // Snapshot query after the initial insert.
  val snapshotDF1 = spark.read.format("org.apache.hudi").load(snapshotGlob)
  assertEquals(100, snapshotDF1.count())

  // Upsert based on the written table (rows still carry the Hudi metadata columns):
  // pick one row key and overwrite its verification column.
  val verificationRowKey = snapshotDF1.limit(1).select("_row_key").first.getString(0)
  val updateDf = snapshotDF1
    .filter(col("_row_key") === verificationRowKey)
    .withColumn(verificationCol, lit(updatedVerificationVal))
  updateDf.write.format("org.apache.hudi")
    .options(commonOpts)
    .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "false")
    .mode(SaveMode.Append)
    .save(basePath)

  // Snapshot query after the upsert: row count is unchanged and the chosen row is updated.
  val snapshotDF2 = spark.read.format("hudi").load(snapshotGlob)
  assertEquals(100, snapshotDF2.count())
  // Expected value is the updated cell content, not the column name.
  assertEquals(
    updatedVerificationVal,
    snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment