@HeartSaVioR
Last active March 26, 2016 07:00
"Speed of light topology Arun provided" + printing metrics
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.starter;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
public class BasicTopology {

    public static final String SPOUT_ID = "spoutRand";
    public static final String BOLT_ID = "boltIncDouble";

    /** Builds a two-component topology (constant-emitting spout -> increment/double bolt),
     * submits it, prints emit metrics every 30 seconds for 10 minutes, then kills it. */
    public static void main(String[] args) throws Exception {
        String topologyName = "BasicTopology";

        TopologyBuilder builder = new TopologyBuilder();

        // set up spout
        IRichSpout spout = new RandomNumberSpout();
        builder.setSpout(SPOUT_ID, spout, 1);

        // set up bolt
        builder.setBolt(BOLT_ID, new IncrementDoubleBolt(), 1)
                .localOrShuffleGrouping(SPOUT_ID);

        // configure topology: single worker, no ackers (tuples are not tracked)
        Config conf = new Config();
        conf.setNumWorkers(1);
        conf.setNumAckers(0);
        conf.setMaxTaskParallelism(1);
        conf.setDebug(false);

        // submit topology
        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());

        Map clusterConf = Utils.readStormConfig();
        clusterConf.putAll(Utils.readCommandLineOpts());
        Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();

        TopologyInfo latestTopologyInfo = null;

        // poll metrics every 30 seconds for 10 minutes
        for (int i = 0; i < 20; i++) {
            Thread.sleep(30 * 1000);
            TopologyInfo newTopologyInfo = getLatestTopologyInfo(client, topologyName);
            printMetrics(latestTopologyInfo, newTopologyInfo);
            latestTopologyInfo = newTopologyInfo;
        }

        kill(client, topologyName);
    } // main
    private static TopologyInfo getLatestTopologyInfo(Nimbus.Client client, String topologyName) throws Exception {
        ClusterSummary summary = client.getClusterInfo();
        String id = null;
        for (TopologySummary ts : summary.get_topologies()) {
            if (topologyName.equals(ts.get_name())) {
                id = ts.get_id();
            }
        }
        if (id == null) {
            throw new Exception("Could not find a topology named " + topologyName);
        }
        return client.getTopologyInfo(id);
    }
    static class MetricInfo {
        private final int uptime;
        private final long emittedSpout;
        private final long emittedBolt;

        public MetricInfo(int uptime, long emittedSpout, long emittedBolt) {
            this.uptime = uptime;
            this.emittedSpout = emittedSpout;
            this.emittedBolt = emittedBolt;
        }

        public int getUptime() {
            return uptime;
        }

        public long getEmittedSpout() {
            return emittedSpout;
        }

        public long getEmittedBolt() {
            return emittedBolt;
        }
    }
    public static void printMetrics(TopologyInfo oldInfo, TopologyInfo newInfo) throws Exception {
        MetricInfo lastMetric;
        if (oldInfo == null) {
            lastMetric = new MetricInfo(0, 0, 0);
        } else {
            lastMetric = extractMetricInfo(oldInfo);
        }
        MetricInfo currMetric = extractMetricInfo(newInfo);

        int uptimeDiff = currMetric.getUptime() - lastMetric.getUptime();
        long emittedSpoutDiff = currMetric.getEmittedSpout() - lastMetric.getEmittedSpout();
        long emittedBoltDiff = currMetric.getEmittedBolt() - lastMetric.getEmittedBolt();

        double spoutEmitsPerSecInDuration = ((double) emittedSpoutDiff) / uptimeDiff;
        double boltEmitsPerSecInDuration = ((double) emittedBoltDiff) / uptimeDiff;

        System.out.println("uptime: " + currMetric.getUptime() + " duration: " + uptimeDiff + " secs");
        System.out.println("spout emitted: " + emittedSpoutDiff + " (total: " + currMetric.getEmittedSpout()
                + ") emitted/sec (in duration): " + spoutEmitsPerSecInDuration);
        System.out.println("bolt emitted: " + emittedBoltDiff + " (total: " + currMetric.getEmittedBolt()
                + ") emitted/sec (in duration): " + boltEmitsPerSecInDuration);
    }
    private static MetricInfo extractMetricInfo(TopologyInfo info) {
        int uptime = info.get_uptime_secs();
        long emittedSpout = 0;
        long emittedBolt = 0;
        for (ExecutorSummary exec : info.get_executors()) {
            if (SPOUT_ID.equals(exec.get_component_id())) {
                // sum the all-time emit counts across all streams of the spout executor
                Map<String, Long> emittedMap = exec.get_stats().get_emitted().get(":all-time");
                for (long emitVal : emittedMap.values()) {
                    emittedSpout += emitVal;
                }
            } else if (BOLT_ID.equals(exec.get_component_id())) {
                // sum the all-time emit counts across all streams of the bolt executor
                Map<String, Long> emittedMap = exec.get_stats().get_emitted().get(":all-time");
                for (long emitVal : emittedMap.values()) {
                    emittedBolt += emitVal;
                }
            }
        }
        return new MetricInfo(uptime, emittedSpout, emittedBolt);
    }
    public static void kill(Nimbus.Client client, String name) throws Exception {
        KillOptions opts = new KillOptions();
        opts.set_wait_secs(0);
        client.killTopologyWithOpts(name, opts);
    }
    public static class RandomNumberSpout extends BaseRichSpout {
        private static final long serialVersionUID = -4100642374496292646L;
        public static final String FIELDS = "number";

        private long messageCount = 0;
        private SpoutOutputCollector collector = null;
        private Random rand = null;
        // note: despite the name, this spout emits the same constant value on every call;
        // the Random instance above is initialized but never used
        private Long longValue = Long.valueOf(Integer.MAX_VALUE);

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.rand = new Random();
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            // messageCount is passed as the message id; with ackers set to 0 it is not tracked
            collector.emit(new Values(longValue), messageCount);
            messageCount++;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(FIELDS));
        }
    } // class RandomNumberSpout
    public static class IncrementDoubleBolt extends BaseRichBolt {
        private static final long serialVersionUID = -5313598399155365865L;
        public static final String FIELDS = "number";

        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            Long val = (Long) tuple.getValue(0);
            collector.emit(new Values(2 * (val + 1))); // increment, then double the value
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(FIELDS));
        }
    } // class IncrementDoubleBolt
}
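
For quick experiments without a running cluster, the same spout and bolt can be exercised in-process. The class below is not part of the gist; it is a minimal sketch assuming Storm 1.x's org.apache.storm.LocalCluster API and storm-core on the classpath. The Nimbus metrics-polling loop above needs a real cluster, so the sketch relies on debug logging instead.

// Hypothetical local runner (not in the original gist), assuming storm-core 1.x.
// It reuses RandomNumberSpout and IncrementDoubleBolt from BasicTopology above.
package org.apache.storm.starter;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class BasicTopologyLocalRunner {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(BasicTopology.SPOUT_ID, new BasicTopology.RandomNumberSpout(), 1);
        builder.setBolt(BasicTopology.BOLT_ID, new BasicTopology.IncrementDoubleBolt(), 1)
                .localOrShuffleGrouping(BasicTopology.SPOUT_ID);

        Config conf = new Config();
        conf.setNumWorkers(1);
        conf.setNumAckers(0);
        conf.setDebug(true); // log emitted tuples instead of polling Nimbus

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology("BasicTopologyLocal", conf, builder.createTopology());
            Thread.sleep(30 * 1000); // let it run for 30 seconds
            cluster.killTopology("BasicTopologyLocal");
        } finally {
            cluster.shutdown();
        }
    }
}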