Last active
August 29, 2015 14:22
-
-
Save asimjalis/2ee37063a38a8a2c2c0c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.regex.Matcher; | |
import java.util.regex.Pattern; | |
import org.apache.crunch.DoFn; | |
import org.apache.crunch.Emitter; | |
import org.apache.crunch.PipelineResult; | |
import org.apache.crunch.io.From; | |
import org.apache.crunch.io.To; | |
import org.apache.crunch.types.writable.Writables; | |
import org.apache.crunch.util.CrunchTool; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.util.ToolRunner; | |
public class RegexCounter extends CrunchTool { | |
private static final long serialVersionUID = 1L; | |
public static DoFn<String,String> filterWithRegex(final String regex) { | |
return new DoFn<String,String>() { | |
private static final long serialVersionUID = 1L; | |
Pattern pattern = Pattern.compile(regex); | |
@Override | |
public void process(String line, Emitter<String> emitter) { | |
Matcher matcher = pattern.matcher(line); | |
if (matcher.find()) { | |
String matcherGroup = matcher.group(); | |
emitter.emit(matcherGroup); | |
} | |
} | |
}; | |
} | |
@Override | |
public int run(String[] args) throws Exception { | |
validateArgs(args); | |
int maximumResults = Integer.parseInt(getConf().get("maximumresults", "20")); | |
String regex = getConf().get("regex", ""); | |
// Create counts, get top N lines, write out | |
read(From.textFile(args[0])) | |
.parallelDo("output most popular lines", | |
filterWithRegex(regex), | |
Writables.strings()) | |
.count() | |
.top(maximumResults) | |
.write(To.textFile(args[1])); | |
PipelineResult result = done(); | |
return result.succeeded() ? 0 : 1; | |
} | |
private void validateArgs(String[] args) { | |
if (args.length != 2) { | |
System.out.printf("Usage: RegexCounter <input dir> <output dir>\n"); | |
System.exit(-1); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
int exitCode = ToolRunner.run(new Configuration(), new RegexCounter(), args); | |
System.exit(exitCode); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment