Created
January 25, 2016 00:55
-
-
Save yarikc/ebabf5130cc26f3956f3 to your computer and use it in GitHub Desktop.
Simple Kafka-to-Storm topology
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Reads log4j records with layout %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX}\u001F%p\u001F%m%n
// Requires a message generator that adds a sequence number at position 2
import backtype.storm.Config; | |
import backtype.storm.LocalCluster; | |
import backtype.storm.spout.SchemeAsMultiScheme; | |
import backtype.storm.topology.BasicOutputCollector; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.TopologyBuilder; | |
import backtype.storm.topology.base.BaseBasicBolt; | |
import backtype.storm.tuple.Tuple; | |
import org.json.simple.JSONObject; | |
import storm.kafka.*; | |
import java.time.LocalDateTime; | |
import java.time.format.DateTimeFormatter; | |
import java.util.UUID; | |
public class StormMain { | |
public static class PrinterBolt extends BaseBasicBolt { | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
} | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
try { | |
String data = tuple.getString(0); | |
String[] tokens = data.split("\u001F"); | |
if (tokens.length < 4) { | |
System.out.println("Ignoring message: " + data); | |
return; | |
} | |
JSONObject jsonObject = new JSONObject(); | |
LocalDateTime t = LocalDateTime.parse(tokens[0], DateTimeFormatter.ISO_DATE_TIME); | |
jsonObject.put("date", t.format(DateTimeFormatter.ISO_DATE)); | |
jsonObject.put("hour", t.getHour()); | |
jsonObject.put("level", tokens[1]); | |
jsonObject.put("sequence", tokens[2]); | |
jsonObject.put("message", tokens[3]); | |
System.out.println( | |
DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()) + | |
" " + | |
Thread.currentThread().getName() + | |
" " + | |
jsonObject); | |
} catch (Throwable t) { | |
t.printStackTrace(); | |
} | |
} | |
} | |
public static void main(String[] args) { | |
BrokerHosts hosts = new ZkHosts("localhost:2181"); | |
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test1", UUID.randomUUID().toString()); | |
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); | |
spoutConfig.startOffsetTime = -2; | |
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout("kafka", kafkaSpout, 5); | |
builder.setBolt("print", new PrinterBolt(), 10).shuffleGrouping("kafka"); | |
Config config = new Config(); | |
config.setNumWorkers(5); | |
config.setMaxTaskParallelism(15); | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology("kafka", config, builder.createTopology()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment