Created
November 17, 2015 18:33
-
-
Save cjmatta/3a560a1738c485258fc4 to your computer and use it in GitHub Desktop.
TupleGenerator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package streamflow.spout.core; | |
import backtype.storm.spout.SpoutOutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.BoltDeclarer; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.base.BaseRichSpout; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
import backtype.storm.utils.Utils; | |
import com.google.inject.Inject; | |
import com.google.inject.name.Named; | |
import org.slf4j.Logger; | |
import streamflow.annotations.ComponentProperty; | |
import streamflow.annotations.Description; | |
import java.util.Map; | |
/** | |
* Created by cmatta on 11/17/15. | |
*/ | |
public class TupleGenerator extends BaseRichSpout { | |
private SpoutOutputCollector collector; | |
private Logger logger; | |
private Integer delay; | |
private String json; | |
private Boolean cool; | |
@ComponentProperty(label = "Delay", name = "jetstream_spout_core_tupleGenerator_delay", required = true, type = "text", defaultValue = "60") | |
@Description("Delay between emit.") | |
@Inject | |
public void setDelay(@Named("jetstream_spout_core_tupleGenerator_delay") String delay) { this.delay = Integer.parseInt(delay); } | |
@ComponentProperty(label = "JSON", name = "jetstream_spout_core_tupleGenerator_json", required = true, type = "text", defaultValue = "{ \\\"url\\\" : \\\"http://www.google.com\\\" }") | |
@Description("JSON") | |
@Inject | |
public void setJSON(@Named("jetstream_spout_core_tupleGenerator_json") String json) { this.json = json; } | |
@Override | |
public void open(Map config, TopologyContext context, SpoutOutputCollector collector) { | |
logger.info("Spout Tuple Generator Initialized."); | |
this.collector = collector; | |
} | |
@Override | |
public void close() { | |
logger.info("Spout Tuple Generator Stopped."); | |
} | |
@Override | |
public void nextTuple() { | |
Utils.sleep(this.delay * 1000); | |
collector.emit(new Values(this.json)); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("json")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment