Created
June 6, 2019 04:55
-
-
Save cozos/e72ef287570bc9af44109ac90b1b2e4c to your computer and use it in GitHub Desktop.
TextInputFormat with improved S3A listing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class S3ATextInputFormat extends TextInputFormat { | |
private static final PathFilter hiddenFileFilter = p -> { | |
String name = p.getName(); | |
return !name.startsWith("_") && !name.startsWith("."); | |
}; | |
private static class MultiPathFilter implements PathFilter { | |
private List<PathFilter> filters; | |
public MultiPathFilter(List<PathFilter> filters) { | |
this.filters = filters; | |
} | |
public boolean accept(Path path) { | |
for (PathFilter filter : filters) { | |
if (!filter.accept(path)) { | |
return false; | |
} | |
} | |
return true; | |
} | |
} | |
@Override | |
protected List<FileStatus> listStatus(JobContext job) throws IOException { | |
Path[] inputPaths = getInputPaths(job); | |
Configuration conf = job.getConfiguration(); | |
List<PathFilter> filters = new ArrayList<PathFilter>(); | |
filters.add(hiddenFileFilter); | |
PathFilter jobFilter = getInputPathFilter(job); | |
if (jobFilter != null) { | |
filters.add(jobFilter); | |
} | |
PathFilter inputFilter = new MultiPathFilter(filters); | |
return Arrays | |
.stream(inputPaths) | |
.flatMap( | |
inputPath -> { | |
try { | |
return inputPath.getFileSystem(conf).listFiles(inputPath, true) | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
) | |
.collect(Collectors.toList()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment