@RicherMans
Created November 11, 2013 16:06
PageRank.java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class PageRank {

    public static class Reduce extends MapReduceBase implements
            org.apache.hadoop.mapred.Reducer<Text, PRWritable, LongWritable, Text> {

        @Override
        public void reduce(Text url, Iterator<PRWritable> weights,
                OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {
            // Sum the partial rank contributions each in-linking page
            // emitted for this URL; note the output is inverted, with the
            // rank as key and the URL as value.
            long pr = 0;
            while (weights.hasNext()) {
                pr += weights.next().getPr().get();
            }
            output.collect(new LongWritable(pr), new Text(url));
        }
    }
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, PRWritable> {

        // matches <title>...</title> lines in the MediaWiki dump
        private static final Pattern PAT_TITLE = Pattern
                .compile("<title>(.+)</title>");
        // internal wiki links have the form [[TARGET]]
        private static final Pattern PAT_LINKS = Pattern.compile("\\[\\[(.+?)\\]\\]");

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, PRWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer tok = new StringTokenizer(value.toString(), "\t");
            HashMap<String, List<String>> titleToLinks = new HashMap<>();
            while (tok.hasMoreTokens()) {
                String tline = tok.nextToken();
                Matcher match = PAT_TITLE.matcher(tline);
                while (match.find()) {
                    // find the title, and every link that shares its line
                    List<String> links = new ArrayList<>();
                    Matcher linksMat = PAT_LINKS.matcher(tline);
                    while (linksMat.find()) {
                        links.add(linksMat.group(1).toLowerCase());
                    }
                    titleToLinks.put(match.group(1).toLowerCase(), links);
                }
            }
            // seed the rank randomly on the first pass, otherwise split it
            // evenly across the outgoing links
            int sizeOut = titleToLinks.size();
            long pageRank = new java.util.Random().nextInt(100);
            if (sizeOut > 1) {
                pageRank = key.get() / sizeOut;
            }
            for (Entry<String, List<String>> entry : titleToLinks.entrySet()) {
                output.collect(new Text(entry.getKey()),
                        new PRWritable(pageRank, entry.getValue()));
            }
        }

        /*
         * map ((url, PR), out_links)   // PR = random at start
         *   for link in out_links:
         *     emit(link, (PR / size(out_links), url))
         */
    }
    static String HDFS_PREFIX = "hdfs://localhost:9000";

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(PageRank.class);
        conf.setJobName("Pagerank");
        // map output types differ from the job output types, so both
        // pairs must be declared explicitly
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(PRWritable.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(Map.class);
        // Reduce cannot double as a combiner: a combiner must emit the map
        // output types (Text, PRWritable), but Reduce emits (LongWritable, Text)
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        if (args.length > 2) {
            HDFS_PREFIX = args[2];
        }
        FileInputFormat.setInputPaths(conf, new Path(HDFS_PREFIX + args[0]));
        // random suffix keeps reruns from colliding with an existing output dir
        FileOutputFormat.setOutputPath(conf, new Path(HDFS_PREFIX + args[1]
                + new java.util.Random().nextInt()));
        JobClient.runJob(conf);
    }
}
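
The driver above runs a single MapReduce pass, while PageRank is normally iterated until the ranks converge. A minimal sketch of a chained driver follows; the iteration count, driver class name, and HDFS directory layout are made-up values, and as written the mapper still expects dump-formatted input, so each pass's output would need to be rewritten into that format before the next pass.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class IterativePageRankDriver {

    public static void main(String[] args) throws IOException {
        final int numIter = 5;                      // assumed iteration count
        String base = "hdfs://localhost:9000/pr/";  // assumed HDFS layout
        String input = base + "input";
        for (int i = 0; i < numIter; i++) {
            JobConf conf = new JobConf(PageRank.class);
            conf.setJobName("Pagerank-iter-" + i);
            conf.setMapOutputKeyClass(Text.class);
            conf.setMapOutputValueClass(PRWritable.class);
            conf.setOutputKeyClass(LongWritable.class);
            conf.setOutputValueClass(Text.class);
            conf.setMapperClass(PageRank.Map.class);
            conf.setReducerClass(PageRank.Reduce.class);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);
            String output = base + "iter" + i;
            FileInputFormat.setInputPaths(conf, new Path(input));
            FileOutputFormat.setOutputPath(conf, new Path(output));
            JobClient.runJob(conf); // blocks until this pass completes
            input = output;         // this pass's output seeds the next pass
        }
    }
}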
PRWritable.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class PRWritable implements Writable {

    private LongWritable pr;
    private List<Text> all_links;

    public PRWritable() {
        this.pr = new LongWritable();
        this.all_links = new ArrayList<>();
    }

    public PRWritable(LongWritable pr, List<Text> urls) {
        this.pr = pr;
        this.all_links = urls;
    }

    public PRWritable(long pr, List<String> urls) {
        this.pr = new LongWritable(pr);
        this.all_links = new ArrayList<>();
        for (String url : urls) {
            all_links.add(new Text(url));
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        pr.readFields(in);
        // the link count is read back first, so the receiving side knows
        // how many Text entries follow
        int size = in.readInt();
        all_links = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            Text t = new Text();
            t.readFields(in);
            all_links.add(t);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        pr.write(out);
        // write the link count first; without it, deserialization into a
        // freshly constructed (empty) instance would lose every link
        out.writeInt(all_links.size());
        for (Text t : all_links) {
            t.write(out);
        }
    }

    public LongWritable getPr() {
        return pr;
    }

    public void setPr(LongWritable pr) {
        this.pr = pr;
    }

    public List<Text> getUrls() {
        return all_links;
    }

    public void setUrls(List<Text> urls) {
        this.all_links = urls;
    }
}
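
Since Hadoop rebuilds a Writable on the reduce side by calling readFields on a fresh instance, the size-prefixed encoding above can be verified with an in-memory round trip. A small sketch; the demo class name and page names are made up:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class PRWritableRoundTrip {

    public static void main(String[] args) throws IOException {
        PRWritable original = new PRWritable(42L, Arrays.asList("pageA", "pageB"));

        // serialize into an in-memory buffer
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize into a fresh instance, as the reduce side would
        PRWritable copy = new PRWritable();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getPr() + " " + copy.getUrls()); // 42 [pageA, pageB]
    }
}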
WikiXMLParser.java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

public class WikiXMLParser {

    public static List<Page> parseXML(String file) {
        SAXParserFactory factory = SAXParserFactory.newInstance();
        PageHandler handler = new PageHandler();
        try {
            SAXParser parser = factory.newSAXParser();
            parser.parse(file, handler);
        } catch (ParserConfigurationException | SAXException | IOException e) {
            e.printStackTrace();
        }
        // return whatever pages the handler collected
        return new ArrayList<>(handler.pages);
    }

    /**
     * Searches for [[LINK_NAME]] occurrences in its text argument and
     * returns every occurrence.
     *
     * @author richman
     */
    private static class TextParser {

        private static TextParser textParser;
        // default pattern for Wikipedia link parsing; the brackets must be
        // escaped, or the regex is read as a character class
        private String pattern = "\\[\\[(.+?)\\]\\]";

        private TextParser() {
        }

        public static TextParser getInstance() {
            if (textParser == null)
                textParser = new TextParser();
            return textParser;
        }

        public void setPattern(String pattern) {
            this.pattern = pattern;
        }

        public List<String> parse(String text) {
            List<String> links = new ArrayList<>();
            Matcher mat = Pattern.compile(pattern).matcher(text);
            while (mat.find()) {
                links.add(mat.group(1));
            }
            return links;
        }
    }

    private static class PageHandler extends DefaultHandler {

        private final Deque<Page> pages = new ArrayDeque<>();
        private Page curPage = null;
        // accumulates character data, since SAX may deliver the content of a
        // single element across several characters() callbacks
        private final StringBuilder tmpT = new StringBuilder();

        @Override
        public void startElement(String uri, String localName, String qName,
                Attributes attributes) throws SAXException {
            if (qName.equalsIgnoreCase("page")) {
                curPage = new Page();
            }
            tmpT.setLength(0);
        }

        @Override
        public void endElement(String uri, String localName, String qName)
                throws SAXException {
            switch (qName) {
            case "title":
                curPage.pageName = tmpT.toString();
                break;
            case "text":
                curPage.out_links.addAll(
                        TextParser.getInstance().parse(tmpT.toString()));
                break;
            }
            if (qName.equalsIgnoreCase("page"))
                pages.add(curPage);
        }

        @Override
        public void characters(char[] ch, int start, int length)
                throws SAXException {
            tmpT.append(ch, start, length);
        }
    }

    public static class Page {

        private String pageName;
        private Deque<String> out_links;

        Page() {
            pageName = "";
            out_links = new ArrayDeque<>();
        }

        public String getPageName() {
            return pageName;
        }

        public Deque<String> getOutLinks() {
            return out_links;
        }
    }
}
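
A quick way to exercise the parser, assuming a small MediaWiki dump saved locally; the demo class and file name are placeholders:

import java.util.List;

public class WikiXMLParserDemo {

    public static void main(String[] args) {
        // "sample-dump.xml" is a placeholder path to a small MediaWiki dump
        List<WikiXMLParser.Page> pages = WikiXMLParser.parseXML("sample-dump.xml");
        for (WikiXMLParser.Page page : pages) {
            System.out.println(page.getPageName() + " -> " + page.getOutLinks());
        }
    }
}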
@RicherMans (Author):
Assignment 3 in Multicore Programming: Hadoop PageRank
