git diff
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index e2de9c7633..ede050b1b4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -33,8 +33,11 @@ import org.apache.hudi.client.heartbeat.HeartbeatUtils;
 import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
@@ -48,6 +51,8 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
@@ -102,6 +107,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
 import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
@@ -217,6 +223,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       return true;
     }
     LOG.info("Committing " + instantTime + " action " + commitActionType);
+
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable table = createTable(config, hadoopConf);
     HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
@@ -231,6 +238,10 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
     }
     commit(table, commitActionType, instantTime, metadata, stats);
+
+    if (config.isMetadataTableEnabled()) {
+      validateFileGroups(table, stats);
+    }
     postCommit(table, metadata, instantTime, extraMetadata);
     LOG.info("Committed " + instantTime);
     releaseResources();
@@ -267,8 +278,36 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     return true;
   }
 
+  private void validateFileGroups(HoodieTable hoodieTable, List<HoodieWriteStat> writeStatList) {
+    LOG.info("Validating file groups for missed updates");
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().enable(false).build();
+    HoodieTableFileSystemView fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(context, hoodieTable.getMetaClient(), metadataConfig);
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, metadataConfig, config.getBasePath());
+    partitionPaths.forEach(pPath -> fileSystemView.getLatestBaseFiles(pPath).collect(Collectors.toList()));
+    List<HoodieWriteStat> toProcessStats = writeStatList.stream().filter(writeStat -> !writeStat.getPath().endsWith(".parquet")).collect(Collectors.toList());
+
+    for (HoodieWriteStat writeStat : toProcessStats) {
+      String fileID = writeStat.getFileId();
+      String filePath = writeStat.getPath();
+      String prevCommitTime = writeStat.getPrevCommit();
+      String pPath = writeStat.getPartitionPath();
+      String logFilesBaseCommitTime = FSUtils.getCommitTime(filePath);
+      Option<FileSlice> fileSlice = fileSystemView.getLatestFileSlice(pPath, fileID);
+      if (!fileID.isEmpty()) {
+        String latestBaseFileInstantTime = fileSlice.get().getBaseInstantTime();
+        if (!logFilesBaseCommitTime.equals(latestBaseFileInstantTime)) {
+          throw new HoodieIOException("XXX Log file added to last but one file slice for " + pPath + ", " + fileID + ", new log file " + filePath
+              + ", latest base file commit time : " + fileSlice.get().getBaseInstantTime());
+        } else {
+          LOG.debug("Log file added to latest file slice for " + pPath + ", " + fileID + ", new log file " + filePath
+              + ", latest base file commit time : " + fileSlice.get().getBaseInstantTime());
+        }
+      }
+    }
+  }
+
   protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
-                        List<HoodieWriteStat> stats) throws IOException {
+      List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
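Note on the validateFileGroups hook above: it reduces to comparing the base instant encoded in each newly written log file's name against the base instant of the latest file slice for that file group, and failing the commit when they disagree. Below is a minimal standalone sketch of that comparison, assuming Hudi's usual .{fileId}_{baseInstant}.log.{version}_{writeToken} log-file naming; the parsing helper is a hypothetical stand-in, not Hudi's FSUtils.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogFileSliceCheck {
  // Assumed log-file naming: .{fileId}_{baseInstant}.log.{version}_{writeToken}
  private static final Pattern LOG_FILE = Pattern.compile("^\\.(.+)_(\\d+)\\.log\\.(\\d+)_(.+)$");

  // Returns the base instant encoded in a log file name, or null if the name does not match.
  static String baseInstantOf(String logFileName) {
    Matcher m = LOG_FILE.matcher(logFileName);
    return m.matches() ? m.group(2) : null;
  }

  // Core of the validation: a freshly committed log file must belong to the latest file slice.
  static void validate(String logFileName, String latestSliceBaseInstant) {
    String logBaseInstant = baseInstantOf(logFileName);
    if (logBaseInstant != null && !logBaseInstant.equals(latestSliceBaseInstant)) {
      throw new IllegalStateException("Log file " + logFileName + " landed on a stale file slice: "
          + "encoded base instant " + logBaseInstant + " vs latest " + latestSliceBaseInstant);
    }
  }

  public static void main(String[] args) {
    // OK: the log file's encoded base instant matches the latest slice.
    validate(".a1b2c3_20230224091500000.log.1_0-1-2", "20230224091500000");
    // Flagged: the log file points at an older base instant than the latest slice.
    try {
      validate(".a1b2c3_20230224091500000.log.1_0-1-2", "20230224101500000");
    } catch (IllegalStateException expected) {
      System.out.println(expected.getMessage());
    }
  }
}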
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 2ccd0435d3..d38c5940ee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -68,6 +68,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
 import org.apache.hudi.table.RandomFileIdPrefixProvider;
@@ -3042,7 +3043,9 @@ public class HoodieWriteConfig extends HoodieConfig {
         if (writeConfig.isEmbeddedTimelineServerEnabled()) {
           return MarkerType.TIMELINE_SERVER_BASED.toString();
         } else {
-          LOG.warn("Embedded timeline server is disabled, fallback to use direct marker type for spark");
+          if (!HoodieTableMetadata.isMetadataTable(this.writeConfig.getBasePath())) {
+            LOG.warn("Embedded timeline server is disabled, fallback to use direct marker type for spark");
+          }
           return MarkerType.DIRECT.toString();
         }
       case FLINK:
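The guard added above only mutes a noisy warning: the metadata table never runs an embedded timeline server, so falling back to direct markers is expected there. The check keys off the table's base path; below is a rough, hypothetical approximation of what HoodieTableMetadata.isMetadataTable looks at, assuming the metadata table's well-known location under <basePath>/.hoodie/metadata.

public class MetadataTableCheck {
  // Hypothetical approximation -- the real check is HoodieTableMetadata.isMetadataTable(basePath).
  static boolean looksLikeMetadataTable(String basePath) {
    // Strip a trailing slash, then test for the metadata table's well-known location.
    String normalized = basePath.endsWith("/") ? basePath.substring(0, basePath.length() - 1) : basePath;
    return normalized.endsWith("/.hoodie/metadata");
  }

  public static void main(String[] args) {
    System.out.println(looksLikeMetadataTable("/tmp/tbl/.hoodie/metadata")); // true  -> warning suppressed
    System.out.println(looksLikeMetadataTable("/tmp/tbl"));                  // false -> warning still logged
  }
}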
diff --git a/hudi-tests-common/src/main/resources/log4j2-surefire.properties b/hudi-tests-common/src/main/resources/log4j2-surefire.properties
index 6b6b2fa5e5..b4da3f6cc7 100644
--- a/hudi-tests-common/src/main/resources/log4j2-surefire.properties
+++ b/hudi-tests-common/src/main/resources/log4j2-surefire.properties
@@ -32,6 +32,6 @@ rootLogger.appenderRef.stdout.ref = CONSOLE
 logger.apache.name = org.apache
 logger.apache.level = info
 logger.hudi.name = org.apache.hudi
-logger.hudi.level = debug
+logger.hudi.level = warn
 logger.hbase.name = org.apache.hadoop.hbase
 logger.hbase.level = error
\ No newline at end of file
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 4d91e5076e..ebc3d2c2bc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -129,8 +129,6 @@ import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static org.apache.hudi.common.table.HoodieTableConfig.DROP_PARTITION_COLUMNS;
 import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
-import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
-import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
 import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
@@ -139,6 +137,7 @@ import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SP
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.counter;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
@@ -712,7 +711,7 @@ public class DeltaSync implements Serializable, Closeable {
     String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
     boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap());
     if (success) {
-      LOG.info("Commit " + instantTime + " successful!");
+      LOG.warn("Commit " + instantTime + " successful! " + counter.incrementAndGet());
       this.formatAdapter.getSource().onCommit(checkpointStr);
       // Schedule compaction if needed
       if (cfg.isAsyncCompactionEnabled()) {
@@ -916,10 +915,10 @@ public class DeltaSync implements Serializable, Closeable {
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.from(props);
 
     // Validate what deltastreamer assumes of write-config to be really safe
-    ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(),
-        String.format("%s should be set to %s", INLINE_COMPACT.key(), cfg.isInlineCompactionEnabled()));
-    ValidationUtils.checkArgument(config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(),
-        String.format("%s should be set to %s", INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
+    //ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(),
+    //    String.format("%s should be set to %s", INLINE_COMPACT.key(), cfg.isInlineCompactionEnabled()));
+    //ValidationUtils.checkArgument(config.inlineClusteringEnabled() == clusteringConfig.isInlineClusteringEnabled(),
+    //    String.format("%s should be set to %s", INLINE_CLUSTERING.key(), clusteringConfig.isInlineClusteringEnabled()));
     ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == clusteringConfig.isAsyncClusteringEnabled(),
         String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE.key(), clusteringConfig.isAsyncClusteringEnabled()));
     ValidationUtils.checkArgument(!config.shouldAutoCommit(),
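The counter change above is plain debug instrumentation: a process-wide AtomicInteger is bumped on every successful commit so the (temporarily warn-level) log line doubles as a running commit tally. A self-contained sketch of the same pattern, with illustrative names standing in for HoodieDeltaStreamer and the success branch in DeltaSync:

import java.util.concurrent.atomic.AtomicInteger;

public class CommitCounterDemo {
  // Stands in for HoodieDeltaStreamer.counter: one tally shared across all sync rounds.
  static final AtomicInteger COMMITS = new AtomicInteger(0);

  // Stands in for the success branch after writeClient.commit(...) returns true.
  static void onCommitSuccess(String instantTime) {
    System.out.println("Commit " + instantTime + " successful! " + COMMITS.incrementAndGet());
  }

  public static void main(String[] args) throws InterruptedException {
    Thread a = new Thread(() -> onCommitSuccess("001"));
    Thread b = new Thread(() -> onCommitSuccess("002"));
    a.start();
    b.start();
    a.join();
    b.join();
    // incrementAndGet() is atomic, so the final tally is exactly 2 regardless of interleaving.
    System.out.println("total commits: " + COMMITS.get());
  }
}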
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 834938a317..a383b75db3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -83,6 +83,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.lang.String.format;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -117,6 +118,8 @@ public class HoodieDeltaStreamer implements Serializable {
   public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
 
+  public static AtomicInteger counter = new AtomicInteger(0);
+
   public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
     this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
         jssc.hadoopConfiguration(), Option.empty());
@@ -425,7 +428,7 @@ public class HoodieDeltaStreamer implements Serializable {
 
     public boolean isInlineCompactionEnabled() {
       // Inline compaction is disabled for continuous mode, otherwise enabled for MOR
-      return !continuousMode && !forceDisableCompaction
+      return !forceDisableCompaction
           && HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
     }
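The isInlineCompactionEnabled change flips a policy for this debug run: previously, continuous mode unconditionally disabled inline compaction; with this patch, a continuous MOR pipeline compacts inline unless compaction is explicitly force-disabled. Restated as a pair of hypothetical standalone predicates:

public class InlineCompactionPolicy {
  // Before: inline compaction was off whenever the streamer ran in continuous mode.
  static boolean before(boolean continuousMode, boolean forceDisableCompaction, boolean isMergeOnRead) {
    return !continuousMode && !forceDisableCompaction && isMergeOnRead;
  }

  // After this patch: continuous mode no longer matters; only the explicit kill switch does.
  static boolean after(boolean forceDisableCompaction, boolean isMergeOnRead) {
    return !forceDisableCompaction && isMergeOnRead;
  }

  public static void main(String[] args) {
    // A continuous MOR pipeline: previously never compacted inline, now it does.
    System.out.println(before(true, false, true)); // false
    System.out.println(after(false, true));        // true
  }
}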
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index eb6ab80b5f..5b3cb4cc9d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -821,9 +821,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true, recordType);
   }
 
-  @Disabled("HUDI-5815 for investigation")
+  // @Disabled("HUDI-5815 for investigation")
   @ParameterizedTest
-  @EnumSource(value = HoodieRecordType.class, names = {"AVRO", "SPARK"})
+  @EnumSource(value = HoodieRecordType.class, names = {"AVRO"})
   public void testUpsertsMORContinuousMode(HoodieRecordType recordType) throws Exception {
     testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor", recordType);
   }
@@ -863,6 +863,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg.tableType = tableType.name();
     cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     cfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
+    cfg.configs.add(String.format("%s=4", HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
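With @Disabled commented out and the parameterization narrowed to AVRO, the flaky test can be exercised in isolation; a typical invocation from the repo root would be something like the following (module path per the standard Hudi Maven layout; exact profiles may vary by branch):

mvn test -pl hudi-utilities -am -Dtest='TestHoodieDeltaStreamer#testUpsertsMORContinuousMode'

The added INLINE_COMPACT_NUM_DELTA_COMMITS=4 setting makes inline compaction fire after every four delta commits during the continuous-mode run, tightening the window in which a log file attached to a stale file slice would surface.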