Skip to content

Instantly share code, notes, and snippets.

@mkscrg
Created July 26, 2012 16:57
Show Gist options
  • Save mkscrg/3183211 to your computer and use it in GitHub Desktop.
Save mkscrg/3183211 to your computer and use it in GitHub Desktop.
Test topology for Issue 276
*.iml
.idea/
target/
*.DS_STORE*

storm-kryo-test

This is a test case for Issue 276. I've used several Git branches to show what works and what doesn't; `git diff branch1 branch2` is useful for seeing the exact differences between any two of them.

Local mode produces these results, and running on a remote cluster produces the same ones. The first issue (an error after double-registering ArrayList.class) appears only if topology.workers >= 2 and the number of running bolts is >= 2. The second issue (an exception if List.class is not registered) appears regardless of the rest of the configuration.

  • On master: AModel.strings is an ArrayList<String>, AModelKryoSerializer#write uses ArrayList.class, and we don't register ArrayList.class. Everything works:

    mvn clean package exec:java
    
  • On register-array-list: AModel and AModelKryoSerializer are the same as on master, but we call Config#registerSerialization(ArrayList.class). We get an exception from Kryo:

    git checkout register-array-list
    mvn clean package exec:java
    
  • On use-list: AModel.strings is a List<String>, AModelKryoSerializer#write uses List.class, and we don't register List.class. We get an exception telling us to register List.class.

    git checkout use-list
    mvn clean package exec:java
    
  • On register-list: AModel and AModelKryoSerializer are the same as on use-list, but we call Config#registerSerialization(List.class). Everything works:

    git checkout register-list
    mvn clean package exec:java
    
package com.mkscrg.sandbox;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.log4j.Logger;
import java.util.*;
public final class KryoTestTopology {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new AModelSpout(), 1).setNumTasks(1);
topologyBuilder.setBolt("bolt1", new AModelConduitBolt(), 2)
.shuffleGrouping("spout");
StormTopology stormTopology = topologyBuilder.createTopology();
Config config = new Config();
config.setFallBackOnJavaSerialization(false);
config.setMaxSpoutPending(1);
config.setNumWorkers(2);
config.registerSerialization(AModel.class, AModelKryoSerializer.class);
if (args != null && args.length > 0) {
StormSubmitter.submitTopology(args[0], config, stormTopology);
} else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("kryo-test_local", config, stormTopology);
Utils.sleep(15000);
localCluster.shutdown();
}
}
public static final class AModelSpout extends BaseRichSpout {
private static final Logger LOGGER = Logger.getLogger(AModelSpout.class);
private SpoutOutputCollector spoutOutputCollector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("aModel"));
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
ArrayList<String> stringList = new ArrayList<String>();
stringList.add("Hello");
stringList.add("world");
AModel AModel = new AModel(stringList);
LOGGER.info("Emitting " + AModel);
spoutOutputCollector.emit(new Values(AModel), UUID.randomUUID().toString());
Utils.sleep(500);
}
}
public static final class AModelConduitBolt extends BaseBasicBolt {
private static final Logger LOGGER = Logger.getLogger(AModelConduitBolt.class);
public AModelConduitBolt() {
super();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("aModel"));
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
AModel aModel = (AModel) tuple.getValueByField("aModel");
LOGGER.info("Received and emitting " + aModel);
basicOutputCollector.emit(new Values(aModel));
}
}
public static final class AModelKryoSerializer extends Serializer<AModel> {
@Override
public void write(Kryo kryo, Output output, AModel aModel) {
kryo.writeObjectOrNull(output, aModel.getStrings(), ArrayList.class);
}
@Override
public AModel read(Kryo kryo, Input input, Class<AModel> aModelClass) {
ArrayList<String> strings = kryo.readObjectOrNull(input, ArrayList.class);
return new AModel(strings);
}
}
public static final class AModel {
private final ArrayList<String> strings;
public AModel(ArrayList<String> strings) {
this.strings = strings;
}
public ArrayList<String> getStrings() {
return strings;
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mkscrg.sandbox</groupId>
    <artifactId>storm-kryo-test</artifactId>
    <version>0.0.0</version>

    <repositories>
        <repository>
            <id>clojars.org</id>
            <!-- HTTPS: Maven 3.8.1+ blocks plain-HTTP repositories by default. -->
            <url>https://repo.clojars.org/</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <!-- Builds a self-contained jar (jar-with-dependencies) during `package`. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                            <archive>
                                <manifest>
                                    <mainClass>com.mkscrg.sandbox.KryoTestTopology</mainClass>
                                </manifest>
                            </archive>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Lets `mvn exec:java` run the topology in local mode. -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <configuration>
                    <mainClass>com.mkscrg.sandbox.KryoTestTopology</mainClass>
                    <classpathScope>compile</classpathScope>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment