Created
August 6, 2021 03:45
-
-
Save nsivabalan/7ebd3b27ad019b7f655725509b59d146 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cat /tmp/temp1.out
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java | |
index 520a956a4..353104340 100644 | |
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java | |
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java | |
@@ -18,14 +18,6 @@ | |
package org.apache.hudi.common.fs; | |
-import org.apache.hadoop.conf.Configuration; | |
-import org.apache.hadoop.fs.FileStatus; | |
-import org.apache.hadoop.fs.FileSystem; | |
-import org.apache.hadoop.fs.LocatedFileStatus; | |
-import org.apache.hadoop.fs.Path; | |
-import org.apache.hadoop.fs.PathFilter; | |
-import org.apache.hadoop.fs.RemoteIterator; | |
-import org.apache.hadoop.hdfs.DistributedFileSystem; | |
import org.apache.hudi.common.config.HoodieMetadataConfig; | |
import org.apache.hudi.common.config.SerializableConfiguration; | |
import org.apache.hudi.common.engine.HoodieEngineContext; | |
@@ -37,12 +29,20 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; | |
import org.apache.hudi.common.table.timeline.HoodieInstant; | |
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; | |
import org.apache.hudi.common.util.Option; | |
-import org.apache.hudi.common.util.collection.ImmutablePair; | |
import org.apache.hudi.common.util.collection.Pair; | |
import org.apache.hudi.exception.HoodieException; | |
import org.apache.hudi.exception.HoodieIOException; | |
import org.apache.hudi.exception.InvalidHoodiePathException; | |
import org.apache.hudi.metadata.HoodieTableMetadata; | |
+ | |
+import org.apache.hadoop.conf.Configuration; | |
+import org.apache.hadoop.fs.FileStatus; | |
+import org.apache.hadoop.fs.FileSystem; | |
+import org.apache.hadoop.fs.LocatedFileStatus; | |
+import org.apache.hadoop.fs.Path; | |
+import org.apache.hadoop.fs.PathFilter; | |
+import org.apache.hadoop.fs.RemoteIterator; | |
+import org.apache.hadoop.hdfs.DistributedFileSystem; | |
import org.apache.log4j.LogManager; | |
import org.apache.log4j.Logger; | |
@@ -51,7 +51,6 @@ import java.io.FileNotFoundException; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
-import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Map; | |
@@ -199,11 +198,11 @@ public class FSUtils { | |
/** | |
* Obtain all the partition paths, that are present in this table, denoted by presence of | |
* {@link HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE}. | |
- * | |
+ * <p> | |
* If the basePathStr is a subdirectory of .hoodie folder then we assume that the partitions of an internal | |
* table (a hoodie table within the .hoodie directory) are to be obtained. | |
* | |
- * @param fs FileSystem instance | |
+ * @param fs FileSystem instance | |
* @param basePathStr base directory | |
*/ | |
public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException { | |
@@ -225,10 +224,10 @@ public class FSUtils { | |
/** | |
* Recursively processes all files in the base-path. If excludeMetaFolder is set, the meta-folder and all its subdirs | |
* are skipped | |
- * | |
- * @param fs File System | |
- * @param basePathStr Base-Path | |
- * @param consumer Callback for processing | |
+ * | |
+ * @param fs File System | |
+ * @param basePathStr Base-Path | |
+ * @param consumer Callback for processing | |
* @param excludeMetaFolder Exclude .hoodie folder | |
* @throws IOException - | |
*/ | |
@@ -417,7 +416,7 @@ public class FSUtils { | |
} | |
public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version, | |
- String writeToken) { | |
+ String writeToken) { | |
String suffix = | |
(writeToken == null) ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version) | |
: String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken); | |
@@ -454,7 +453,7 @@ public class FSUtils { | |
* Get all the log files for the passed in FileId in the partition path. | |
*/ | |
public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId, | |
- final String logFileExtension, final String baseCommitTime) throws IOException { | |
+ final String logFileExtension, final String baseCommitTime) throws IOException { | |
try { | |
return Arrays | |
.stream(fs.listStatus(partitionPath, | |
@@ -469,7 +468,7 @@ public class FSUtils { | |
* Get the latest log version for the fileId in the partition path. | |
*/ | |
public static Option<Pair<Integer, String>> getLatestLogVersion(FileSystem fs, Path partitionPath, | |
- final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { | |
+ final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { | |
Option<HoodieLogFile> latestLogFile = | |
getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime)); | |
if (latestLogFile.isPresent()) { | |
@@ -483,7 +482,7 @@ public class FSUtils { | |
* computes the next log version for the specified fileId in the partition path. | |
*/ | |
public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId, | |
- final String logFileExtension, final String baseCommitTime) throws IOException { | |
+ final String logFileExtension, final String baseCommitTime) throws IOException { | |
Option<Pair<Integer, String>> currentVersionWithWriteToken = | |
getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime); | |
// handle potential overflow | |
@@ -577,13 +576,14 @@ public class FSUtils { | |
/** | |
* Get the FS implementation for this table. | |
- * @param path Path String | |
- * @param hadoopConf Serializable Hadoop Configuration | |
+ * | |
+ * @param path Path String | |
+ * @param hadoopConf Serializable Hadoop Configuration | |
* @param consistencyGuardConfig Consistency Guard Config | |
* @return HoodieWrapperFileSystem | |
*/ | |
public static HoodieWrapperFileSystem getFs(String path, SerializableConfiguration hadoopConf, | |
- ConsistencyGuardConfig consistencyGuardConfig) { | |
+ ConsistencyGuardConfig consistencyGuardConfig) { | |
FileSystem fileSystem = FSUtils.getFs(path, hadoopConf.newCopy()); | |
return new HoodieWrapperFileSystem(fileSystem, | |
consistencyGuardConfig.isConsistencyCheckEnabled() | |
@@ -593,7 +593,8 @@ public class FSUtils { | |
/** | |
* Helper to filter out paths under metadata folder when running fs.globStatus. | |
- * @param fs File System | |
+ * | |
+ * @param fs File System | |
* @param globPath Glob Path | |
* @return the file status list of globPath exclude the meta folder | |
* @throws IOException when having trouble listing the path | |
@@ -609,17 +610,21 @@ public class FSUtils { | |
* Deletes a directory by deleting sub-paths in parallel on the file system. | |
* | |
* @param hoodieEngineContext {@code HoodieEngineContext} instance | |
- * @param fs file system | |
- * @param dirPath directory path | |
- * @param parallelism parallelism to use for sub-paths | |
+ * @param fs file system | |
+ * @param dirPath directory path | |
+ * @param parallelism parallelism to use for sub-paths | |
* @return {@code true} if the directory is delete; {@code false} otherwise. | |
*/ | |
public static boolean deleteDir( | |
HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism) { | |
try { | |
if (fs.exists(dirPath)) { | |
- FSUtils.parallelizeSubPathProcess(hoodieEngineContext, fs, dirPath, parallelism, e -> true, | |
- pairOfSubPathAndConf -> deleteSubPath(pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue()) | |
+ FSUtils.parallelizeSubPathProcess(hoodieEngineContext, fs, dirPath, parallelism, new Predicate<String>() { | |
+ @Override | |
+ public boolean test(String s) { | |
+ return true; | |
+ } | |
+ } | |
); | |
boolean result = fs.delete(dirPath, true); | |
@@ -636,18 +641,15 @@ public class FSUtils { | |
* Processes sub-path in parallel. | |
* | |
* @param hoodieEngineContext {@code HoodieEngineContext} instance | |
- * @param fs file system | |
- * @param dirPath directory path | |
- * @param parallelism parallelism to use for sub-paths | |
- * @param subPathPredicate predicate to use to filter sub-paths for processing | |
- * @param pairFunction actual processing logic for each sub-path | |
- * @param <T> type of result to return for each sub-path | |
+ * @param fs file system | |
+ * @param dirPath directory path | |
+ * @param parallelism parallelism to use for sub-paths | |
+ * @param subPathPredicate predicate to use to filter sub-paths for processing | |
* @return a map of sub-path to result of the processing | |
*/ | |
- public static <T> Map<String, T> parallelizeSubPathProcess( | |
- HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism, | |
- Predicate<String> subPathPredicate, Function<Pair<String, SerializableConfiguration>, T> pairFunction) { | |
- Map<String, T> result = new HashMap<>(); | |
+ public static void parallelizeSubPathProcess( | |
+ HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism, Predicate<String> subPathPredicate) { | |
+ //Map<String, T> result = new HashMap<>(); | |
try { | |
FileStatus[] fileStatuses = fs.listStatus(dirPath); | |
List<String> subPaths = Arrays.stream(fileStatuses) | |
@@ -658,21 +660,26 @@ public class FSUtils { | |
if (subPaths.size() > 0) { | |
SerializableConfiguration conf = new SerializableConfiguration(fs.getConf()); | |
int actualParallelism = Math.min(subPaths.size(), parallelism); | |
- result = hoodieEngineContext.mapToPair(subPaths, | |
+ hoodieEngineContext.foreach(subPaths, subPathStr -> { | |
+ Path subPath = new Path(subPathStr); | |
+ FileSystem fileSystem = subPath.getFileSystem(conf.get()); | |
+ fileSystem.delete(subPath, true); | |
+ }, actualParallelism); | |
+ /*result = hoodieEngineContext.mapToPair(subPaths, | |
subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))), | |
- actualParallelism); | |
+ actualParallelism);*/ | |
} | |
} catch (IOException ioe) { | |
throw new HoodieIOException(ioe.getMessage(), ioe); | |
} | |
- return result; | |
+ //return result; | |
} | |
/** | |
* Deletes a sub-path. | |
* | |
* @param subPathStr sub-path String | |
- * @param conf serializable config | |
+ * @param conf serializable config | |
* @return {@code true} if the sub-path is deleted; {@code false} otherwise. | |
*/ | |
private static boolean deleteSubPath(String subPathStr, SerializableConfiguration conf) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment