Last active
December 16, 2015 16:49
-
-
Save jizhang/5466149 to your computer and use it in GitHub Desktop.
Simple filter-count Hadoop map-reduce job written in Java vs. Clojure.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns uv.raw | |
(:require [clojure-hadoop.gen :as gen] | |
[clojure-hadoop.imports :as imp]) | |
(:import [org.apache.hadoop.util Tool] | |
[com.hadoop.mapreduce LzoTextInputFormat])) | |
;; NOTE(review): the imp/import-* macros presumably bring the Hadoop classes
;; used unqualified below (Text, LongWritable, Path, FileSystem, Job, ...)
;; into this namespace -- confirm against clojure-hadoop.imports.
(imp/import-conf) | |
(imp/import-fs) | |
(imp/import-io) | |
(imp/import-mapreduce) | |
(imp/import-mapreduce-lib) | |
;; NOTE(review): tool-run looks up "uv.raw_mapper" and "uv.raw_reducer" by
;; name, so these gen macros presumably generate those classes (and a main
;; entry point) backed by the mapper-*/reducer-*/tool-* functions -- confirm
;; against clojure-hadoop.gen.
(gen/gen-job-classes) | |
(gen/gen-main-method) | |
;; Namespace-level counter of matching lines: incremented by mapper-map and
;; read by mapper-cleanup.
(def cnt (atom 0)) | |
(defn mapper-map
  "Map step: bumps the namespace-level `cnt` atom by one when the text of
  `value` contains the literal \"site\":\"anjuke\". Nothing is written to
  `context` here; the tally is emitted by `mapper-cleanup`."
  [this key ^Text value context]
  (let [line (.toString value)
        wanted "\"site\":\"anjuke\""]
    (if (.contains line wanted)
      (swap! cnt inc)
      nil)))
(defn mapper-cleanup
  "Cleanup step: writes the number of matching lines counted so far as a
  single (0, count) pair, then removes that amount from `cnt`.

  `cnt` is a namespace-level atom, so it outlives a single mapper instance.
  If several map tasks run in the same JVM, leaving the emitted amount in the
  atom would make every later task re-emit the counts of the earlier ones and
  inflate the final sum."
  [this ^MapContext context]
  (let [emitted @cnt]
    ;; Subtract the snapshot rather than resetting to 0 so that increments
    ;; made after the snapshot was taken are not lost.
    (swap! cnt - emitted)
    (.write context (LongWritable. 0) (LongWritable. emitted))))
(defn reducer-reduce
  "Reduce step: sums every LongWritable in `values` and writes (key, sum).

  The values are read straight from the Iterator, one at a time, instead of
  through a lazy seq. Hadoop hands back the same mutable Writable on each
  call to next(), and seqs over iterators are chunked from Clojure 1.7 on,
  so a seq-based sum can read ahead and see stale, repeated values."
  [this key values ^ReduceContext context]
  (let [it (.iterator ^Iterable values)
        sum (loop [acc 0]
              (if (.hasNext it)
                ;; Extract the primitive immediately, before the next call to
                ;; next() overwrites the shared Writable.
                (recur (+ acc (.get ^LongWritable (.next it))))
                acc))]
    (.write context key (LongWritable. sum))))
(defn tool-run
  "Tool entry point: configures and runs the \"uv-raw\" job.

  Reads text input from test_logs/soj, deletes any existing /tmp/uv-test
  output directory, and writes the job output there.

  Returns 0 when the job completes successfully and 2 when it fails, so the
  process exit status reflects the job outcome. `args` is not used."
  [^Tool this args]
  (let [conf (.getConf this)
        output-path (Path. "/tmp/uv-test")]
    ;; The output directory is removed (recursively) before the job is set up.
    (.delete (FileSystem/get conf) output-path true)
    (let [job (doto (Job. conf)
                (.setJarByClass (.getClass this))
                (.setJobName "uv-raw")
                (.setOutputKeyClass LongWritable)
                (.setOutputValueClass LongWritable)
                (.setMapperClass (Class/forName "uv.raw_mapper"))
                (.setReducerClass (Class/forName "uv.raw_reducer"))
                (.setInputFormatClass TextInputFormat)
                ;; doto threads the job in as the first argument of these
                ;; static calls as well.
                (FileInputFormat/setInputPaths "test_logs/soj")
                (.setOutputFormatClass TextOutputFormat)
                (FileOutputFormat/setOutputPath output-path))]
      ;; waitForCompletion returns true only on success.
      (if (.waitForCompletion job true) 0 2))))
(comment | |
Average time taken by Map tasks: 19sec | |
Average time taken by Shuffle: 38sec | |
Average time taken by Reduce tasks: 8sec | |
13/04/26 17:33:38 INFO input.FileInputFormat: Total input paths to process : 96 | |
13/04/26 17:33:38 INFO mapred.JobClient: Running job: job_201304011713_3891 | |
13/04/26 17:33:39 INFO mapred.JobClient: map 0% reduce 0% | |
13/04/26 17:34:11 INFO mapred.JobClient: map 2% reduce 0% | |
13/04/26 17:34:14 INFO mapred.JobClient: map 5% reduce 0% | |
13/04/26 17:34:15 INFO mapred.JobClient: map 10% reduce 0% | |
13/04/26 17:34:16 INFO mapred.JobClient: map 11% reduce 0% | |
13/04/26 17:34:17 INFO mapred.JobClient: map 17% reduce 0% | |
13/04/26 17:34:18 INFO mapred.JobClient: map 35% reduce 0% | |
13/04/26 17:34:19 INFO mapred.JobClient: map 45% reduce 0% | |
13/04/26 17:34:25 INFO mapred.JobClient: map 45% reduce 15% | |
13/04/26 17:34:26 INFO mapred.JobClient: map 46% reduce 15% | |
13/04/26 17:34:32 INFO mapred.JobClient: map 48% reduce 15% | |
13/04/26 17:34:33 INFO mapred.JobClient: map 49% reduce 15% | |
13/04/26 17:34:35 INFO mapred.JobClient: map 53% reduce 15% | |
13/04/26 17:34:36 INFO mapred.JobClient: map 56% reduce 15% | |
13/04/26 17:34:37 INFO mapred.JobClient: map 60% reduce 15% | |
13/04/26 17:34:38 INFO mapred.JobClient: map 64% reduce 15% | |
13/04/26 17:34:39 INFO mapred.JobClient: map 71% reduce 15% | |
13/04/26 17:34:40 INFO mapred.JobClient: map 76% reduce 16% | |
13/04/26 17:34:41 INFO mapred.JobClient: map 77% reduce 16% | |
13/04/26 17:34:42 INFO mapred.JobClient: map 87% reduce 16% | |
13/04/26 17:34:43 INFO mapred.JobClient: map 90% reduce 16% | |
13/04/26 17:34:44 INFO mapred.JobClient: map 93% reduce 16% | |
13/04/26 17:34:45 INFO mapred.JobClient: map 95% reduce 16% | |
13/04/26 17:34:48 INFO mapred.JobClient: map 96% reduce 16% | |
13/04/26 17:34:49 INFO mapred.JobClient: map 97% reduce 25% | |
13/04/26 17:34:51 INFO mapred.JobClient: map 100% reduce 25% | |
13/04/26 17:34:55 INFO mapred.JobClient: map 100% reduce 32% | |
13/04/26 17:34:58 INFO mapred.JobClient: map 100% reduce 66% | |
13/04/26 17:35:04 INFO mapred.JobClient: map 100% reduce 100% | |
13/04/26 17:35:09 INFO mapred.JobClient: Job complete: job_201304011713_3891 | |
13/04/26 17:35:09 INFO mapred.JobClient: Counters: 30 | |
13/04/26 17:35:09 INFO mapred.JobClient: Job Counters | |
13/04/26 17:35:09 INFO mapred.JobClient: Launched reduce tasks=1 | |
13/04/26 17:35:09 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=1727856 | |
13/04/26 17:35:09 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 | |
13/04/26 17:35:09 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 | |
13/04/26 17:35:09 INFO mapred.JobClient: Rack-local map tasks=1 | |
13/04/26 17:35:09 INFO mapred.JobClient: Launched map tasks=89 | |
13/04/26 17:35:09 INFO mapred.JobClient: Data-local map tasks=88 | |
13/04/26 17:35:09 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=47089 | |
13/04/26 17:35:09 INFO mapred.JobClient: File Output Format Counters | |
13/04/26 17:35:09 INFO mapred.JobClient: Bytes Written=10 | |
13/04/26 17:35:09 INFO mapred.JobClient: FileSystemCounters | |
13/04/26 17:35:09 INFO mapred.JobClient: FILE_BYTES_READ=488 | |
13/04/26 17:35:09 INFO mapred.JobClient: HDFS_BYTES_READ=4331616258 | |
13/04/26 17:35:09 INFO mapred.JobClient: FILE_BYTES_WRITTEN=1966598 | |
13/04/26 17:35:09 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=10 | |
13/04/26 17:35:09 INFO mapred.JobClient: File Input Format Counters | |
13/04/26 17:35:09 INFO mapred.JobClient: Bytes Read=4331603306 | |
13/04/26 17:35:09 INFO mapred.JobClient: Map-Reduce Framework | |
13/04/26 17:35:09 INFO mapred.JobClient: Map output materialized bytes=3168 | |
13/04/26 17:35:09 INFO mapred.JobClient: Map input records=11517118 | |
13/04/26 17:35:09 INFO mapred.JobClient: Reduce shuffle bytes=3132 | |
13/04/26 17:35:09 INFO mapred.JobClient: Spilled Records=176 | |
13/04/26 17:35:09 INFO mapred.JobClient: Map output bytes=1408 | |
13/04/26 17:35:09 INFO mapred.JobClient: CPU time spent (ms)=1145440 | |
13/04/26 17:35:09 INFO mapred.JobClient: Total committed heap usage (bytes)=90363527168 | |
13/04/26 17:35:09 INFO mapred.JobClient: Combine input records=0 | |
13/04/26 17:35:09 INFO mapred.JobClient: SPLIT_RAW_BYTES=12760 | |
13/04/26 17:35:09 INFO mapred.JobClient: Reduce input records=88 | |
13/04/26 17:35:09 INFO mapred.JobClient: Reduce input groups=1 | |
13/04/26 17:35:09 INFO mapred.JobClient: Combine output records=0 | |
13/04/26 17:35:09 INFO mapred.JobClient: Physical memory (bytes) snapshot=50453352448 | |
13/04/26 17:35:09 INFO mapred.JobClient: Reduce output records=1 | |
13/04/26 17:35:09 INFO mapred.JobClient: Virtual memory (bytes) snapshot=129966493696 | |
13/04/26 17:35:09 INFO mapred.JobClient: Map output records=88 | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ops.test.uv; | |
import java.io.IOException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.hadoop.mapreduce.Reducer; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
import com.hadoop.mapreduce.LzoTextInputFormat; | |
public class App extends Configured implements Tool { | |
public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> { | |
private long count = 0; | |
@Override | |
protected void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
String raw = value.toString(); | |
if (raw.contains("\"site\":\"anjuke\"")) { | |
++count; | |
} | |
} | |
@Override | |
protected void cleanup(Context context) throws IOException, | |
InterruptedException { | |
context.write(new LongWritable(0), new LongWritable(count)); | |
} | |
} | |
public static class MyReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> { | |
@Override | |
protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context) | |
throws IOException, InterruptedException { | |
long sum = 0; | |
for (LongWritable value : values) { | |
sum += value.get(); | |
} | |
context.write(new LongWritable(0), new LongWritable(sum)); | |
} | |
} | |
@Override | |
public int run(String[] args) throws Exception { | |
Job job = new Job(getConf()); | |
job.setJarByClass(App.class); | |
job.setJobName("uv-java"); | |
job.setOutputKeyClass(LongWritable.class); | |
job.setOutputValueClass(LongWritable.class); | |
job.setMapperClass(MyMapper.class); | |
job.setReducerClass(MyReducer.class); | |
job.setInputFormatClass(TextInputFormat.class); | |
FileInputFormat.setInputPaths(job, "test_logs/soj"); | |
job.setOutputFormatClass(TextOutputFormat.class); | |
FileOutputFormat.setOutputPath(job, new Path("/tmp/uv-test")); | |
FileOutputFormat.setCompressOutput(job, false); | |
FileSystem.get(job.getConfiguration()).delete(FileOutputFormat.getOutputPath(job), true); | |
job.submit(); | |
return job.waitForCompletion(true) ? 0 : 2; | |
} | |
public static void main(String[] args) throws Exception { | |
System.exit(ToolRunner.run(new Configuration(), new App(), args)); | |
} | |
} | |
/* | |
Average time taken by Map tasks: 7sec | |
Average time taken by Shuffle: 18sec | |
Average time taken by Reduce tasks: 4sec | |
13/04/26 17:23:01 INFO input.FileInputFormat: Total input paths to process : 96 | |
13/04/26 17:23:02 INFO mapred.JobClient: Running job: job_201304011713_3880 | |
13/04/26 17:23:03 INFO mapred.JobClient: map 0% reduce 0% | |
13/04/26 17:23:25 INFO mapred.JobClient: map 6% reduce 0% | |
13/04/26 17:23:26 INFO mapred.JobClient: map 22% reduce 0% | |
13/04/26 17:23:27 INFO mapred.JobClient: map 34% reduce 0% | |
13/04/26 17:23:28 INFO mapred.JobClient: map 38% reduce 0% | |
13/04/26 17:23:29 INFO mapred.JobClient: map 45% reduce 0% | |
13/04/26 17:23:31 INFO mapred.JobClient: map 51% reduce 0% | |
13/04/26 17:23:32 INFO mapred.JobClient: map 52% reduce 0% | |
13/04/26 17:23:33 INFO mapred.JobClient: map 55% reduce 0% | |
13/04/26 17:23:34 INFO mapred.JobClient: map 61% reduce 0% | |
13/04/26 17:23:35 INFO mapred.JobClient: map 79% reduce 17% | |
13/04/26 17:23:36 INFO mapred.JobClient: map 85% reduce 17% | |
13/04/26 17:23:37 INFO mapred.JobClient: map 90% reduce 17% | |
13/04/26 17:23:38 INFO mapred.JobClient: map 95% reduce 17% | |
13/04/26 17:23:39 INFO mapred.JobClient: map 100% reduce 17% | |
13/04/26 17:23:44 INFO mapred.JobClient: map 100% reduce 23% | |
13/04/26 17:23:50 INFO mapred.JobClient: map 100% reduce 100% | |
13/04/26 17:23:55 INFO mapred.JobClient: Job complete: job_201304011713_3880 | |
13/04/26 17:23:55 INFO mapred.JobClient: Counters: 29 | |
13/04/26 17:23:55 INFO mapred.JobClient: Job Counters | |
13/04/26 17:23:55 INFO mapred.JobClient: Launched reduce tasks=1 | |
13/04/26 17:23:55 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=684600 | |
13/04/26 17:23:55 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 | |
13/04/26 17:23:55 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 | |
13/04/26 17:23:55 INFO mapred.JobClient: Launched map tasks=88 | |
13/04/26 17:23:55 INFO mapred.JobClient: Data-local map tasks=88 | |
13/04/26 17:23:55 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=23243 | |
13/04/26 17:23:55 INFO mapred.JobClient: File Output Format Counters | |
13/04/26 17:23:55 INFO mapred.JobClient: Bytes Written=10 | |
13/04/26 17:23:55 INFO mapred.JobClient: FileSystemCounters | |
13/04/26 17:23:55 INFO mapred.JobClient: FILE_BYTES_READ=489 | |
13/04/26 17:23:55 INFO mapred.JobClient: HDFS_BYTES_READ=4331616258 | |
13/04/26 17:23:55 INFO mapred.JobClient: FILE_BYTES_WRITTEN=1968646 | |
13/04/26 17:23:55 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=10 | |
13/04/26 17:23:55 INFO mapred.JobClient: File Input Format Counters | |
13/04/26 17:23:55 INFO mapred.JobClient: Bytes Read=4331603306 | |
13/04/26 17:23:55 INFO mapred.JobClient: Map-Reduce Framework | |
13/04/26 17:23:55 INFO mapred.JobClient: Map output materialized bytes=3168 | |
13/04/26 17:23:55 INFO mapred.JobClient: Map input records=11517118 | |
13/04/26 17:23:55 INFO mapred.JobClient: Reduce shuffle bytes=3132 | |
13/04/26 17:23:55 INFO mapred.JobClient: Spilled Records=176 | |
13/04/26 17:23:55 INFO mapred.JobClient: Map output bytes=1408 | |
13/04/26 17:23:55 INFO mapred.JobClient: CPU time spent (ms)=244000 | |
13/04/26 17:23:55 INFO mapred.JobClient: Total committed heap usage (bytes)=90469695488 | |
13/04/26 17:23:55 INFO mapred.JobClient: Combine input records=0 | |
13/04/26 17:23:55 INFO mapred.JobClient: SPLIT_RAW_BYTES=12760 | |
13/04/26 17:23:55 INFO mapred.JobClient: Reduce input records=88 | |
13/04/26 17:23:55 INFO mapred.JobClient: Reduce input groups=1 | |
13/04/26 17:23:55 INFO mapred.JobClient: Combine output records=0 | |
13/04/26 17:23:55 INFO mapred.JobClient: Physical memory (bytes) snapshot=47529811968 | |
13/04/26 17:23:55 INFO mapred.JobClient: Reduce output records=1 | |
13/04/26 17:23:55 INFO mapred.JobClient: Virtual memory (bytes) snapshot=128776970240 | |
13/04/26 17:23:55 INFO mapred.JobClient: Map output records=88 | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment