Created
March 16, 2012 12:39
-
-
Save abh1nav/2049902 to your computer and use it in GitHub Desktop.
A Storm bolt (S3) that collects tuples from bolts S1 and S2 and runs its logic only once both tuples have been received.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitsprout.sproutscore.bolts; | |
import java.util.HashMap; | |
import java.util.Map; | |
import com.google.common.collect.Maps; | |
import backtype.storm.task.OutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.IRichBolt; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.tuple.Tuple; | |
public class S3 implements IRichBolt { | |
/** | |
* To hold the tuples generated by S1 and S2 | |
*/ | |
private class InputCollector { | |
public Tuple s1 = null; | |
public Tuple s2 = null; | |
} | |
private OutputCollector collector; | |
private HashMap<String, InputCollector> inputs; | |
@SuppressWarnings("rawtypes") | |
@Override | |
public void prepare(Map stormConf, TopologyContext context, | |
OutputCollector collector) { | |
this.collector = collector; | |
this.inputs = Maps.newHashMap(); | |
} | |
@Override | |
public void execute(Tuple input) { | |
String messageId = (String) input.getValueByField("messageId"); | |
InputCollector ic = getOrCreateInputCollector(messageId); | |
switch(input.getSourceComponent()) { | |
case "BoltIDofS1": | |
ic.s1 = input; | |
break; | |
case "BoltIDofS2": | |
ic.s2 = input; | |
break; | |
} | |
if(ic.s1 != null && ic.s2 != null) { | |
/** | |
* Run your code for S3 | |
* now that you have both tuples from S1 and S2 | |
* for the given message Id | |
*/ | |
// remember to remove this InputCollector object Map | |
this.inputs.remove(messageId); | |
} | |
this.collector.ack(input); | |
} | |
private InputCollector getOrCreateInputCollector(String messageId) { | |
InputCollector ic; | |
if(this.inputs.containsKey(messageId)) { | |
ic = this.inputs.get(messageId); | |
} | |
else { | |
ic = new InputCollector(); | |
this.inputs.put(messageId, ic); | |
} | |
return ic; | |
} | |
@Override | |
public void cleanup() {} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Re thread: http://groups.google.com/group/storm-user/browse_thread/thread/3834480fc03ceb07