Created
July 5, 2016 07:25
-
-
Save HeartSaVioR/0e0e9fe934a6a7bbc28f951fda87deee to your computer and use it in GitHub Desktop.
Full source code of CustomMetricTopology
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.hortonworks.storm.test; | |
| import org.apache.storm.StormSubmitter; | |
| import org.apache.storm.generated.AlreadyAliveException; | |
| import org.apache.storm.generated.AuthorizationException; | |
| import org.apache.storm.generated.InvalidTopologyException; | |
| import org.apache.storm.generated.StormTopology; | |
| import org.apache.storm.metric.api.MeanReducer; | |
| import org.apache.storm.metric.api.ReducedMetric; | |
| import org.apache.storm.spout.SpoutOutputCollector; | |
| import org.apache.storm.task.OutputCollector; | |
| import org.apache.storm.task.TopologyContext; | |
| import org.apache.storm.topology.OutputFieldsDeclarer; | |
| import org.apache.storm.topology.TopologyBuilder; | |
| import org.apache.storm.topology.base.BaseRichBolt; | |
| import org.apache.storm.topology.base.BaseRichSpout; | |
| import org.apache.storm.tuple.Fields; | |
| import org.apache.storm.tuple.Tuple; | |
| import org.apache.storm.tuple.Values; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import java.util.HashMap; | |
| import java.util.Map; | |
| import java.util.Random; | |
| import java.util.UUID; | |
| public class CustomMetricTopology { | |
| static class URLSpout extends BaseRichSpout { | |
| private static final String[] urlList = {"http://hortonworks.com", "https://community.hortonworks.com"}; | |
| private Random random = new Random(); | |
| private SpoutOutputCollector collector; | |
| public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | |
| this.collector = collector; | |
| } | |
| public void nextTuple() { | |
| try { | |
| Thread.sleep(100); | |
| } catch (InterruptedException e) { | |
| // skip | |
| } | |
| collector.emit(new Values(urlList[random.nextInt(urlList.length)]), UUID.randomUUID()); | |
| } | |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
| declarer.declare(new Fields("url")); | |
| } | |
| } | |
| static class StoreCrawledPageBolt extends BaseRichBolt { | |
| private static final Logger LOG = LoggerFactory.getLogger(StoreCrawledPageBolt.class); | |
| private static final int TIME_BUCKET_SIZE_IN_SECS = 60; | |
| private static final int LATENCY_BOUND_OF_CRAWL = 3000; | |
| private static final int LATENCY_BOUND_OF_STORE = 200; | |
| private transient ReducedMetric latencyForCrawl; | |
| private transient ReducedMetric latencyForStore; | |
| private Random random; | |
| private OutputCollector collector; | |
| @Override | |
| public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { | |
| this.collector = collector; | |
| random = new Random(); | |
| latencyForCrawl = new ReducedMetric(new MeanReducer()); | |
| latencyForStore = new ReducedMetric(new MeanReducer()); | |
| context.registerMetric("latency-crawl", latencyForCrawl, TIME_BUCKET_SIZE_IN_SECS); | |
| context.registerMetric("latency-store", latencyForStore, TIME_BUCKET_SIZE_IN_SECS); | |
| } | |
| @Override | |
| public void execute(Tuple input) { | |
| String url = input.getString(0); | |
| LOG.info("crawling and storing url: {}", url); | |
| try { | |
| long start = System.currentTimeMillis(); | |
| String html = crawlPage(url); | |
| latencyForCrawl.update(System.currentTimeMillis() - start); | |
| start = System.currentTimeMillis(); | |
| storePage(url, html); | |
| latencyForStore.update(System.currentTimeMillis() - start); | |
| } catch (InterruptedException e) { | |
| // skip | |
| LOG.info("Interrupted, skipping..."); | |
| } | |
| collector.ack(input); | |
| } | |
| private String crawlPage(String url) throws InterruptedException { | |
| Thread.sleep(random.nextInt(LATENCY_BOUND_OF_CRAWL)); | |
| return "hello world"; | |
| } | |
| private void storePage(String url, String html) throws InterruptedException { | |
| Thread.sleep(random.nextInt(LATENCY_BOUND_OF_STORE)); | |
| LOG.info("Storing page for {} complete.", url); | |
| } | |
| @Override | |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
| // not needed as we don't emit any tuples to downstream | |
| } | |
| } | |
| public static void main(String[] args) | |
| throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { | |
| URLSpout urlSpout = new URLSpout(); | |
| StoreCrawledPageBolt storeCrawledPageBolt = new StoreCrawledPageBolt(); | |
| TopologyBuilder builder = new TopologyBuilder(); | |
| builder.setSpout("urlSpout", urlSpout); | |
| builder.setBolt("storeCrawledPageBolt", storeCrawledPageBolt, 5).shuffleGrouping("urlSpout"); | |
| StormTopology topology = builder.createTopology(); | |
| Map<String, Object> conf = new HashMap<>(); | |
| conf.put("topology.max.spout.pending", 10); | |
| StormSubmitter.submitTopology("custom-metric-topology", conf, topology); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment