ex64
FilterDriver.java
package ex64;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FilterDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        /*
         * Validate that four arguments were passed from the command line.
         */
        if (args.length != 4) {
            System.out.printf("Usage: FilterDriver <input dir> <output dir> <user_a> <num of reducers>\n");
            System.exit(-1);
        }
        // Configuration processed by ToolRunner
        Configuration conf = getConf();
        // Notify Hadoop that the application uses GenericOptionsParser.
        // This is not required, but it prevents a warning from being printed during execution.
        conf.set("mapred.used.genericoptionsparser", "true");
        // Save the user we are looking for.
        conf.set("user_a", args[2]);
        // Create a Job using the processed conf
        Job job = Job.getInstance(conf);
        // Define input and output formats and paths
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Define the map output classes (key, value).
        // Strictly this could be omitted here because it matches the job output;
        // whenever the two differ, it has to be set explicitly.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Define the job output classes (key, value)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Set the mapper and reducer class
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReducer.class);
        // FilterReducer is an identity reducer, so it is safe to reuse as a combiner
        job.setCombinerClass(FilterReducer.class);
        // Set the number of reduce tasks
        job.setNumReduceTasks(Integer.parseInt(args[3]));
        /*
         * Specify the jar file that contains your driver, mapper, and reducer.
         * Hadoop will transfer this jar file to the nodes in your cluster that
         * run mapper and reducer tasks.
         */
        job.setJarByClass(FilterDriver.class);
        /*
         * Specify an easily decipherable name for the job.
         * This job name will appear in reports and logs.
         */
        job.setJobName("FilterUser");
        /*
         * Start the MapReduce job and wait for it to finish. If it finishes
         * successfully, return 0. If not, return 1.
         */
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Entry point used by Runner: hands the arguments to ToolRunner,
    // which parses generic options and then calls run() above.
    public int runTool(String[] args) throws Exception {
        return ToolRunner.run(new FilterDriver(), args);
    }
}
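The gist never gives FilterDriver its own main method; it is only started through the Runner class at the end. For standalone testing, a minimal launcher might look like the sketch below (this class is an assumption, not part of the original code):

package ex64;

import org.apache.hadoop.util.ToolRunner;

// Hypothetical standalone launcher for FilterDriver (not in the original gist)
public class FilterDriverMain {
    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options (e.g. -D key=value) before calling run()
        int exitCode = ToolRunner.run(new FilterDriver(), args);
        System.exit(exitCode);
    }
}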
FilterMapper.java
package ex64;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Split the triple by spaces: subject, predicate, object
        String[] s = value.toString().split(" ");
        // Skip malformed lines that do not contain a full triple
        if (s.length < 3) {
            return;
        }
        // Load the configuration to filter by user_a
        Configuration conf = context.getConfiguration();
        if (s[1].equals("foaf:knows") && s[0].equals(conf.get("user_a"))) {
            // Only write edges whose subject is user_a
            context.write(new Text(s[0]), new Text(s[2]));
        }
    }
}
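To make the mapper's behavior concrete, here is a small worked example with made-up user names, assuming the job was started with user_a = alice:

Input lines:   alice foaf:knows bob
               carol foaf:knows alice
Emitted:       (alice, bob)      <- subject matches user_a
               (nothing)         <- subject carol does not match user_a

After the identity reducer, TextOutputFormat writes the surviving pair as the tab-separated line alice<TAB>bob, which is exactly the format PathMapperTemp later expects.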
FilterReducer.java
package ex64;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Identity reducer: write every value through to the output unchanged
        for (Text value : values) {
            context.write(key, value);
        }
    }
}
PathDriver.java
package ex64;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PathDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        /*
         * Validate that six arguments were passed from the command line.
         */
        if (args.length != 6) {
            System.out.printf("Usage: PathDriver <base dir> <input dir> <output dir> <user_a> <user_b> <num of reducers>\n");
            System.exit(-1);
        }
        // Configuration processed by ToolRunner
        Configuration conf = getConf();
        // Notify Hadoop that the application uses GenericOptionsParser.
        // This is not required, but it prevents a warning from being printed during execution.
        conf.set("mapred.used.genericoptionsparser", "true");
        // Set user_a and user_b
        conf.set("user_a", args[3]);
        conf.set("user_b", args[4]);
        // Create a Job using the processed conf
        Job job = Job.getInstance(conf);
        // Define input and output.
        // Two mappers feed this job: one reads the original base file,
        // the other reads the intermediate output of the previous iteration.
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, PathMapperOriginal.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, PathMapperTemp.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        // Define the job output classes (key, value)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Set the reducer class; the mappers are already bound via MultipleInputs
        job.setReducerClass(PathReducer.class);
        // No combiner here: PathReducer must see all "o" and "i" values of a key
        // together, so it cannot run correctly on partial data.
        // Set the number of reduce tasks
        job.setNumReduceTasks(Integer.parseInt(args[5]));
        /*
         * Specify the jar file that contains your driver, mapper, and reducer.
         * Hadoop will transfer this jar file to the nodes in your cluster that
         * run mapper and reducer tasks.
         */
        job.setJarByClass(PathDriver.class);
        /*
         * Specify an easily decipherable name for the job.
         * This job name will appear in reports and logs.
         */
        job.setJobName("FindPathFromUserToUser");
        /*
         * Start the MapReduce job and wait for it to finish. If it finishes
         * successfully, return 0. If not, return 1.
         */
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Entry point used by Runner: hands the arguments to ToolRunner,
    // which parses generic options and then calls run() above.
    public int runTool(String[] args) throws Exception {
        return ToolRunner.run(new PathDriver(), args);
    }
}
PathMapperOriginal.java
package ex64;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PathMapperOriginal extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line from the original file and use only "foaf:knows" edges
        String[] s = value.toString().split(" ");
        if (s.length >= 3 && s[1].equals("foaf:knows")) {
            // Emit the edge, marking the value with "o" for o(riginal)
            context.write(new Text(s[0]), new Text("o" + s[2]));
        }
    }
}
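With the same made-up triple as before, this mapper keys the edge by its subject and prefixes the value with "o" so the reducer can tell it apart from path values:

Input line:   alice foaf:knows bob
Emitted:      (alice, "obob")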
PathMapperTemp.java
package ex64;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PathMapperTemp extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Split the intermediate line into key and path by the tab character
        String[] s = value.toString().split("\t");
        // Skip malformed lines without a key/path pair
        if (s.length < 2) {
            return;
        }
        // Split the path part by comma
        String[] p = s[1].split(",");
        // The last user on the path becomes the new key
        String newKey = p[p.length - 1];
        // Rebuild the remaining path
        StringBuilder ipath = new StringBuilder();
        for (int i = 0; i < p.length - 1; i++) {
            ipath.append(p[i]).append(",");
        }
        // Emit with the old key appended to the path and mark the value
        // with "i" for i(nverted)
        context.write(new Text(newKey), new Text("i" + ipath + s[0]));
    }
}
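A worked example (names made up) shows the inversion: the last user on the path becomes the new key, and the old key moves to the end of the value, so the reducer can later check where the path started.

Input line (from the previous iteration):   alice<TAB>bob,carol
Split:     s[0] = alice, p = [bob, carol]
New key:   carol (last user on the path)
Emitted:   (carol, "ibob,alice")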
PathReducer.java
package ex64;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PathReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Load the configuration to know which users we are looking for
        Configuration conf = context.getConfiguration();
        String user_a = conf.get("user_a");
        String user_b = conf.get("user_b");
        ArrayList<String> path = new ArrayList<String>();
        ArrayList<String> knows = new ArrayList<String>();
        // Iterate over the values
        for (Text value : values) {
            String val = value.toString();
            if (val.startsWith("o")) {
                // "o" values are original edges: users the current key knows
                knows.add(val.substring(1));
            } else if (val.startsWith("i")) {
                // "i" values are inverted: they carry an existing path that ends
                // at the current key, with the origin user as the last element
                String[] path_users = val.substring(1).split(",");
                String old_key = path_users[path_users.length - 1];
                // Rebuild the path, swapping the trailing origin user for the
                // current key. (This replaces only the last element; the original
                // regex-based replaceAll could also hit earlier occurrences.)
                path_users[path_users.length - 1] = key.toString();
                String curr_path = String.join(",", path_users);
                // Only keep the path if it starts at the user we are looking for
                if (old_key.equals(user_a)) {
                    path.add(curr_path);
                }
            }
        }
        // Now extend every known path by every known user
        for (String path_item : path) {
            for (String know_item : knows) {
                if (know_item.equals(user_b)) {
                    // We found a complete path: mark the line with "!!!" so the
                    // Runner can identify it
                    int length = path_item.split(",").length + 1;
                    context.write(new Text("!!!" + user_a), new Text(path_item + "," + know_item + " Distance: " + length));
                } else if (!user_a.equals(know_item)) {
                    // Otherwise emit the extended path for the next iteration,
                    // skipping loops back to user_a
                    context.write(new Text(user_a), new Text(path_item + "," + know_item));
                }
            }
        }
    }
}
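Continuing the made-up example with user_a = alice and user_b = dave, one reduce call might see the following:

Key:     carol
Values:  "ibob,alice"   (the path alice -> bob that currently ends at carol)
         "odave"        (the original edge carol foaf:knows dave)

The "i" value passes the old_key check (alice equals user_a) and is rebuilt as the path "bob,carol". The "o" value matches user_b, so the reducer emits the marked result

(!!!alice, "bob,carol,dave Distance: 3")

which encodes the path alice -> bob -> carol -> dave with three edges.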
Runner.java
package ex64;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

public class Runner {

    /**
     * @param args input dir, tmp dir, output dir, user_a, user_b, number of reducers
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 6) {
            System.out.printf("Usage: Runner <input dir> <tmp dir> <output dir> <user_a> <user_b> <num of reducers>\n");
            System.exit(-1);
        }
        String input_dir = args[0];
        String tmp_dir = args[1];
        String output_dir = args[2];
        String user_a = args[3];
        String user_b = args[4];
        Path path_filter = Paths.get(tmp_dir, "out_filtered");
        // First we filter only the foaf:knows edges of user_a into a temp dir.
        String[] args_mod = new String[]{input_dir, path_filter.toString(), user_a, args[5]};
        FilterDriver f = new FilterDriver();
        f.runTool(args_mod);
        int i = 0;
        // Then use this path as the current input directory
        String current_input_dir = path_filter.toString();
        // Loop at most 10 times over the data, so paths up to a depth of 10 can be found.
        while (i < 10) {
            i++;
            // Mark that we haven't found a path yet.
            boolean foundPath = false;
            // Use a temporary output dir per iteration
            String current_output_dir = Paths.get(tmp_dir, "path", String.valueOf(i)).toString();
            // Run the path driver with the base data dir (aka input dir) and the temp dir.
            PathDriver pd = new PathDriver();
            pd.runTool(new String[]{input_dir, current_input_dir, current_output_dir, user_a, user_b, args[5]});
            // Flip the output dir to be the next input dir.
            current_input_dir = current_output_dir;
            // Get all the files of the output directory.
            // (The Hadoop Path is fully qualified to avoid clashing with java.nio.file.Path.)
            FileSystem fs = FileSystem.get(new Configuration());
            FileStatus[] status = fs.listStatus(new org.apache.hadoop.fs.Path(current_output_dir));
            // Collect all result lines so the output file is written only once
            List<String> results = new ArrayList<String>();
            // Loop over the files, omitting checksum files and markers like _SUCCESS
            for (FileStatus file : status) {
                String name = file.getPath().getName();
                if (name.endsWith("crc") || name.startsWith("_")) {
                    continue;
                }
                // Read all lines from the file
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(file.getPath())));
                String line = br.readLine();
                while (line != null) {
                    if (line.startsWith("!!!")) {
                        // Format the text to meet the requirement of the task
                        String[] part1 = line.substring(3).split("\t");
                        String[] part2 = part1[1].split(",");
                        String newLine = part1[0];
                        for (String partx : part2) {
                            newLine += " (foaf:knows) " + partx;
                        }
                        results.add(newLine.trim() + "\n");
                        // Mark that a path was found
                        foundPath = true;
                    }
                    line = br.readLine();
                }
                br.close();
            }
            if (foundPath) {
                // Write all found paths to the output file in one go.
                // (The original created and overwrote the file once per line,
                // so only the last path would have survived.)
                org.apache.hadoop.fs.Path out = new org.apache.hadoop.fs.Path(output_dir + "/output");
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(out)));
                for (String result : results) {
                    bw.write(result);
                }
                bw.close();
                return;
            }
        }
    }
}
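Assuming the classes are packaged into a jar called ex64.jar (the jar name and the HDFS paths below are assumptions), the whole pipeline could be launched like this:

hadoop jar ex64.jar ex64.Runner /data/foaf /tmp/ex64 /user/out/ex64 alice dave 2

Runner first runs the FilterDriver job once, then iterates the PathDriver job up to ten times, and stops as soon as an iteration's output contains a line marked with "!!!".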