Created
July 1, 2013 08:20
-
-
Save aladagemre/5899199 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public Map<String,Object> run(Map<String,Object> args) throws Exception { | |
String crawlId = (String)args.get(Nutch.ARG_CRAWL); | |
numJobs = 1; | |
currentJobNum = 0; | |
currentJob = new NutchJob(getConf(), "update-table"); | |
if (crawlId != null) { | |
currentJob.getConfiguration().set(Nutch.CRAWL_ID_KEY, crawlId); | |
} | |
//job.setBoolean(ALL, updateAll); | |
ScoringFilters scoringFilters = new ScoringFilters(getConf()); | |
HashSet<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS); | |
fields.addAll(scoringFilters.getFields()); | |
// Partition by {url}, sort by {url,score} and group by {url}. | |
// This ensures that the inlinks are sorted by score when they enter | |
// the reducer. | |
currentJob.setPartitionerClass(UrlOnlyPartitioner.class); | |
currentJob.setSortComparatorClass(UrlScoreComparator.class); | |
currentJob.setGroupingComparatorClass(UrlOnlyComparator.class); | |
StorageUtils.initMapperJob(currentJob, fields, UrlWithScore.class, | |
NutchWritable.class, DbUpdateMapper.class); | |
StorageUtils.initReducerJob(currentJob, DbUpdateReducer.class); | |
currentJob.waitForCompletion(true); | |
LOG.info("LinkRank starts..."); | |
LinkRankJob2 linkRankJob = new LinkRankJob2(); | |
linkRankJob.run(null); | |
LOG.info("LinkRank has finished..."); | |
ToolUtil.recordJobStatus(null, currentJob, results); | |
return results; | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.nutch.crawl; | |
import org.apache.giraph.conf.GiraphConfiguration; | |
import org.apache.giraph.edge.ByteArrayEdges; | |
import org.apache.giraph.examples.LinkRank.LinkRankComputation; | |
import org.apache.giraph.examples.LinkRank.LinkRankVertexMasterCompute; | |
import org.apache.giraph.examples.LinkRank.NutchTableEdgeInputFormat; | |
import org.apache.giraph.examples.LinkRank.NutchTableEdgeOutputFormat; | |
import org.apache.giraph.job.GiraphJob; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.hbase.Abortable; | |
import org.apache.hadoop.hbase.HBaseConfiguration; | |
import org.apache.hadoop.hbase.client.HBaseAdmin; | |
import org.apache.hadoop.hbase.mapreduce.TableInputFormat; | |
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; | |
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.log4j.Logger; | |
public class LinkRankJob2 implements Tool { | |
private static final Logger LOG = Logger.getLogger(LinkRankJob2.class); | |
private GiraphConfiguration conf; | |
private static final String TABLE_NAME = "webpage"; | |
@Override | |
public int run(String[] strings) throws Exception { | |
Configuration config = HBaseConfiguration.create(); | |
config.clear(); | |
config.set("hbase.zookeeper.quorum", "localhost"); | |
config.set("hbase.zookeeper.property.clientPort", "2181"); | |
config.set("hbase.master", "localhost:60000"); | |
config.set("mapred.job.tracker", "localhost:9001"); | |
HBaseAdmin admin = new HBaseAdmin(config); | |
ZooKeeperWatcher zooKeeperWatcher = new ZooKeeperWatcher(config, "zkw", new Abortable() { | |
@Override | |
public void abort(String s, Throwable throwable) { | |
System.out.println(s); | |
} | |
}); | |
admin.getMaster(); | |
// Start the giraph job | |
GiraphConfiguration giraphConf = new GiraphConfiguration(config); //giraphJob.getConfiguration(); | |
giraphConf.setZooKeeperConfiguration( | |
zooKeeperWatcher.getQuorum()); | |
giraphConf.setComputationClass(LinkRankComputation.class); | |
giraphConf.setMasterComputeClass(LinkRankVertexMasterCompute.class); | |
giraphConf.setOutEdgesClass(ByteArrayEdges.class); | |
giraphConf.setVertexInputFormatClass(NutchTableEdgeInputFormat.class); | |
giraphConf.setVertexOutputFormatClass(NutchTableEdgeOutputFormat.class); | |
giraphConf.setInt("giraph.pageRank.superstepCount", 40); | |
giraphConf.setWorkerConfiguration(1, 1, 100.0f); | |
giraphConf.set(TableInputFormat.INPUT_TABLE, TABLE_NAME); | |
giraphConf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME); | |
GiraphJob giraphJob = new GiraphJob(giraphConf, "LinkRank2"); | |
return giraphJob.run(false) ? 0: -1; | |
} | |
@Override | |
public void setConf(final Configuration conf) { | |
this.conf = new GiraphConfiguration(conf); | |
} | |
@Override | |
public Configuration getConf() { | |
return conf; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment