Skip to content

Instantly share code, notes, and snippets.

@MafaldaLandeiro
Created September 19, 2020 16:20
Show Gist options
  • Select an option

  • Save MafaldaLandeiro/33d6c324ef3a434414935314e5fcc376 to your computer and use it in GitHub Desktop.

Select an option

Save MafaldaLandeiro/33d6c324ef3a434414935314e5fcc376 to your computer and use it in GitHub Desktop.
Aggregating Messages Bolt
package org.ApacheStormTopologyJava.bolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Windowed bolt that concatenates the messages of each window into a single string.
 *
 * <p>For every window the tuples are ordered by their {@code timestamp} field (ascending), the
 * {@code randomMessageFiltered} field of each tuple is read, and the values are joined with a
 * single space. The result is emitted as a one-field tuple named
 * {@code randomMessagesConcatenate}.
 */
public class AggregatingMessagesBolt extends BaseWindowedBolt {

    /** Input field holding the value used to order tuples inside a window. */
    private static final String TIMESTAMP_FIELD = "timestamp";

    /** Input field holding the message text to concatenate. */
    private static final String MESSAGE_FIELD = "randomMessageFiltered";

    /** Name of the single output field declared and emitted by this bolt. */
    private static final String OUTPUT_FIELD = "randomMessagesConcatenate";

    /** Separator placed between consecutive messages. */
    private static final String SEPARATOR = " ";

    /** Collector handed over in {@link #prepare}; used to emit the aggregated message. */
    private OutputCollector outputCollector;

    /**
     * Stores the collector so that {@link #execute(TupleWindow)} can emit through it.
     *
     * @param stormConf topology configuration (not used by this bolt)
     * @param context   topology context (not used by this bolt)
     * @param collector collector used for all emits of this bolt
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    /**
     * Declares the single output field {@code randomMessagesConcatenate}.
     *
     * @param declarer declarer on which the output schema is registered
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(OUTPUT_FIELD));
    }

    /**
     * Joins the messages of the window, ordered by timestamp, and emits the result.
     *
     * <p>An empty window produces an emit of the empty string, because joining zero elements
     * yields {@code ""}.
     *
     * @param tupleWindow the window whose tuples are aggregated
     */
    @Override
    public void execute(TupleWindow tupleWindow) {
        List<Tuple> windowTuples = tupleWindow.get();

        // Order via Stream.sorted() instead of List.sort(): the list comes from the window and
        // is not owned by this bolt, so it must not be reordered in place (and an unmodifiable
        // list would make List.sort() throw). Stream.sorted() is stable for ordered streams, so
        // tuples with equal timestamps keep their original relative order, as List.sort() did.
        String randomMessagesConcatenate = windowTuples.stream()
                .sorted(Comparator.comparing(this::getTimestamp))
                .map(tuple -> tuple.getStringByField(MESSAGE_FIELD))
                .collect(Collectors.joining(SEPARATOR));

        outputCollector.emit(new Values(randomMessagesConcatenate));
    }

    /**
     * Reads the ordering key of a tuple.
     *
     * @param tuple tuple to read from
     * @return the value of the tuple's {@code timestamp} field
     */
    private Long getTimestamp(Tuple tuple) {
        return tuple.getLongByField(TIMESTAMP_FIELD);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment