
@nsivabalan
Created August 6, 2021 03:45
cat /tmp/temp1.out
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 520a956a4..353104340 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -18,14 +18,6 @@
package org.apache.hudi.common.fs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -37,12 +29,20 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.InvalidHoodiePathException;
import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -51,7 +51,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -199,11 +198,11 @@ public class FSUtils {
/**
* Obtain all the partition paths, that are present in this table, denoted by presence of
* {@link HoodiePartitionMetadata#HOODIE_PARTITION_METAFILE}.
- *
+ * <p>
* If the basePathStr is a subdirectory of .hoodie folder then we assume that the partitions of an internal
* table (a hoodie table within the .hoodie directory) are to be obtained.
*
- * @param fs FileSystem instance
+ * @param fs FileSystem instance
* @param basePathStr base directory
*/
public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr) throws IOException {
@@ -225,10 +224,10 @@ public class FSUtils {
/**
* Recursively processes all files in the base-path. If excludeMetaFolder is set, the meta-folder and all its subdirs
* are skipped
- *
- * @param fs File System
- * @param basePathStr Base-Path
- * @param consumer Callback for processing
+ *
+ * @param fs File System
+ * @param basePathStr Base-Path
+ * @param consumer Callback for processing
* @param excludeMetaFolder Exclude .hoodie folder
* @throws IOException -
*/
@@ -417,7 +416,7 @@ public class FSUtils {
}
public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version,
- String writeToken) {
+ String writeToken) {
String suffix =
(writeToken == null) ? String.format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version)
: String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
@@ -454,7 +453,7 @@ public class FSUtils {
* Get all the log files for the passed in FileId in the partition path.
*/
public static Stream<HoodieLogFile> getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId,
- final String logFileExtension, final String baseCommitTime) throws IOException {
+ final String logFileExtension, final String baseCommitTime) throws IOException {
try {
return Arrays
.stream(fs.listStatus(partitionPath,
@@ -469,7 +468,7 @@ public class FSUtils {
* Get the latest log version for the fileId in the partition path.
*/
public static Option<Pair<Integer, String>> getLatestLogVersion(FileSystem fs, Path partitionPath,
- final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
+ final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException {
Option<HoodieLogFile> latestLogFile =
getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime));
if (latestLogFile.isPresent()) {
@@ -483,7 +482,7 @@ public class FSUtils {
* computes the next log version for the specified fileId in the partition path.
*/
public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId,
- final String logFileExtension, final String baseCommitTime) throws IOException {
+ final String logFileExtension, final String baseCommitTime) throws IOException {
Option<Pair<Integer, String>> currentVersionWithWriteToken =
getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
// handle potential overflow
@@ -577,13 +576,14 @@ public class FSUtils {
/**
* Get the FS implementation for this table.
- * @param path Path String
- * @param hadoopConf Serializable Hadoop Configuration
+ *
+ * @param path Path String
+ * @param hadoopConf Serializable Hadoop Configuration
* @param consistencyGuardConfig Consistency Guard Config
* @return HoodieWrapperFileSystem
*/
public static HoodieWrapperFileSystem getFs(String path, SerializableConfiguration hadoopConf,
- ConsistencyGuardConfig consistencyGuardConfig) {
+ ConsistencyGuardConfig consistencyGuardConfig) {
FileSystem fileSystem = FSUtils.getFs(path, hadoopConf.newCopy());
return new HoodieWrapperFileSystem(fileSystem,
consistencyGuardConfig.isConsistencyCheckEnabled()
@@ -593,7 +593,8 @@ public class FSUtils {
/**
* Helper to filter out paths under metadata folder when running fs.globStatus.
- * @param fs File System
+ *
+ * @param fs File System
* @param globPath Glob Path
* @return the file status list of globPath exclude the meta folder
* @throws IOException when having trouble listing the path
@@ -609,17 +610,21 @@ public class FSUtils {
* Deletes a directory by deleting sub-paths in parallel on the file system.
*
* @param hoodieEngineContext {@code HoodieEngineContext} instance
- * @param fs file system
- * @param dirPath directory path
- * @param parallelism parallelism to use for sub-paths
+ * @param fs file system
+ * @param dirPath directory path
+ * @param parallelism parallelism to use for sub-paths
* @return {@code true} if the directory is delete; {@code false} otherwise.
*/
public static boolean deleteDir(
HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism) {
try {
if (fs.exists(dirPath)) {
- FSUtils.parallelizeSubPathProcess(hoodieEngineContext, fs, dirPath, parallelism, e -> true,
- pairOfSubPathAndConf -> deleteSubPath(pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue())
+ FSUtils.parallelizeSubPathProcess(hoodieEngineContext, fs, dirPath, parallelism, new Predicate<String>() {
+ @Override
+ public boolean test(String s) {
+ return true;
+ }
+ }
);
boolean result = fs.delete(dirPath, true);
@@ -636,18 +641,15 @@ public class FSUtils {
* Processes sub-path in parallel.
*
* @param hoodieEngineContext {@code HoodieEngineContext} instance
- * @param fs file system
- * @param dirPath directory path
- * @param parallelism parallelism to use for sub-paths
- * @param subPathPredicate predicate to use to filter sub-paths for processing
- * @param pairFunction actual processing logic for each sub-path
- * @param <T> type of result to return for each sub-path
+ * @param fs file system
+ * @param dirPath directory path
+ * @param parallelism parallelism to use for sub-paths
+ * @param subPathPredicate predicate to use to filter sub-paths for processing
* @return a map of sub-path to result of the processing
*/
- public static <T> Map<String, T> parallelizeSubPathProcess(
- HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism,
- Predicate<String> subPathPredicate, Function<Pair<String, SerializableConfiguration>, T> pairFunction) {
- Map<String, T> result = new HashMap<>();
+ public static void parallelizeSubPathProcess(
+ HoodieEngineContext hoodieEngineContext, FileSystem fs, Path dirPath, int parallelism, Predicate<String> subPathPredicate) {
+ //Map<String, T> result = new HashMap<>();
try {
FileStatus[] fileStatuses = fs.listStatus(dirPath);
List<String> subPaths = Arrays.stream(fileStatuses)
@@ -658,21 +660,26 @@ public class FSUtils {
if (subPaths.size() > 0) {
SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
int actualParallelism = Math.min(subPaths.size(), parallelism);
- result = hoodieEngineContext.mapToPair(subPaths,
+ hoodieEngineContext.foreach(subPaths, subPathStr -> {
+ Path subPath = new Path(subPathStr);
+ FileSystem fileSystem = subPath.getFileSystem(conf.get());
+ fileSystem.delete(subPath, true);
+ }, actualParallelism);
+ /*result = hoodieEngineContext.mapToPair(subPaths,
subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
- actualParallelism);
+ actualParallelism);*/
}
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
- return result;
+ //return result;
}
/**
* Deletes a sub-path.
*
* @param subPathStr sub-path String
- * @param conf serializable config
+ * @param conf serializable config
* @return {@code true} if the sub-path is deleted; {@code false} otherwise.
*/
private static boolean deleteSubPath(String subPathStr, SerializableConfiguration conf) {
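
For context, the net effect of the last few hunks: deleteDir no longer funnels per-sub-path deletion through the generic mapToPair pipeline, and parallelizeSubPathProcess now takes only the predicate and deletes each surviving sub-path directly via the engine context's foreach. The e -> true lambda is also rewritten as an equivalent anonymous Predicate<String>, which accepts every sub-path just as before, so the filtering behavior is unchanged. Below is a minimal sketch of the reworked flow; the class name, the deleteSubPathsInParallel method, and the local foreach stand-in for HoodieEngineContext#foreach are all hypothetical here, since the real context fans the work out across the engine (e.g. a Spark cluster):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ParallelDeleteSketch {

  // Hypothetical stand-in for HoodieEngineContext#foreach. The real context
  // distributes the work (e.g. over Spark); a parallel stream is enough to
  // show the control flow, so the parallelism hint is ignored here.
  static <T> void foreach(List<T> items, Consumer<T> action, int parallelism) {
    items.parallelStream().forEach(action);
  }

  // Mirrors the reworked parallelizeSubPathProcess: list the directory, keep
  // the sub-paths the predicate accepts, then delete each one in parallel,
  // as the patched method now does inline instead of via a pairFunction.
  public static void deleteSubPathsInParallel(FileSystem fs, Path dirPath,
      int parallelism, Predicate<String> subPathPredicate) throws IOException {
    FileStatus[] fileStatuses = fs.listStatus(dirPath);
    List<String> subPaths = Arrays.stream(fileStatuses)
        .map(fileStatus -> fileStatus.getPath().toString())
        .filter(subPathPredicate)
        .collect(Collectors.toList());
    if (subPaths.isEmpty()) {
      return;
    }
    Configuration conf = fs.getConf();
    int actualParallelism = Math.min(subPaths.size(), parallelism);
    foreach(subPaths, subPathStr -> {
      try {
        Path subPath = new Path(subPathStr);
        // Re-resolve the FileSystem per task, as the patch does, so each
        // parallel worker gets its own handle.
        FileSystem fileSystem = subPath.getFileSystem(conf);
        fileSystem.delete(subPath, true);
      } catch (IOException e) {
        // The patched lambda runs inside HoodieEngineContext#foreach; a plain
        // java.util.function.Consumer cannot throw IOException, hence the wrap.
        throw new UncheckedIOException(e);
      }
    }, actualParallelism);
  }
}

The sketch uses a plain Configuration purely to stay self-contained; the patch itself keeps the SerializableConfiguration wrapper so the closure can be shipped to remote executors.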