|
package com.mkscrg.sandbox; |
|
|
|
import backtype.storm.Config; |
|
import backtype.storm.LocalCluster; |
|
import backtype.storm.StormSubmitter; |
|
import backtype.storm.generated.AlreadyAliveException; |
|
import backtype.storm.generated.InvalidTopologyException; |
|
import backtype.storm.generated.StormTopology; |
|
import backtype.storm.spout.SpoutOutputCollector; |
|
import backtype.storm.task.TopologyContext; |
|
import backtype.storm.topology.BasicOutputCollector; |
|
import backtype.storm.topology.OutputFieldsDeclarer; |
|
import backtype.storm.topology.TopologyBuilder; |
|
import backtype.storm.topology.base.BaseBasicBolt; |
|
import backtype.storm.topology.base.BaseRichSpout; |
|
import backtype.storm.tuple.Fields; |
|
import backtype.storm.tuple.Tuple; |
|
import backtype.storm.tuple.Values; |
|
import backtype.storm.utils.Utils; |
|
import com.esotericsoftware.kryo.Kryo; |
|
import com.esotericsoftware.kryo.Serializer; |
|
import com.esotericsoftware.kryo.io.Input; |
|
import com.esotericsoftware.kryo.io.Output; |
|
import org.apache.log4j.Logger; |
|
|
|
import java.util.*; |
|
|
|
public final class KryoTestTopology { |
|
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { |
|
TopologyBuilder topologyBuilder = new TopologyBuilder(); |
|
topologyBuilder.setSpout("spout", new AModelSpout(), 1).setNumTasks(1); |
|
topologyBuilder.setBolt("bolt1", new AModelConduitBolt(), 2) |
|
.shuffleGrouping("spout"); |
|
|
|
StormTopology stormTopology = topologyBuilder.createTopology(); |
|
|
|
Config config = new Config(); |
|
config.setFallBackOnJavaSerialization(false); |
|
config.setMaxSpoutPending(1); |
|
config.setNumWorkers(2); |
|
|
|
config.registerSerialization(AModel.class, AModelKryoSerializer.class); |
|
|
|
if (args != null && args.length > 0) { |
|
StormSubmitter.submitTopology(args[0], config, stormTopology); |
|
} else { |
|
LocalCluster localCluster = new LocalCluster(); |
|
localCluster.submitTopology("kryo-test_local", config, stormTopology); |
|
|
|
Utils.sleep(15000); |
|
|
|
localCluster.shutdown(); |
|
} |
|
} |
|
|
|
public static final class AModelSpout extends BaseRichSpout { |
|
private static final Logger LOGGER = Logger.getLogger(AModelSpout.class); |
|
|
|
private SpoutOutputCollector spoutOutputCollector; |
|
|
|
@Override |
|
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { |
|
outputFieldsDeclarer.declare(new Fields("aModel")); |
|
} |
|
|
|
@Override |
|
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { |
|
this.spoutOutputCollector = spoutOutputCollector; |
|
} |
|
|
|
@Override |
|
public void nextTuple() { |
|
ArrayList<String> stringList = new ArrayList<String>(); |
|
stringList.add("Hello"); |
|
stringList.add("world"); |
|
|
|
AModel AModel = new AModel(stringList); |
|
|
|
LOGGER.info("Emitting " + AModel); |
|
spoutOutputCollector.emit(new Values(AModel), UUID.randomUUID().toString()); |
|
|
|
Utils.sleep(500); |
|
} |
|
} |
|
|
|
public static final class AModelConduitBolt extends BaseBasicBolt { |
|
private static final Logger LOGGER = Logger.getLogger(AModelConduitBolt.class); |
|
|
|
public AModelConduitBolt() { |
|
super(); |
|
} |
|
|
|
@Override |
|
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { |
|
outputFieldsDeclarer.declare(new Fields("aModel")); |
|
} |
|
|
|
@Override |
|
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { |
|
AModel aModel = (AModel) tuple.getValueByField("aModel"); |
|
LOGGER.info("Received and emitting " + aModel); |
|
basicOutputCollector.emit(new Values(aModel)); |
|
} |
|
} |
|
|
|
public static final class AModelKryoSerializer extends Serializer<AModel> { |
|
@Override |
|
public void write(Kryo kryo, Output output, AModel aModel) { |
|
kryo.writeObjectOrNull(output, aModel.getStrings(), ArrayList.class); |
|
} |
|
|
|
@Override |
|
public AModel read(Kryo kryo, Input input, Class<AModel> aModelClass) { |
|
ArrayList<String> strings = kryo.readObjectOrNull(input, ArrayList.class); |
|
return new AModel(strings); |
|
} |
|
} |
|
|
|
public static final class AModel { |
|
private final ArrayList<String> strings; |
|
|
|
public AModel(ArrayList<String> strings) { |
|
this.strings = strings; |
|
} |
|
|
|
public ArrayList<String> getStrings() { |
|
return strings; |
|
} |
|
} |
|
} |