Skip to content

Instantly share code, notes, and snippets.

@yarikc
Created January 25, 2016 00:55
Show Gist options
  • Save yarikc/ebabf5130cc26f3956f3 to your computer and use it in GitHub Desktop.
Save yarikc/ebabf5130cc26f3956f3 to your computer and use it in GitHub Desktop.
SimpleKafka to Storm topology
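// Reads log4j records written with the layout %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX}\u001F%p\u001F%m%n
// The producer must use a message generator that prefixes each message with a sequence number
// and a unit-separator character, so the sequence number becomes field 2 (0-based) of every record.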
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import org.json.simple.JSONObject;
import storm.kafka.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Optional;
import java.util.UUID;
public class StormMain {
public static class PrinterBolt extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
String data = tuple.getString(0);
String[] tokens = data.split("\u001F");
if (tokens.length < 4) {
System.out.println("Ignoring message: " + data);
return;
}
JSONObject jsonObject = new JSONObject();
LocalDateTime t = LocalDateTime.parse(tokens[0], DateTimeFormatter.ISO_DATE_TIME);
jsonObject.put("date", t.format(DateTimeFormatter.ISO_DATE));
jsonObject.put("hour", t.getHour());
jsonObject.put("level", tokens[1]);
jsonObject.put("sequence", tokens[2]);
jsonObject.put("message", tokens[3]);
System.out.println(
DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()) +
" " +
Thread.currentThread().getName() +
" " +
jsonObject);
} catch (Throwable t) {
t.printStackTrace();
}
}
}
public static void main(String[] args) {
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test1", UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = -2;
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka", kafkaSpout, 5);
builder.setBolt("print", new PrinterBolt(), 10).shuffleGrouping("kafka");
Config config = new Config();
config.setNumWorkers(5);
config.setMaxTaskParallelism(15);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka", config, builder.createTopology());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment