Skip to content

Instantly share code, notes, and snippets.

@arjones
Created September 25, 2011 20:58
Show Gist options
  • Save arjones/1241146 to your computer and use it in GitHub Desktop.
Save arjones/1241146 to your computer and use it in GitHub Desktop.
WordCount Bolt with Timer to Emit()
package storm.starter.bolt;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that counts the words it receives and, instead of emitting on every
 * input tuple, flushes the accumulated counts on a wall-clock schedule.
 *
 * <p>Every {@link #EMIT_TIMEFRAME} seconds a background {@link Timer} thread
 * takes a snapshot of the counters, resets them, and emits one
 * {@code (timestamp, word, count)} tuple per distinct word seen in that window.
 *
 * <p>Threading: {@link #execute(Tuple)} and the timer task touch the same
 * counter map from different threads. Both hold the map's own monitor for
 * their compound operations, so an increment can never interleave with a
 * snapshot-and-clear.
 *
 * <p>NOTE(review): the timer thread calls {@code collector.emit(...)} while
 * {@code execute} calls {@code collector.ack(...)} on another thread. Confirm
 * that the {@code OutputCollector} of the Storm version in use tolerates
 * concurrent calls.
 */
public class WordCountTimeFrame implements IRichBolt {
    /** Length of one counting window, in seconds. */
    private static final int EMIT_TIMEFRAME = 10;

    /** Background timer that drives the periodic flush; created in {@link #prepare}. */
    private Timer timer;

    OutputCollector collector;

    /** Per-word counters for the current window; a synchronized map, see {@link #prepare}. */
    Map<String, Integer> counts;

    /**
     * Increments the counter of the word carried in field 0 of the tuple and
     * acks the tuple.
     *
     * @param tupple input tuple whose first field is the word to count
     */
    @Override
    public void execute(Tuple tupple) {
        String word = tupple.getString(0);
        // The read-modify-write must be atomic with respect to EmitTask's
        // snapshot-and-clear; otherwise a stale count can be written back after
        // the clear and the same occurrences are reported in two windows.
        // Collections.synchronizedMap locks on the returned map itself, so this
        // is the same monitor EmitTask uses.
        synchronized (counts) {
            Integer previous = counts.get(word);
            counts.put(word, previous == null ? 1 : previous + 1);
        }
        collector.ack(tupple);
    }

    /**
     * Initializes the counters and schedules the periodic emit task so that it
     * fires on wall-clock boundaries that are multiples of
     * {@link #EMIT_TIMEFRAME} seconds within the minute (e.g. :00, :10, :20 ...).
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used both for acking inputs and emitting counts
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.counts = Collections.synchronizedMap(new HashMap<String, Integer>());
        this.collector = collector;

        // First run = start of the current minute + the next multiple of
        // EMIT_TIMEFRAME seconds, e.g. 19:20:53.417 -> 19:21:00.000.
        // That instant is always strictly in the future.
        DateTime now = new DateTime(DateTimeZone.UTC);
        int roundSeconds = ((now.getSecondOfMinute() / EMIT_TIMEFRAME) * EMIT_TIMEFRAME) + EMIT_TIMEFRAME;
        DateTime startAt = now.minuteOfHour().roundFloorCopy().plusSeconds(roundSeconds);

        // Daemon thread: a forgotten or skipped cleanup() must not keep the JVM alive.
        this.timer = new Timer("WordCountTimeFrame-emit", true);
        this.timer.scheduleAtFixedRate(new EmitTask(counts, collector), startAt.toDate(), EMIT_TIMEFRAME * 1000L);
    }

    /** Declares the output schema: {@code timestamp}, {@code word}, {@code count}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("timestamp", "word", "count"));
    }

    /** Stops the periodic emit task. Safe to call even if {@link #prepare} never ran. */
    @Override
    public void cleanup() {
        if (this.timer != null) {
            // cancel() discards all scheduled tasks, so no separate purge() is needed.
            this.timer.cancel();
        }
    }

    /**
     * Timer task that flushes the counters: snapshots and clears the shared map
     * atomically, then emits one tuple per word from the snapshot.
     *
     * <p>Static nested class: it works only on the map and collector handed to
     * its constructor and keeps no reference to the enclosing bolt.
     *
     * <p>NOTE(review): per the {@link Timer} contract, an unchecked exception
     * escaping {@link #run()} terminates the timer thread, after which no more
     * windows are emitted. Decide whether emit failures should be caught here.
     */
    static class EmitTask extends TimerTask {
        private final Map<String, Integer> counts;
        private final OutputCollector collector;

        /**
         * @param counts    shared counter map; must be the synchronized map the bolt increments
         * @param collector collector the counts are emitted to
         */
        public EmitTask(Map<String, Integer> counts, OutputCollector collector) {
            this.counts = counts;
            this.collector = collector;
        }

        @Override
        public void run() {
            // Copy and reset under the map's monitor so no increment is lost or
            // counted twice; emit afterwards, outside the lock, to keep the
            // critical section short.
            Map<String, Integer> snapshot;
            synchronized (this.counts) {
                snapshot = new HashMap<String, Integer>(this.counts);
                this.counts.clear();
            }

            // All tuples of one flush share the same timestamp.
            long currentTime = System.currentTimeMillis();
            for (Map.Entry<String, Integer> entry : snapshot.entrySet()) {
                this.collector.emit(new Values(currentTime, entry.getKey(), entry.getValue()));
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment