@mkscrg · Created June 26, 2012 17:49
Test topology for possible MAX_SPOUT_PENDING regression in storm 0.8.0-SNAPSHOT

storm-msp

The topology looks like this:

                                 "stream1"
           "one"              /-------------\
OneSpout --------> SplitBolt |               > CountBolt
                              \-------------/
                                 "stream2"

Compile:

mvn clean compile

Run locally (this runs the topology in a LocalCluster for 20 seconds, then shuts it down):

mvn exec:java

Package and submit to a cluster:

mvn package
storm jar target/storm-msp-1.0-jar-with-dependencies.jar com.mkscrg.sandbox.StormMSPTest msp-test
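
After checking the stats in the Storm UI, the running topology can be killed with Storm's kill command, using the topology name given above:

storm kill msp-test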

Notice that in the Storm UI, under Topology Stats / All time, (Transferred - Acked) is not upper-bounded by (# spout tasks * MAX_SPOUT_PENDING), where:

  • # spout tasks = 10
  • MAX_SPOUT_PENDING = 10
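
Both values are set in StormMSPTest.main below: the parallelism hint gives 10 spout tasks, and setMaxSpoutPending caps each task at 10 pending (emitted but not yet acked or failed) tuples, so the expected upper bound is 10 × 10 = 100:

    topologyBuilder.setSpout("one-spout", new OneSpout(), 10);
    config.setMaxSpoutPending(10);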

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mkscrg.sandbox</groupId>
    <artifactId>storm-msp</artifactId>
    <version>1.0</version>

    <repositories>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                            <archive>
                                <manifest>
                                    <mainClass>com.mkscrg.sandbox.StormMSPTest</mainClass>
                                </manifest>
                            </archive>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <configuration>
                    <classpathScope>compile</classpathScope>
                    <mainClass>com.mkscrg.sandbox.StormMSPTest</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

StormMSPTest.java

package com.mkscrg.sandbox;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.log4j.Logger;

import java.util.Map;
import java.util.UUID;

public final class StormMSPTest {

    private static final String ONE_FIELD = "one";

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        // With a topology name argument, submit to a cluster; otherwise run in a LocalCluster.
        boolean isRemote = false;
        String topologyName = "msp-test_local";
        if (args != null && args.length > 0) {
            isRemote = true;
            topologyName = args[0];
        }

        // OneSpout -> SplitBolt -> CountBolt, with CountBolt subscribed to both of SplitBolt's streams.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("one-spout", new OneSpout(), 10);
        topologyBuilder.setBolt("split-bolt", new SplitBolt(), 1)
                .shuffleGrouping("one-spout");
        topologyBuilder.setBolt("count-bolt", new CountBolt(), 1)
                .shuffleGrouping("split-bolt", SplitBolt.STREAM1)
                .shuffleGrouping("split-bolt", SplitBolt.STREAM2);
        StormTopology stormTopology = topologyBuilder.createTopology();

        Config config = new Config();
        config.setMaxSpoutPending(10);
        config.setNumWorkers(2);

        if (isRemote) {
            StormSubmitter.submitTopology(topologyName, config, stormTopology);
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(topologyName, config, stormTopology);
            Thread.sleep(20000);
            localCluster.shutdown();
        }
    }

    /**
     * Emits a single-field tuple containing 1 on every nextTuple() call, anchored with a
     * random UUID message id so it participates in acking.
     */
    public static final class OneSpout extends BaseRichSpout {

        private static final Logger LOGGER = Logger.getLogger(OneSpout.class);

        private SpoutOutputCollector spoutOutputCollector;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(ONE_FIELD));
        }

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            spoutOutputCollector.emit(new Values(1), UUID.randomUUID().toString());
            LOGGER.info("Emitted 1");
        }

        @Override
        public void fail(Object msgId) {
            // On failure, emit a fresh tuple (new message id) rather than replaying the failed one.
            nextTuple();
        }
    }

    /**
     * Forwards each incoming tuple, alternating between STREAM2 and STREAM1.
     */
    public static final class SplitBolt extends BaseBasicBolt {

        private static final Logger LOGGER = Logger.getLogger(SplitBolt.class);

        public static final String STREAM1 = "stream1";
        public static final String STREAM2 = "stream2";

        private boolean streamFlop = false;

        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            Integer i = tuple.getIntegerByField(ONE_FIELD);
            LOGGER.info("Received " + i);
            String streamId;
            if (streamFlop) {
                streamId = STREAM1;
                streamFlop = false;
            } else {
                streamId = STREAM2;
                streamFlop = true;
            }
            basicOutputCollector.emit(streamId, new Values(i));
            LOGGER.info("Emitted " + i + " on " + streamId);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declareStream(STREAM1, new Fields(ONE_FIELD));
            outputFieldsDeclarer.declareStream(STREAM2, new Fields(ONE_FIELD));
        }
    }

    /**
     * Counts tuples received on both streams and sleeps 100 ms per tuple to slow consumption.
     */
    public static final class CountBolt extends BaseBasicBolt {

        private static final Logger LOGGER = Logger.getLogger(CountBolt.class);

        private int counter = 0;

        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            Integer i = tuple.getIntegerByField(ONE_FIELD);
            LOGGER.info("Received " + i + " (" + ++counter + ")");
            Utils.sleep(100);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // No output.
        }
    }
}