Last active
March 7, 2016 06:42
-
-
Save palmerabollo/0869d520a612d7c4ed08 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.Map; | |
| import org.elasticsearch.storm.EsBolt; | |
| import backtype.storm.task.IOutputCollector; | |
| import backtype.storm.task.OutputCollector; | |
| import backtype.storm.task.TopologyContext; | |
| import backtype.storm.tuple.Tuple; | |
| public class ExtendedESBolt extends EsBolt { | |
| final static class AckOutputCollector extends OutputCollector { | |
| public AckOutputCollector(IOutputCollector delegate) { | |
| super(delegate); | |
| } | |
| @Override | |
| public void fail(Tuple input) { | |
| // Ignore failures during ES indexing | |
| ack(input); | |
| } | |
| } | |
| public ExtendedESBolt(String target) { | |
| super(target); | |
| } | |
| public ExtendedESBolt(String target, boolean writeAck) { | |
| super(target, writeAck); | |
| } | |
| public ExtendedESBolt(String target, Map configuration) { | |
| super(target, configuration); | |
| } | |
| @Override | |
| public void prepare(Map conf, TopologyContext context, OutputCollector collector) { | |
| // Decorate the collector to ACK everything | |
| AckOutputCollector mycollector = new AckOutputCollector(collector); | |
| super.prepare(conf, context, mycollector); | |
| } | |
| @Override | |
| public void execute(Tuple input) { | |
| try { | |
| super.execute(input); | |
| } catch (Exception e) { | |
| // TODO do something with the Tuple | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@OverRide
public void execute(Tuple input) {
try {
super.execute(input);
} catch (Exception e) {
// TODO do something with the Tuple
String outputData = input.getString(0);
I am not able to get the contents of the tuple . It just throws an exception like Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) ~[storm-core-0.10.0.jar:0.10.0]