Created
October 12, 2015 00:42
-
-
Save louisje/08e3f39a1e7b01b8576a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package acloudsvc.hadoop.examples;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;
public class ImageHasher { | |
static String userName = "hadoop"; | |
public static class ThumbnailMapper extends Mapper<LongWritable, Text, Text, Text> { | |
static final Logger logger = Logger.getLogger(ThumbnailMapper.class); | |
@Override | |
protected void map(LongWritable key0, Text value0, | |
Mapper<LongWritable, Text, Text, Text>.Context context) | |
throws IOException, InterruptedException { | |
String row = value0.toString(); | |
String[] split = row.split(" ", 2); | |
String storyId = split[0]; | |
String fileName = split[1]; | |
logger.info("(" + storyId + ", " + fileName + ")"); | |
String hdfsPath = "/user/" + userName + "/thumbnails/" + fileName; | |
String localPath = "/home/" + userName + "/hadoop/tmp/" + fileName; | |
Path src = new Path(hdfsPath); | |
Path dest = new Path(localPath); | |
FileSystem fs = FileSystem.get(new Configuration()); | |
fs.copyToLocalFile(src, dest); | |
Runtime runtime = Runtime.getRuntime(); | |
String[] args = { "/home/hadoop/blockhash", localPath }; | |
Process process = runtime.exec(args); | |
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
BufferedReader err = new BufferedReader(new InputStreamReader(process.getErrorStream())); | |
process.waitFor(); | |
while (true) { | |
String line = err.readLine(); | |
if (line == null) break; | |
System.out.println("ERR: " + line); | |
} | |
IOUtils.copy(process.getInputStream(), baos); | |
if (baos.size() > 0) { | |
String blockhashOutput = baos.toString().trim(); | |
String[] splited = blockhashOutput.split(" ", 2); | |
if (splited.length < 2) return; | |
String hashCode = splited[1]; | |
if (hashCode.isEmpty()) return; | |
Text key = new Text(); | |
key.set(hashCode); | |
Text value = new Text(); | |
value.set(storyId); | |
System.out.println("map => (" + hashCode + ", " + storyId + ")"); | |
context.write(key, value); | |
} | |
} | |
} | |
public static class HashReducer extends Reducer<Text, Text, Text, Text> { | |
@Override | |
protected void reduce(Text hashCode, Iterable<Text> storyIds, | |
Reducer<Text, Text, Text, Text>.Context context) | |
throws IOException, InterruptedException { | |
String merged = ""; | |
for (Text storyId : storyIds) { | |
merged += storyId.toString() + " "; | |
} | |
System.out.println("reduce => (" + hashCode.toString() + ", " + merged + ")"); | |
Text result = new Text(); | |
result.set(merged); | |
context.write(hashCode, result); | |
} | |
} | |
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { | |
Configuration conf = new Configuration(); | |
String[] remains = new GenericOptionsParser(conf, args).getRemainingArgs(); | |
if (remains.length > 0) | |
userName = remains[0]; | |
Job job = Job.getInstance(); | |
job.setJarByClass(ImageHasher.class); | |
job.setMapperClass(ThumbnailMapper.class); | |
job.setCombinerClass(HashReducer.class); | |
job.setReducerClass(HashReducer.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(Text.class); | |
FileInputFormat.addInputPath(job, new Path("input")); | |
FileOutputFormat.setOutputPath(job, new Path("output")); | |
System.exit(job.waitForCompletion(true) ? 0 : 1); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment