ex64
package ex64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class FilterDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
/*
* Validate that four arguments were passed from the command line.
*/
if (args.length != 4) {
System.out.printf("Usage: FilterDriver <input dir> <output dir> <user_a> <num of reducers>\n");
System.exit(-1);
}
// Configuration processed by ToolRunner
Configuration conf = getConf();
// Notify Hadoop that the application uses GenericOptionsParser
// This is not required, but it prevents a warning from being printed during execution
conf.set("mapred.used.genericoptionsparser", "true");
// Save the user we are looking for.
conf.set("user_a", args[2]);
// Create a Job using the processed conf
Job job = Job.getInstance(conf);
// Define Input and Output Format
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Define Map Output Classes (Key, Value)
// Setting these isn't strictly required here because they match the job output classes below,
// but it must be done whenever the map output types differ.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// Define Job Output Classes (Key, Value)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Set Mapper and Reducer Class
job.setMapperClass(FilterMapper.class);
job.setReducerClass(FilterReducer.class);
// FilterReducer is an identity reducer, so it can also safely be used as the combiner
job.setCombinerClass(FilterReducer.class);
// Set the Number of Reduce Tasks
job.setNumReduceTasks(Integer.parseInt(args[3]));
/*
* Specify the jar file that contains your driver, mapper, and reducer.
* Hadoop will transfer this jar file to nodes in your cluster running
* mapper and reducer tasks.
*/
job.setJarByClass(FilterDriver.class);
/*
* Specify an easily-decipherable name for the job.
* This job name will appear in reports and logs.
*/
job.setJobName("FilterUser");
/*
* Start the MapReduce job and wait for it to finish. If it finishes
* successfully, return 0. If not, return 1.
*/
return job.waitForCompletion(true) ? 0 : 1;
}
// Entry point used by Runner: runs this driver through ToolRunner.
public int r(String[] args) throws Exception {
return ToolRunner.run(new FilterDriver(), args);
}
}
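FilterDriver has no main method of its own; Runner calls its r(...) helper directly. If a standalone entry point is wanted, a minimal sketch could look like the following (the class name FilterDriverMain is an assumption, not part of the gist):

package ex64;

import org.apache.hadoop.util.ToolRunner;

// Hypothetical standalone launcher; in the gist everything is started through Runner.
public class FilterDriverMain {
    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options (-D, -files, ...) before invoking run()
        int exitCode = ToolRunner.run(new FilterDriver(), args);
        System.exit(exitCode);
    }
}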
package ex64;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FilterMapper extends Mapper<Object, Text, Text, Text> {
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// Split the triple at single spaces
String[] s = value.toString().split(" ");
// Skip lines that are not complete triples
if (s.length < 3) {
return;
}
// Load the configuration to filter by user_a
Configuration conf = context.getConfiguration();
if (s[1].equals("foaf:knows") && s[0].equals(conf.get("user_a"))) {
// Only emit edges whose subject is user_a
context.write(new Text(s[0]), new Text(s[2]));
}
}
}
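The mapper expects whitespace-separated triples such as "alice foaf:knows bob". A minimal plain-Java sketch of the parsing step on a made-up sample line (no Hadoop context involved; the user names are invented):

package ex64;

// Traces the parsing logic of FilterMapper on an invented sample line.
public class FilterMapperSample {
    public static void main(String[] args) {
        String line = "alice foaf:knows bob";   // made-up triple
        String user_a = "alice";                // normally read from the configuration
        String[] s = line.split(" ");
        if (s.length >= 3 && s[1].equals("foaf:knows") && s[0].equals(user_a)) {
            // The real mapper would call context.write(new Text(s[0]), new Text(s[2]))
            System.out.println(s[0] + "\t" + s[2]);
        }
    }
}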
package ex64;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FilterReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// Get an iterator over the values
Iterator<Text> it = values.iterator();
// As long as the iterator has a next value...
while (it.hasNext()) {
// ... write it unchanged to the output (identity reduce)
context.write(key, it.next());
}
}
}
package ex64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class PathDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
/*
* Validate that six arguments were passed from the command line.
*/
if (args.length != 6) {
System.out.printf("Usage: PathDriver <base_dir> <input dir> <output dir> <user_a> <user_b> <num of reducers>\n");
System.exit(-1);
}
// Configuration processed by ToolRunner
Configuration conf = getConf();
// Notify Hadoop that the application uses GenericOptionsParser
// This is not required, but it prevents a warning from being printed during execution
conf.set("mapred.used.genericoptionsparser", "true");
// Set the user_a and user_b
conf.set("user_a", args[3]);
conf.set("user_b", args[4]);
// Create a Job using the processed conf
Job job = Job.getInstance(conf);
// Define Input and Output Format
// Use multiple mappers to extract from base file and formatted file
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, PathMapperOriginal.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PathMapperTemp.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
// Define Job Output Classes (Key, Value)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Set the Reducer class; the Mapper classes were already registered per input path via MultipleInputs
job.setReducerClass(PathReducer.class);
// No combiner here: PathReducer joins "o" and "i" records, so it is not suitable as a map-side combiner
// Set the Number of Reduce Tasks
job.setNumReduceTasks(Integer.parseInt(args[5]));
/*
* Specify the jar file that contains your driver, mapper, and reducer.
* Hadoop will transfer this jar file to nodes in your cluster running
* mapper and reducer tasks.
*/
job.setJarByClass(PathDriver.class);
/*
* Specify an easily-decipherable name for the job.
* This job name will appear in reports and logs.
*/
job.setJobName("FindPathFromUserToUser");
/*
* Start the MapReduce job and wait for it to finish. If it finishes
* successfully, return 0. If not, return 1.
*/
return job.waitForCompletion(true) ? 0 : 1;
}
// Entry point used by Runner: runs this driver through ToolRunner.
public int r(String[] args) throws Exception {
return ToolRunner.run(new PathDriver(), args);
}
}
package ex64;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class PathMapperOriginal extends Mapper<Object, Text, Text, Text> {
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// Split the line from the original file and keep only "foaf:knows" edges
String[] s = value.toString().split(" ");
if (s.length >= 3 && s[1].equals("foaf:knows")) {
// Emit the edge, prefixing the value with "o" (original)
context.write(new Text(s[0]), new Text("o" + s[2]));
}
}
}
package ex64;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class PathMapperTemp extends Mapper<Object, Text, Text, Text> {
@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// Split key and value at the tab separator
String[] s = value.toString().split("\t");
// Split the path (second part) at the commas
String[] p = s[1].split(",");
// Re-key on the last user of the path (the current frontier)
String newKey = p[p.length-1];
String ipath = "";
// Rebuild the path without its last element
for(int i = 0; i < p.length-1; i++) {
ipath += p[i] + ",";
}
// Emit with prefix "i" (inverted); the original start user is appended to the end of the value
context.write(new Text(newKey), new Text("i" + ipath + s[0]));
}
}
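To see what the re-keying does, the following plain-Java sketch traces one intermediate record of the form "<user_a>\t<path>" through the same steps (the names are invented for illustration):

package ex64;

// Traces one intermediate record through the re-keying logic of PathMapperTemp.
public class PathMapperTempSample {
    public static void main(String[] args) {
        String line = "alice\tbob,carol";       // path alice -> bob -> carol from a previous round
        String[] s = line.split("\t");
        String[] p = s[1].split(",");
        String newKey = p[p.length - 1];        // frontier user: "carol"
        String ipath = "";
        for (int i = 0; i < p.length - 1; i++) {
            ipath += p[i] + ",";
        }
        // Prints: carol	ibob,alice -- the key is the frontier, the value keeps the rest
        // of the path plus the original start user, prefixed with "i"
        System.out.println(newKey + "\t" + "i" + ipath + s[0]);
    }
}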
package ex64;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class PathReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// Load the configuration to know which user's path we are looking for
Configuration conf = context.getConfiguration();
String user_a = conf.get("user_a");
Iterator<Text> it = values.iterator();
ArrayList<String> path = new ArrayList<String>();
ArrayList<String> knows = new ArrayList<String>();
// Iterate over the values
while (it.hasNext()) {
String val = it.next().toString();
// Values prefixed with "o" are original edges; they are appended to the end of a path below
if(val.startsWith("o")) {
knows.add(val.substring(1));
} else if(val.startsWith("i")) {
// Values prefixed with "i" are inverted path records: they represent an existing path
// ending at this key and need to be flipped back
String[] path_users = val.substring(1).split(",");
String old_key = path_users[path_users.length-1];
// Use a literal replace (not a regex) to swap the stored start user for the current key
String curr_path = val.substring(1).replace(old_key, key.toString());
// Only keep the path if it starts with the user we are looking for.
if(old_key.equals(user_a)) {
path.add(curr_path);
}
}
}
// Now combine every collected path with every known neighbor of this key
for(String path_item: path) {
for(String know_item: knows) {
// If the neighbor is user_b, a path has been found; mark the line so the Runner can detect it
if (know_item.equals(conf.get("user_b"))) {
int length = path_item.split(",").length + 1;
context.write(new Text("!!!" + user_a), new Text(path_item + "," + know_item + " Distance: " + length));
}
// Otherwise emit the extended path as input for the next iteration
else {
// Skip neighbors that would loop straight back to user_a
if(!user_a.equals(know_item)) {
context.write(new Text(user_a), new Text(path_item + "," + know_item));
}
}
}
}
}
}
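The reducer joins the "i" path records with the "o" edge records per key. The sketch below replays one reduce call for key "carol" with user_a = "alice" and user_b = "dave" (all names and values invented), producing the same kind of records the reducer would write:

package ex64;

import java.util.Arrays;
import java.util.List;

// Replays one reduce call of PathReducer with invented input values.
public class PathReducerSample {
    public static void main(String[] args) {
        String key = "carol";                   // reduce key
        String user_a = "alice", user_b = "dave";
        // Two "o" edge records (carol knows bob and dave) and one "i" path record
        List<String> values = Arrays.asList("obob", "odave", "ibob,alice");
        for (String val : values) {
            if (!val.startsWith("i")) continue;
            String[] pathUsers = val.substring(1).split(",");
            String oldKey = pathUsers[pathUsers.length - 1];          // stored start user: "alice"
            String currPath = val.substring(1).replace(oldKey, key);  // "bob,alice" -> "bob,carol"
            if (!oldKey.equals(user_a)) continue;
            for (String other : values) {
                if (!other.startsWith("o")) continue;
                String know = other.substring(1);
                if (know.equals(user_b)) {
                    int length = currPath.split(",").length + 1;
                    // Found a path: alice -> bob -> carol -> dave
                    System.out.println("!!!" + user_a + "\t" + currPath + "," + know + " Distance: " + length);
                } else if (!know.equals(user_a)) {
                    // Extended path as input for the next round
                    System.out.println(user_a + "\t" + currPath + "," + know);
                }
            }
        }
    }
}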
package ex64;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.*;
public class Runner extends Configured {
/**
* Orchestrates the search: one filter job, then up to ten PathDriver rounds.
* @param args input dir, tmp dir, output dir, user_a, user_b, number of reducers
*/
public static void main(String[] args) throws Exception {
if (args.length != 6) {
System.out.printf("Usage: Runner <input dir> <tmp dir> <output dir> <user_a> <user_b> <num of reducers>\n");
System.exit(-1);
}
String input_dir = args[0];
String tmp_dir = args[1];
String output_dir = args[2];
String user_a = args[3];
String user_b = args[4];
Path path_filter = Paths.get(tmp_dir, "out_filtered");
// First, filter the foaf:knows edges of user_a into a temporary directory.
String[] args_mod = new String[]{input_dir, path_filter.toString(), user_a, args[5]};
FilterDriver f = new FilterDriver();
f.r(args_mod);
int i = 0;
// Then use this path as current input directory
String current_input_dir = path_filter.toString();
// Loop at most 10 times over the data so that paths up to a depth of 10 can be found.
while(i < 10) {
i++;
// Mark that we didn't find a path yet.
boolean foundPath = false;
// Use a temporary output dir
String current_output_dir = Paths.get(tmp_dir, "path", String.valueOf(i)).toString();
// Instantiate and run the path driver with the base data dir (aka input dir) and the temp dir.
PathDriver pd = new PathDriver();
pd.r(new String[] {input_dir, current_input_dir, current_output_dir, user_a, user_b, args[5]});
// Use this round's output as the next round's input.
current_input_dir = current_output_dir;
// List all files of the output directory on HDFS
FileSystem fs = FileSystem.get(new Configuration());
FileStatus[] status = fs.listStatus(new org.apache.hadoop.fs.Path(current_output_dir));
// Loop over them file by file but omit the ".crc"-files
for(FileStatus file: status) {
if(!file.toString().endsWith("crc")) {
// Read all lines from the file.
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(file.getPath())));
String line = br.readLine();
while (line != null){
if (line.startsWith("!!!")) {
System.out.println(line);
// Format the text to meet the requirement from the task
// and write it to the file
String[] part1 = line.substring(3).split("\t");
String[] part2 = part1[1].split(",");
String newLine = part1[0];
for(String partx: part2) {
newLine += " (foaf:knows) " + partx;
}
newLine = newLine.trim() + "\n";
// fs.create() overwrites the file, so only the last matching path of this round is kept
org.apache.hadoop.fs.Path out = new org.apache.hadoop.fs.Path(output_dir + "/output");
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(out)));
bw.write(newLine);
bw.close();
// Mark as found
foundPath = true;
}
line = br.readLine();
}
br.close();
}
}
if (foundPath) {
return;
}
}
}
}
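For reference, this is how the final formatting step in Runner turns a marked reducer line into the output format, traced in plain Java on an invented line:

package ex64;

// Traces the final output formatting in Runner on an invented "!!!" marker line.
public class RunnerFormatSample {
    public static void main(String[] args) {
        String line = "!!!alice\tbob,carol,dave Distance: 3";
        String[] part1 = line.substring(3).split("\t");
        String[] part2 = part1[1].split(",");
        String newLine = part1[0];
        for (String partx : part2) {
            newLine += " (foaf:knows) " + partx;
        }
        // Prints: alice (foaf:knows) bob (foaf:knows) carol (foaf:knows) dave Distance: 3
        System.out.println(newLine.trim());
    }
}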