Last active
March 26, 2016 07:00
-
-
Save HeartSaVioR/9fd277307d4d5efcd47f to your computer and use it in GitHub Desktop.
"Speed of light topology Roshan provided" + printing metrics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package storm.starter; | |
import org.apache.storm.Config; | |
import org.apache.storm.StormSubmitter; | |
import org.apache.storm.generated.ClusterSummary; | |
import org.apache.storm.generated.ExecutorSummary; | |
import org.apache.storm.generated.KillOptions; | |
import org.apache.storm.generated.Nimbus; | |
import org.apache.storm.generated.SpoutStats; | |
import org.apache.storm.generated.TopologyInfo; | |
import org.apache.storm.generated.TopologySummary; | |
import org.apache.storm.hdfs.spout.Configs; | |
import org.apache.storm.hdfs.spout.HdfsSpout; | |
import org.apache.storm.metric.LoggingMetricsConsumer; | |
import org.apache.storm.spout.SpoutOutputCollector; | |
import org.apache.storm.starter.FastWordCountTopology; | |
import org.apache.storm.task.OutputCollector; | |
import org.apache.storm.task.TopologyContext; | |
import org.apache.storm.topology.IRichSpout; | |
import org.apache.storm.topology.OutputFieldsDeclarer; | |
import org.apache.storm.topology.TopologyBuilder; | |
import org.apache.storm.topology.base.BaseRichBolt; | |
import org.apache.storm.topology.base.BaseRichSpout; | |
import org.apache.storm.tuple.Fields; | |
import org.apache.storm.tuple.Tuple; | |
import org.apache.storm.tuple.Values; | |
import org.apache.storm.utils.NimbusClient; | |
import org.apache.storm.utils.Utils; | |
import java.util.Map; | |
import java.util.Random; | |
public class BasicTopology { | |
public static final String SPOUT_ID = "spoutRand"; | |
public static final String BOLT_ID = "boltIncDouble"; | |
/** Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir | |
* after its done consuming */ | |
public static void main(String[] args) throws Exception { | |
String topologyName = "BasicTopology"; | |
TopologyBuilder builder = new TopologyBuilder(); | |
// setup spout | |
IRichSpout spout = new RandomNumberSpout(); | |
builder.setSpout(SPOUT_ID, spout, 1); | |
// setup bolt | |
builder.setBolt(BOLT_ID, new IncrementDoubleBolt(), 1) | |
.localOrShuffleGrouping(SPOUT_ID); | |
builder.createTopology(); | |
// configure topology and submit | |
Config conf = new Config(); | |
conf.setNumWorkers(1); | |
conf.setNumAckers(0); | |
conf.setMaxTaskParallelism(1); | |
conf.setNumWorkers(1); | |
conf.setDebug(false); | |
// submit topology | |
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology()); | |
Map clusterConf = Utils.readStormConfig(); | |
clusterConf.putAll(Utils.readCommandLineOpts()); | |
Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); | |
TopologyInfo latestTopologyInfo = null; | |
//Sleep for 10 mins | |
for (int i = 0; i < 20; i++) { | |
Thread.sleep(30 * 1000); | |
TopologyInfo newTopologyInfo = getLatestTopologyInfo(client, topologyName); | |
printMetrics(latestTopologyInfo, newTopologyInfo); | |
latestTopologyInfo = newTopologyInfo; | |
} | |
kill(client, topologyName); | |
} // main | |
private static TopologyInfo getLatestTopologyInfo(Nimbus.Client client, String topologyName) throws Exception { | |
ClusterSummary summary = client.getClusterInfo(); | |
String id = null; | |
for (TopologySummary ts: summary.get_topologies()) { | |
if (topologyName.equals(ts.get_name())) { | |
id = ts.get_id(); | |
} | |
} | |
if (id == null) { | |
throw new Exception("Could not find a topology named "+topologyName); | |
} | |
return client.getTopologyInfo(id); | |
} | |
static class MetricInfo { | |
private int uptime; | |
private long emittedSpout; | |
private long emittedBolt; | |
public MetricInfo(int uptime, long emittedSpout, long emittedBolt) { | |
this.uptime = uptime; | |
this.emittedSpout = emittedSpout; | |
this.emittedBolt = emittedBolt; | |
} | |
public int getUptime() { | |
return uptime; | |
} | |
public long getEmittedSpout() { | |
return emittedSpout; | |
} | |
public long getEmittedBolt() { | |
return emittedBolt; | |
} | |
} | |
public static void printMetrics(TopologyInfo oldInfo, TopologyInfo newInfo) throws Exception { | |
MetricInfo lastMetric; | |
if (oldInfo == null) { | |
lastMetric = new MetricInfo(0, 0, 0); | |
} else { | |
lastMetric = extractMetricInfo(oldInfo); | |
} | |
MetricInfo currMetric = extractMetricInfo(newInfo); | |
int uptimeDiff = currMetric.getUptime() - lastMetric.getUptime(); | |
long emittedSpoutDiff = (currMetric.getEmittedSpout() - lastMetric.getEmittedSpout()); | |
long emittedBoltDiff = (currMetric.getEmittedBolt() - lastMetric.getEmittedBolt()); | |
double spoutEmitsPerSecInDuration = ((double)emittedSpoutDiff) / uptimeDiff; | |
double boltEmitsPerSecInDuration = ((double)emittedBoltDiff) / uptimeDiff; | |
System.out.println("uptime: " + currMetric.getUptime() + " duration: " + uptimeDiff + " secs"); | |
System.out.println("spout emitted: " + emittedSpoutDiff + " (total: " + currMetric.getEmittedSpout() + ") emitted/sec (in duration): " + spoutEmitsPerSecInDuration); | |
System.out.println("bolt emitted: " + emittedBoltDiff + " (total: " + currMetric.getEmittedBolt() + ") emitted/sec (in duration): " + boltEmitsPerSecInDuration); | |
} | |
private static MetricInfo extractMetricInfo(TopologyInfo info) { | |
int uptime = info.get_uptime_secs(); | |
long emittedSpout = 0; | |
long emittedBolt = 0; | |
for (ExecutorSummary exec: info.get_executors()) { | |
if (SPOUT_ID.equals(exec.get_component_id())) { | |
Map<String, Long> emittedMap = exec.get_stats().get_emitted().get(":all-time"); | |
for (String key: emittedMap.keySet()) { | |
long emitVal = emittedMap.get(key); | |
emittedSpout += emitVal; | |
} | |
} else if (BOLT_ID.equals(exec.get_component_id())) { | |
Map<String, Long> emittedMap = exec.get_stats().get_emitted().get(":all-time"); | |
for (String key: emittedMap.keySet()) { | |
long emitVal = emittedMap.get(key); | |
emittedBolt += emitVal; | |
} | |
} | |
} | |
return new MetricInfo(uptime, emittedSpout, emittedBolt); | |
} | |
public static void kill(Nimbus.Client client, String name) throws Exception { | |
KillOptions opts = new KillOptions(); | |
opts.set_wait_secs(0); | |
client.killTopologyWithOpts(name, opts); | |
} | |
public static class RandomNumberSpout extends BaseRichSpout { | |
private static final long serialVersionUID = -4100642374496292646L; | |
public static final String FIELDS = "number"; | |
private long messageCount = 0; | |
private SpoutOutputCollector collector = null; | |
// private StringBuffer message = null; | |
private Random rand = null; | |
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | |
this.rand = new Random(); | |
this.collector = collector; | |
} | |
@Override | |
public void nextTuple() { | |
collector.emit(new Values(new Long(rand.nextInt(Integer.MAX_VALUE))), messageCount); | |
messageCount++; | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields(FIELDS)); | |
} | |
} // class RandomNumberSpout | |
public static class IncrementDoubleBolt extends BaseRichBolt { | |
private static final long serialVersionUID = -5313598399155365865L; | |
public static final String FIELDS = "number"; | |
private OutputCollector collector; | |
@Override | |
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { | |
this.collector = collector; | |
} | |
@Override | |
public void execute(Tuple tuple) { | |
Long val = (Long) tuple.getValue(0); | |
collector.emit(new Values( 2*(val+1)) ); // increment and double the value | |
collector.ack(tuple); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields(FIELDS)); | |
} | |
} // class IncrementDoubleBolt | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment