diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index c4323081a30..8287faf3ae4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -38,6 +39,7 @@ import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -51,6 +53,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.HoodieTimer;
@@ -91,6 +94,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -99,6 +103,7 @@ import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADAT
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
@@ -886,7 +891,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* Update from {@code HoodieCleanMetadata}.
*
* @param cleanMetadata {@code HoodieCleanMetadata}
- * @param instantTime Timestamp at which the clean was completed
+ * @param instantTime Timestamp at which the clean was completed
*/
@Override
public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
@@ -899,7 +904,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* Update from {@code HoodieRestoreMetadata}.
*
* @param restoreMetadata {@code HoodieRestoreMetadata}
- * @param instantTime Timestamp at which the restore was performed
+ * @param instantTime Timestamp at which the restore was performed
*/
@Override
public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
@@ -908,15 +913,39 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
// Since the restore has completed on the dataset, the latest write timeline instant is the one to which the
// restore was performed. If this is not present, then the restore was performed to the earliest commit. This
// could happen in case of bootstrap followed by rollback e.g. TestBootstrap#testFullBootstrapWithRegexModeWithUpdatesMOR.
- Option<HoodieInstant> restoreInstant = dataMetaClient.getActiveTimeline().getWriteTimeline().lastInstant();
- final String restoreToInstantTime;
+ /*Option<HoodieInstant> restoreInstant = dataMetaClient.getActiveTimeline().getRestoreTimeline().lastInstant();
+ final String restoreToInstantTime = restoreMetadata.getStartRestoreTime();
if (restoreInstant.isPresent()) {
restoreToInstantTime = restoreInstant.get().getTimestamp();
} else {
- restoreToInstantTime = metadataMetaClient.getActiveTimeline().filterCompletedInstants().firstInstant().get().getTimestamp();
+ restoreToInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().firstInstant().get().getTimestamp();
+ }*/
+
+ HoodieInstant restoreInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, instantTime);
+ HoodieInstant requested = HoodieTimeline.getRestoreRequestedInstant(restoreInstant);
+ HoodieRestorePlan restorePlan = null;
+ try {
+ restorePlan = TimelineMetadataUtils.deserializeAvroMetadata(
+ dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), HoodieRestorePlan.class);
+ } catch (IOException e) {
+ throw new HoodieIOException("Deserialization of restore plan failed", e);
+ }
+ final String restoreToInstantTime = restorePlan.getSavepointToRestoreTimestamp();
+
+ Option<HoodieInstant> latestCleanInstant = metadataMetaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
+ if (latestCleanInstant.isPresent()) {
+ // fetch the earliest commit to retain and ensure the base file prior to the time to restore is present
+ List<HoodieFileGroup> filesGroups = metadata.getMetadataFileSystemView().getAllFileGroups("files").collect(Collectors.toList());
+ boolean canRestore = filesGroups.get(0).getAllFileSlices().map(fileSlice -> fileSlice.getBaseInstantTime()).anyMatch(
+ instantTime1 -> HoodieTimeline.compareTimestamps(instantTime1, LESSER_THAN_OR_EQUALS, restoreToInstantTime));
+ if (!canRestore) {
+ throw new HoodieMetadataException("Can't restore since there is no base file in MDT lesser than the commit to restore to. " +
+ "Please delete metadata table and retry");
+ }
+ } else {
+ // we are good to proceed.
}
- // We cannot restore to before the oldest compaction on MDT as we don't have the base files before that time.
Option<HoodieInstant> oldestCompaction = metadataMetaClient.getCommitTimeline().filterCompletedInstants().firstInstant();
if (oldestCompaction.isPresent()) {
if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(restoreToInstantTime, oldestCompaction.get().getTimestamp())) {
@@ -982,7 +1011,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
* Update from {@code HoodieRollbackMetadata}.
*
* @param rollbackMetadata {@code HoodieRollbackMetadata}
- * @param instantTime Timestamp at which the rollback was performed
+ * @param instantTime Timestamp at which the rollback was performed
*/
@Override
public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) {
@@ -1018,7 +1047,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
String rollbackInstantTime = createRollbackTimestamp(instantTime);
processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, metadataMetaClient.getActiveTimeline(),
- dataMetaClient, rollbackMetadata, instantTime));
+ dataMetaClient, rollbackMetadata, instantTime));
if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
LOG.info("Rolling back MDT deltacommit " + commitInstantTime);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index f431283ac7a..2fc69ed5887 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -94,7 +94,7 @@ public class HoodieMetadataWriteUtils {
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
- .retainCommits(Math.min(writeConfig.getCleanerCommitsRetained(), DEFAULT_METADATA_CLEANER_COMMITS_RETAINED))
+ .retainCommits(10)
.build())
// we will trigger archive manually, to ensure only regular writer invokes it
.withArchivalConfig(HoodieArchivalConfig.newBuilder()
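For context on the .retainCommits(10) change above: the patch pins the MDT cleaner to retain 10 commits instead of Math.min(writeConfig.getCleanerCommitsRetained(), DEFAULT_METADATA_CLEANER_COMMITS_RETAINED). A hedged sketch of where such a retention value sits in a write config follows; the surrounding builder classes (HoodieWriteConfig, HoodieCleanConfig) and the table name are assumptions, since the hunk only shows the middle of the chain in HoodieMetadataWriteUtils.

import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// Illustrative only: not the exact chain used by HoodieMetadataWriteUtils.
public class MdtCleanerRetentionSketch {
  public static HoodieWriteConfig build(String mdtBasePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(mdtBasePath)
        .forTable("metadata_table_sketch") // hypothetical table name
        .withCleanConfig(HoodieCleanConfig.newBuilder()
            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
            .retainCommits(10) // matches the hardcoded value in the hunk above
            .build())
        .build();
  }
}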
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
index caeac4d322d..c18e28c9393 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index fa654f2e706..08844604f88 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1596,9 +1596,11 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
* Test several table operations with restore. This test uses SparkRDDWriteClient.
* Once the restore support is ready in HoodieTestTable, then rewrite this test.
*/
- @ParameterizedTest
- @EnumSource(HoodieTableType.class)
- public void testTableOperationsWithRestore(HoodieTableType tableType) throws Exception {
+ //@ParameterizedTest
+ //@EnumSource(HoodieTableType.class)
+ @Test
+ public void testTableOperationsWithRestore() throws Exception {
+ HoodieTableType tableType = MERGE_ON_READ;
init(tableType);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
@@ -1856,13 +1858,18 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// Deletes
newCommitTime = "20210101000900000";
- records = dataGen.generateDeletes(newCommitTime, 10);
+ /*records = dataGen.generateDeletes(newCommitTime, 10);
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
- client.delete(deleteKeys, newCommitTime);
+ client.delete(deleteKeys, newCommitTime);*/
+
+ client.startCommitWithTime(newCommitTime);
+ records = dataGen.generateUpdates(newCommitTime, 5);
+ writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+ assertNoWriteErrors(writeStatuses);
// Clean
- newCommitTime = "20210101000900000";
+ newCommitTime = "20210101001000000";
client.clean(newCommitTime);
validateMetadata(client);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 7a3006277e9..9500bbd98ff 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -342,6 +342,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.enable(useFileListingMetadata)
.enableMetrics(enableMetrics)
.ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
.build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 63e0d5dbe1b..977706c5bbc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -49,7 +49,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
// Meta fields are not populated by default for metadata table
public static final boolean DEFAULT_METADATA_POPULATE_META_FIELDS = false;
// Default number of commits to retain, without cleaning, on metadata table
- public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 3;
+ public static final int DEFAULT_METADATA_CLEANER_COMMITS_RETAINED = 10;
public static final String METADATA_PREFIX = "hoodie.metadata";
public static final String OPTIMIZED_LOG_BLOCKS_SCAN = ".optimized.log.blocks.scan.enable";
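The default MDT cleaner retention is raised from 3 to 10 commits here. Read together with the withMaxNumDeltaCommitsBeforeCompaction(4) change in TestHoodieMetadataBase above, the apparent intent is to keep enough MDT history around that a base file at or before the restore target still exists; that pairing is an inference, not stated in the diff. A small sketch using only builder calls that appear elsewhere in this diff (the wrapper class is invented for illustration):

import org.apache.hudi.common.config.HoodieMetadataConfig;

public class MetadataConfigSketch {
  public static HoodieMetadataConfig build() {
    // Compact the MDT every 4 deltacommits while retaining 10 commits (the new default),
    // so cleaning does not remove the file slices a restore may still need.
    return HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withMaxNumDeltaCommitsBeforeCompaction(4)
        .build();
  }
}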
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 6186010ab74..51fe34badd2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -459,6 +459,10 @@ public interface HoodieTimeline extends Serializable {
return instant.isRequested() ? instant : HoodieTimeline.getRequestedInstant(instant);
}
+ static HoodieInstant getRestoreRequestedInstant(HoodieInstant instant) {
+ return instant.isRequested() ? instant : HoodieTimeline.getRequestedInstant(instant);
+ }
+
static HoodieInstant getIndexRequestedInstant(final String timestamp) {
return new HoodieInstant(State.REQUESTED, INDEXING_ACTION, timestamp);
}
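The new getRestoreRequestedInstant helper follows the same pattern as the method just above it: return the instant unchanged if it is already in REQUESTED state, otherwise map it to its requested counterpart. A tiny usage sketch (class and method names invented for illustration):

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class RestoreRequestedInstantSketch {
  public static HoodieInstant toRequested(String restoreInstantTime) {
    // Build a restore instant and normalize it; an already-REQUESTED instant is returned unchanged.
    HoodieInstant restore =
        new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, restoreInstantTime);
    return HoodieTimeline.getRestoreRequestedInstant(restore);
  }
}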
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index cc4f1dd5e32..b0d2b4aa304 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -90,6 +90,7 @@ import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -1286,6 +1287,11 @@ public class HoodieTableMetadataUtil {
validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline));
});
+ // add restore instants from MDT.
+ metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
+ .filter(instant -> instant.getAction().equals(HoodieTimeline.RESTORE_ACTION))
+ .getInstants().forEach(instant -> validInstantTimestamps.add(instant.getTimestamp()));
+
// SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid timestamp
validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, PARTITION_INITIALIZATION_TIME_SUFFIX));
return validInstantTimestamps;
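The block added above registers completed restore instants on the MDT timeline as valid instant timestamps, so log records written under a restore's instant time are not discarded as uncommitted when the MDT is read; that rationale is inferred from the surrounding code. A self-contained sketch of the same timeline-filtering idiom, with an invented wrapper class:

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

import java.util.HashSet;
import java.util.Set;

public class ValidRestoreTimestampsSketch {
  // Collect timestamps of completed restore instants on the MDT timeline,
  // mirroring the restore-instant collection added in the hunk above.
  public static Set<String> completedRestoreTimestamps(HoodieTableMetaClient metadataMetaClient) {
    Set<String> timestamps = new HashSet<>();
    metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
        .filter(instant -> instant.getAction().equals(HoodieTimeline.RESTORE_ACTION))
        .getInstants().forEach(instant -> timestamps.add(instant.getTimestamp()));
    return timestamps;
  }
}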