Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Created December 19, 2021 16:09
Show Gist options
  • Save nsivabalan/3daa4f007afaa7f97b6a3f4bdd8248ea to your computer and use it in GitHub Desktop.
Save nsivabalan/3daa4f007afaa7f97b6a3f4bdd8248ea to your computer and use it in GitHub Desktop.
- @ParameterizedTest
- @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
- public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception {
+ //@ParameterizedTest
+ //@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"})
+ @RepeatedTest(20)
+ public void testHoodieClientBasicMultiWriter() throws Exception {
+ HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
if (tableType == HoodieTableType.MERGE_ON_READ) {
setUpMORTestTable();
}
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
- properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "500");
+ properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "200");
+ properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "10");
+ properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "200");
+ properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10");
HoodieWriteConfig writeConfig = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
@@ -125,14 +132,13 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
cyclicBarrier.await();
// Commit the update before the 2nd writer
- assertDoesNotThrow(() -> {
- client1.commit(nextCommitTime, writeStatusList);
- });
+ client1.commit(nextCommitTime, writeStatusList);
// Signal the 2nd writer to go ahead for his commit
- cyclicBarrier.await();
+ //cyclicBarrier.await();
writer1Completed.set(true);
} catch (Exception e) {
+ assertTrue(e instanceof HoodieWriteConflictException);
writer1Completed.set(false);
}
});
@@ -142,16 +148,14 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
final String nextCommitTime = "003";
// Wait for the 1st writer to start the commit
- cyclicBarrier.await();
final JavaRDD<WriteStatus> writeStatusList = startCommitForUpdate(writeConfig, client2, nextCommitTime, 100);
-
- // Wait for the 1st writer to complete the commit
cyclicBarrier.await();
- assertThrows(HoodieWriteConflictException.class, () -> {
- client2.commit(nextCommitTime, writeStatusList);
- });
+ // Wait for the 1st writer to complete the commit
+ //cyclicBarrier.await();
+ client2.commit(nextCommitTime, writeStatusList);
writer2Completed.set(true);
} catch (Exception e) {
+ assertTrue(e instanceof HoodieWriteConflictException);
writer2Completed.set(false);
}
});
@@ -159,8 +163,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
future1.get();
future2.get();
- assertTrue(writer1Completed.get());
- assertTrue(writer2Completed.get());
+ assertTrue(writer1Completed.get() || writer2Completed.get());
+ assertFalse(writer1Completed.get() && writer2Completed.get());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment