Created
September 8, 2012 20:31
-
-
Save sjoerdmulder/3679478 to your computer and use it in GitHub Desktop.
TridentTopology failing tuples
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import backtype.storm.Config; | |
import backtype.storm.LocalCluster; | |
import backtype.storm.spout.SpoutOutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.base.BaseRichSpout; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
import backtype.storm.utils.Utils; | |
import storm.trident.TridentTopology; | |
import java.util.Map; | |
import java.util.concurrent.ConcurrentHashMap; | |
public class TridentTest { | |
public static void main(String[] args) throws Exception { | |
Config conf = new Config(); | |
conf.setMaxSpoutPending(10); | |
conf.setMessageTimeoutSecs(3); | |
TridentTopology topology = new TridentTopology(); | |
BaseRichSpout spout = new BaseRichSpout() { | |
public SpoutOutputCollector collector; | |
private int MAX_PER_SEQUENCE = 693; | |
private int sleeping = 0; | |
private int index = 0; | |
private int sequence = 0; | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("test")); | |
} | |
@Override | |
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { | |
this.collector = collector; | |
} | |
@Override | |
public void nextTuple() { | |
if(index >= MAX_PER_SEQUENCE) { | |
index = 0; | |
sequence++; | |
} | |
if (sleeping % 100 == 0) { | |
Utils.sleep(50); | |
} else { | |
collector.emit(new Values("this is some sentence"), sequence + "-" + index); | |
index++; | |
} | |
sleeping++; | |
} | |
@Override | |
public void ack(Object msgId) { | |
Integer sequence = getSequenceFromMsgId(msgId); | |
int count = updateValue(sequence, countingAck); | |
checkForCompleteness(sequence, count, true); | |
} | |
@Override | |
public void fail(Object msgId) { | |
Integer sequence = getSequenceFromMsgId(msgId); | |
int count = updateValue(sequence, countingFail); | |
checkForCompleteness(sequence, count, false); | |
} | |
Map<Integer, Integer> countingAck = new ConcurrentHashMap<Integer, Integer>(); | |
Map<Integer, Integer> countingFail = new ConcurrentHashMap<Integer, Integer>(); | |
private void checkForCompleteness(Integer sequence, int count, boolean ack) { | |
if(count == MAX_PER_SEQUENCE) { | |
if(ack) { | |
countingAck.remove(sequence); | |
System.out.println("Acking sequence: " + sequence); | |
if(!countingAck.isEmpty()) { | |
System.out.println("Some sequence didn't ack completly!! " + countingAck.toString()); | |
} | |
} else { | |
countingFail.remove(sequence); | |
System.out.println("Failing sequence: " + sequence); | |
if(!countingFail.isEmpty()) { | |
System.out.println("Some sequence didn't fail completly!! " + countingFail.toString()); | |
} | |
} | |
} | |
} | |
private Integer getSequenceFromMsgId(Object msgId) { | |
String[] result = ((String) msgId).split("-"); | |
return Integer.parseInt(result[0]); | |
} | |
private int updateValue(Integer key, Map<Integer, Integer> map) { | |
int count = 0; | |
if(map.containsKey(key)) { | |
count = map.get(key); | |
} | |
count++; | |
map.put(key, count); | |
return count; | |
} | |
}; | |
topology.newStream("stream", spout); | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology("trident-test", conf, topology.build()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment