Created
September 19, 2020 16:20
-
-
Save MafaldaLandeiro/33d6c324ef3a434414935314e5fcc376 to your computer and use it in GitHub Desktop.
Aggregating Messages Bolt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.ApacheStormTopologyJava.bolt; | |
| import org.apache.storm.task.OutputCollector; | |
| import org.apache.storm.task.TopologyContext; | |
| import org.apache.storm.topology.OutputFieldsDeclarer; | |
| import org.apache.storm.topology.base.BaseWindowedBolt; | |
| import org.apache.storm.tuple.Fields; | |
| import org.apache.storm.tuple.Tuple; | |
| import org.apache.storm.tuple.Values; | |
| import org.apache.storm.windowing.TupleWindow; | |
| import java.util.Comparator; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.stream.Collectors; | |
| public class AggregatingMessagesBolt extends BaseWindowedBolt { | |
| private OutputCollector outputCollector; | |
| @Override | |
| public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { | |
| this.outputCollector = collector; | |
| } | |
| @Override | |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
| declarer.declare(new Fields("randomMessagesConcatenate")); | |
| } | |
| @Override | |
| public void execute(TupleWindow tupleWindow) { | |
| List<Tuple> tuples = tupleWindow.get(); | |
| tuples.sort(Comparator.comparing(this::getTimestamp)); | |
| String randomMessagesConcatenate = tuples.stream() | |
| .map(tuple -> tuple.getStringByField("randomMessageFiltered")) | |
| .collect(Collectors.joining(" ")); | |
| Values values = new Values(randomMessagesConcatenate); | |
| outputCollector.emit(values); | |
| } | |
| private Long getTimestamp(Tuple tuple) { | |
| return tuple.getLongByField("timestamp"); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment