Created
September 25, 2011 20:58
-
-
Save arjones/1241146 to your computer and use it in GitHub Desktop.
WordCount Bolt with Timer to Emit()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package storm.starter.bolt; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.Timer; | |
import java.util.TimerTask; | |
import org.joda.time.DateTime; | |
import org.joda.time.DateTimeZone; | |
import backtype.storm.task.OutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.IRichBolt; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Tuple; | |
import backtype.storm.tuple.Values; | |
public class WordCountTimeFrame implements IRichBolt { | |
private Timer timer; | |
private static final int EMIT_TIMEFRAME = 10; // each 10 seconds | |
OutputCollector collector; | |
// counter | |
Map<String, Integer> counts; | |
@Override | |
public void execute(Tuple tupple) { | |
String word = tupple.getString(0); | |
Integer count = counts.get(word); | |
if (count == null) | |
count = 0; | |
count++; | |
counts.put(word, count); | |
collector.ack(tupple); | |
} | |
@Override | |
public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) { | |
this.counts = Collections.synchronizedMap(new HashMap<String, Integer>()); | |
this.collector = collector; | |
// Round the timestamp. ie: 2011-09-26T19:21:00.000Z | |
DateTime now = new DateTime().withZone(DateTimeZone.UTC); | |
int roundSeconds = ((now.getSecondOfMinute() / EMIT_TIMEFRAME) * EMIT_TIMEFRAME) + EMIT_TIMEFRAME; | |
DateTime startAt = now.minuteOfHour().roundFloorCopy().plusSeconds(roundSeconds); | |
this.timer = new Timer(); | |
this.timer.scheduleAtFixedRate(new EmitTask(counts, collector), startAt.toDate(), EMIT_TIMEFRAME * 1000L); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("timestamp", "word", "count")); | |
} | |
@Override | |
public void cleanup() { | |
this.timer.cancel(); | |
this.timer.purge(); | |
} | |
class EmitTask extends TimerTask { | |
private final Map<String, Integer> counts; | |
private final OutputCollector collector; | |
public EmitTask(Map<String, Integer> counts, OutputCollector collector) { | |
this.counts = counts; | |
this.collector = collector; | |
} | |
@Override | |
public void run() { | |
// create snapshot | |
Map<String, Integer> snapshot; | |
synchronized (this.counts) { | |
snapshot = new HashMap<String, Integer>(this.counts); | |
this.counts.clear(); | |
} | |
long currentTime = System.currentTimeMillis(); | |
Set<String> keys = snapshot.keySet(); | |
for (String word : keys) { | |
Integer count = snapshot.get(word); | |
this.collector.emit(new Values(currentTime, word, count)); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment