Skip to content

Instantly share code, notes, and snippets.

@qrtt1
Created July 8, 2015 10:10
Show Gist options
  • Save qrtt1/20367116378ed72d5c59 to your computer and use it in GitHub Desktop.
Save qrtt1/20367116378ed72d5c59 to your computer and use it in GitHub Desktop.
GCS log-aware input format
package qty.patched.org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
/**
* An {@link InputFormat} for plain text files. Files are broken into lines.
* Either linefeed or carriage-return are used to signal end of line. Keys are
* the position in the file, and values are the line of text..
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class GCSLogFilenameAwaredTextInputFormat extends TextInputFormat {
private static final Log LOG = LogFactory.getLog(GCSLogFilenameAwaredTextInputFormat.class);
private static final PathFilter myHiddenFileFilter = new PathFilter() {
public boolean accept(Path p) {
String name = p.getName();
/* accept gcs usage log pattern */
if (name.contains("_usage_")) {
return true;
}
return !name.startsWith("_") && !name.startsWith(".");
}
};
/**
* List input directories. Subclasses may override to, e.g., select only
* files matching a regular expression.
*
* @param job
* the job to list input paths for
* @return array of FileStatus objects
* @throws IOException
* if zero items.
*/
protected List<FileStatus> listStatus(JobContext job) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
// get tokens for all the required FileSystems..
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job.getConfiguration());
// Whether we need to recursive look into the directory structure
boolean recursive = getInputDirRecursive(job);
List<IOException> errors = new ArrayList<IOException>();
// creates a MultiPathFilter with the hiddenFileFilter and the
// user provided one (if any).
List<PathFilter> filters = new ArrayList<PathFilter>();
filters.add(myHiddenFileFilter);
PathFilter jobFilter = getInputPathFilter(job);
if (jobFilter != null) {
filters.add(jobFilter);
}
PathFilter inputFilter = new MultiPathFilter(filters);
for (int i = 0; i < dirs.length; ++i) {
Path p = dirs[i];
FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
} else {
for (FileStatus globStat : matches) {
if (globStat.isDirectory()) {
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(globStat.getPath());
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (recursive && stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
result.add(stat);
}
}
}
} else {
result.add(globStat);
}
}
}
}
if (!errors.isEmpty()) {
throw new InvalidInputException(errors);
}
LOG.info("Total input paths to process : " + result.size());
return result;
}
/**
* Proxy PathFilter that accepts a path only if all filters given in the
* constructor do. Used by the listPaths() to apply the built-in
* hiddenFileFilter together with a user provided one (if any).
*/
private static class MultiPathFilter implements PathFilter {
private List<PathFilter> filters;
public MultiPathFilter(List<PathFilter> filters) {
this.filters = filters;
}
public boolean accept(Path path) {
for (PathFilter filter : filters) {
if (!filter.accept(path)) {
return false;
}
}
return true;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment