Skip to content

Instantly share code, notes, and snippets.

@HeartSaVioR
Created July 5, 2016 07:25
Show Gist options
  • Select an option

  • Save HeartSaVioR/0e0e9fe934a6a7bbc28f951fda87deee to your computer and use it in GitHub Desktop.

Select an option

Save HeartSaVioR/0e0e9fe934a6a7bbc28f951fda87deee to your computer and use it in GitHub Desktop.
Full source code of CustomMetricTopology
package com.hortonworks.storm.test;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
public class CustomMetricTopology {
/**
 * Spout that emits one URL, picked at random from a fixed list, roughly every 100 ms.
 *
 * <p>Each tuple has a single field named "url" and is emitted with a freshly generated random
 * UUID as its message id.
 */
static class URLSpout extends BaseRichSpout {
  /** Candidate URLs; one of them is chosen at random for every emitted tuple. */
  private static final String[] URLS = {"http://hortonworks.com", "https://community.hortonworks.com"};

  /** Pause, in milliseconds, before each emit so the spout does not spin. */
  private static final long EMIT_INTERVAL_MS = 100;

  private final Random random = new Random();
  private SpoutOutputCollector collector;

  /** Keeps the collector so {@link #nextTuple()} can emit through it. */
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
  }

  /**
   * Sleeps for the emit interval and then emits one randomly chosen URL.
   *
   * <p>If the thread is interrupted while sleeping, the interrupt status is restored and nothing
   * is emitted, so that the caller can observe the interruption.
   */
  public void nextTuple() {
    try {
      Thread.sleep(EMIT_INTERVAL_MS);
    } catch (InterruptedException e) {
      // Do not swallow the interruption: put the flag back and stop producing for this call.
      Thread.currentThread().interrupt();
      return;
    }
    String url = URLS[random.nextInt(URLS.length)];
    collector.emit(new Values(url), UUID.randomUUID());
  }

  /** Declares the single output field, "url". */
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("url"));
  }
}
/**
 * Bolt that simulates crawling a URL and storing the resulting page, and reports the mean latency
 * of each step through two registered metrics, "latency-crawl" and "latency-store".
 *
 * <p>Both steps are simulated with random sleeps; nothing is actually fetched or persisted.
 */
static class StoreCrawledPageBolt extends BaseRichBolt {
  private static final Logger LOG = LoggerFactory.getLogger(StoreCrawledPageBolt.class);

  /** Time bucket size, in seconds, used when registering both latency metrics. */
  private static final int TIME_BUCKET_SIZE_IN_SECS = 60;
  /** Exclusive upper bound, in milliseconds, of the simulated crawl duration. */
  private static final int LATENCY_BOUND_OF_CRAWL = 3000;
  /** Exclusive upper bound, in milliseconds, of the simulated store duration. */
  private static final int LATENCY_BOUND_OF_STORE = 200;
  private static final long NANOS_PER_MILLI = 1_000_000L;

  // The metrics are created in prepare(), so they are excluded from serialization of the bolt.
  private transient ReducedMetric latencyForCrawl;
  private transient ReducedMetric latencyForStore;
  private Random random;
  private OutputCollector collector;

  /** Stores the collector, creates the random source, and creates and registers both metrics. */
  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    random = new Random();
    latencyForCrawl = new ReducedMetric(new MeanReducer());
    latencyForStore = new ReducedMetric(new MeanReducer());
    context.registerMetric("latency-crawl", latencyForCrawl, TIME_BUCKET_SIZE_IN_SECS);
    context.registerMetric("latency-store", latencyForStore, TIME_BUCKET_SIZE_IN_SECS);
  }

  /**
   * Crawls and stores the URL carried in the first field of {@code input}, recording the elapsed
   * milliseconds of each step, then acks the tuple.
   *
   * <p>If the thread is interrupted mid-way, the interrupt status is restored and the tuple is
   * failed instead of acked, because the page was not fully processed.
   */
  @Override
  public void execute(Tuple input) {
    String url = input.getString(0);
    LOG.info("crawling and storing url: {}", url);
    try {
      // nanoTime is used for elapsed time because wall-clock time may jump when the clock is adjusted.
      long start = System.nanoTime();
      String html = crawlPage(url);
      latencyForCrawl.update(elapsedMillis(start));

      start = System.nanoTime();
      storePage(url, html);
      latencyForStore.update(elapsedMillis(start));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.info("Interrupted while processing url: {}; failing tuple", url);
      collector.fail(input);
      return;
    }
    collector.ack(input);
  }

  /** Returns the whole milliseconds elapsed since {@code startNanos}, a System.nanoTime() reading. */
  private static long elapsedMillis(long startNanos) {
    return (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
  }

  /** Simulates a crawl by sleeping a random time below the crawl bound; returns a fixed body. */
  private String crawlPage(String url) throws InterruptedException {
    Thread.sleep(random.nextInt(LATENCY_BOUND_OF_CRAWL));
    return "hello world";
  }

  /** Simulates a store by sleeping a random time below the store bound; {@code html} is unused. */
  private void storePage(String url, String html) throws InterruptedException {
    Thread.sleep(random.nextInt(LATENCY_BOUND_OF_STORE));
    LOG.info("Storing page for {} complete.", url);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // not needed as we don't emit any tuples to downstream
  }
}
/**
 * Wires the URL spout to the crawl-and-store bolt and submits the result as
 * "custom-metric-topology".
 *
 * <p>The bolt is registered with a parallelism hint of 5 and receives tuples from the spout via
 * shuffle grouping. The submitted configuration sets "topology.max.spout.pending" to 10.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args)
    throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
  // Topology-level settings handed to the submitter.
  Map<String, Object> topologyConf = new HashMap<>();
  topologyConf.put("topology.max.spout.pending", 10);

  // Spout -> bolt wiring.
  TopologyBuilder topologyBuilder = new TopologyBuilder();
  topologyBuilder.setSpout("urlSpout", new URLSpout());
  topologyBuilder
      .setBolt("storeCrawledPageBolt", new StoreCrawledPageBolt(), 5)
      .shuffleGrouping("urlSpout");

  StormSubmitter.submitTopology(
      "custom-metric-topology", topologyConf, topologyBuilder.createTopology());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment