Created
July 8, 2015 10:10
-
-
Save qrtt1/20367116378ed72d5c59 to your computer and use it in GitHub Desktop.
GCS-log-aware input format
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package qty.patched.org.apache.hadoop.mapreduce.lib.input; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.List; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.hadoop.classification.InterfaceAudience; | |
import org.apache.hadoop.classification.InterfaceStability; | |
import org.apache.hadoop.fs.FileStatus; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.LocatedFileStatus; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.fs.PathFilter; | |
import org.apache.hadoop.fs.RemoteIterator; | |
import org.apache.hadoop.mapreduce.InputFormat; | |
import org.apache.hadoop.mapreduce.JobContext; | |
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.security.TokenCache; | |
/** | |
* An {@link InputFormat} for plain text files. Files are broken into lines. | |
* Either linefeed or carriage-return are used to signal end of line. Keys are | |
* the position in the file, and values are the line of text.. | |
*/ | |
@InterfaceAudience.Public | |
@InterfaceStability.Stable | |
public class GCSLogFilenameAwaredTextInputFormat extends TextInputFormat { | |
private static final Log LOG = LogFactory.getLog(GCSLogFilenameAwaredTextInputFormat.class); | |
private static final PathFilter myHiddenFileFilter = new PathFilter() { | |
public boolean accept(Path p) { | |
String name = p.getName(); | |
/* accept gcs usage log pattern */ | |
if (name.contains("_usage_")) { | |
return true; | |
} | |
return !name.startsWith("_") && !name.startsWith("."); | |
} | |
}; | |
/** | |
* List input directories. Subclasses may override to, e.g., select only | |
* files matching a regular expression. | |
* | |
* @param job | |
* the job to list input paths for | |
* @return array of FileStatus objects | |
* @throws IOException | |
* if zero items. | |
*/ | |
protected List<FileStatus> listStatus(JobContext job) throws IOException { | |
List<FileStatus> result = new ArrayList<FileStatus>(); | |
Path[] dirs = getInputPaths(job); | |
if (dirs.length == 0) { | |
throw new IOException("No input paths specified in job"); | |
} | |
// get tokens for all the required FileSystems.. | |
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job.getConfiguration()); | |
// Whether we need to recursive look into the directory structure | |
boolean recursive = getInputDirRecursive(job); | |
List<IOException> errors = new ArrayList<IOException>(); | |
// creates a MultiPathFilter with the hiddenFileFilter and the | |
// user provided one (if any). | |
List<PathFilter> filters = new ArrayList<PathFilter>(); | |
filters.add(myHiddenFileFilter); | |
PathFilter jobFilter = getInputPathFilter(job); | |
if (jobFilter != null) { | |
filters.add(jobFilter); | |
} | |
PathFilter inputFilter = new MultiPathFilter(filters); | |
for (int i = 0; i < dirs.length; ++i) { | |
Path p = dirs[i]; | |
FileSystem fs = p.getFileSystem(job.getConfiguration()); | |
FileStatus[] matches = fs.globStatus(p, inputFilter); | |
if (matches == null) { | |
errors.add(new IOException("Input path does not exist: " + p)); | |
} else if (matches.length == 0) { | |
errors.add(new IOException("Input Pattern " + p + " matches 0 files")); | |
} else { | |
for (FileStatus globStat : matches) { | |
if (globStat.isDirectory()) { | |
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(globStat.getPath()); | |
while (iter.hasNext()) { | |
LocatedFileStatus stat = iter.next(); | |
if (inputFilter.accept(stat.getPath())) { | |
if (recursive && stat.isDirectory()) { | |
addInputPathRecursively(result, fs, stat.getPath(), inputFilter); | |
} else { | |
result.add(stat); | |
} | |
} | |
} | |
} else { | |
result.add(globStat); | |
} | |
} | |
} | |
} | |
if (!errors.isEmpty()) { | |
throw new InvalidInputException(errors); | |
} | |
LOG.info("Total input paths to process : " + result.size()); | |
return result; | |
} | |
/** | |
* Proxy PathFilter that accepts a path only if all filters given in the | |
* constructor do. Used by the listPaths() to apply the built-in | |
* hiddenFileFilter together with a user provided one (if any). | |
*/ | |
private static class MultiPathFilter implements PathFilter { | |
private List<PathFilter> filters; | |
public MultiPathFilter(List<PathFilter> filters) { | |
this.filters = filters; | |
} | |
public boolean accept(Path path) { | |
for (PathFilter filter : filters) { | |
if (!filter.accept(path)) { | |
return false; | |
} | |
} | |
return true; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment