Last active
December 23, 2015 04:08
-
-
Save elvanja/6577832 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.testfu.storm; | |
import backtype.storm.Config; | |
import backtype.storm.LocalCluster; | |
import backtype.storm.LocalDRPC; | |
import backtype.storm.StormSubmitter; | |
import backtype.storm.generated.StormTopology; | |
import backtype.storm.tuple.Fields; | |
import storm.trident.TridentState; | |
import storm.trident.TridentTopology; | |
import storm.trident.operation.builtin.FilterNull; | |
import storm.trident.operation.builtin.Sum; | |
import storm.trident.operation.builtin.TupleCollectionGet; | |
import storm.trident.testing.MemoryMapState; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
/** | |
* Creates a single stream with all metrics. | |
* State queries extract and group the needed data. | |
*/ | |
public class CombinedTopology { | |
public static final String MSG_COUNT_FIELD_NAME = "msgCountTotal"; | |
public static StormTopology buildTopology(LocalDRPC drpc) { | |
TridentTopology topology = new TridentTopology(); | |
TridentState messageState = buildMessageState(topology); | |
buildAllQuery(drpc, topology, messageState); | |
buildByGatewaysQuery(drpc, topology, messageState); | |
buildByChannelsQuery(drpc, topology, messageState); | |
buildByStatusesQuery(drpc, topology, messageState); | |
return topology.build(); | |
} | |
private static TridentState buildMessageState(TridentTopology topology) { | |
return topology.newStream("messageState", new RandomMessageSpout()) | |
.groupBy(new Fields("statusId", "smsChannelId", "gatewayId")) | |
.persistentAggregate(new MemoryMapState.Factory(), new Fields("msgCount"), new Sum(), new Fields(MSG_COUNT_FIELD_NAME)); | |
} | |
private static void buildAllQuery(LocalDRPC drpc, TridentTopology topology, TridentState messageState) { | |
buildQueryFor(drpc, topology, messageState, "total"); | |
} | |
private static void buildByGatewaysQuery(LocalDRPC drpc, TridentTopology topology, TridentState messageState) { | |
buildQueryFor(drpc, topology, messageState, "byGateway", "gatewayId"); | |
} | |
private static void buildByChannelsQuery(LocalDRPC drpc, TridentTopology topology, TridentState messageState) { | |
buildQueryFor(drpc, topology, messageState, "byChannel", "smsChannelId"); | |
} | |
private static void buildByStatusesQuery(LocalDRPC drpc, TridentTopology topology, TridentState messageState) { | |
buildQueryFor(drpc, topology, messageState, "byStatus", "statusId"); | |
} | |
private static void buildQueryFor(LocalDRPC drpc, TridentTopology topology, TridentState messageState, String name, String... groupBy) { | |
List<String> allFields = new ArrayList<String>(Arrays.asList(groupBy)); | |
allFields.add(MSG_COUNT_FIELD_NAME); | |
topology.newDRPCStream(name, drpc) | |
.stateQuery(messageState, new TupleCollectionGet(), new Fields(allFields)) | |
.each(new Fields(MSG_COUNT_FIELD_NAME), new FilterNull()) | |
.groupBy(new Fields(groupBy)) | |
.aggregate(new Fields(MSG_COUNT_FIELD_NAME), new Sum(), new Fields("total")) | |
; | |
} | |
public static void main(String[] args) throws Exception { | |
Config conf = new Config(); | |
conf.setMaxSpoutPending(20); | |
if (args.length == 0) { | |
LocalDRPC drpc = new LocalDRPC(); | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology("combined", conf, buildTopology(drpc)); | |
for (int i=0; i < 100; i++) { | |
// execute topology state query, passing in the name of the drpc stream | |
System.out.println("total: " + drpc.execute("total", "")); | |
System.out.println("all gateways: " + drpc.execute("byGateway", "")); | |
System.out.println("all channels: " + drpc.execute("byChannel", "")); | |
System.out.println("all statuses: " + drpc.execute("byStatus", "")); | |
Thread.sleep(1000); | |
} | |
} | |
else { | |
conf.setNumWorkers(3); | |
StormSubmitter.submitTopology(args[0], conf, buildTopology(null)); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.testfu.storm; | |
import backtype.storm.Config; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
import org.jgroups.util.Util; | |
import storm.trident.operation.TridentCollector; | |
import storm.trident.spout.IBatchSpout; | |
import java.util.Date; | |
import java.util.Map; | |
import java.util.Random; | |
/** | |
* When implemented as BaseRichSpout, it caused issues with Util.sleep, the data was emitted but not persisted. | |
* Not an issue when directly implementing IBatchSpout. | |
*/ | |
public class RandomMessageSpout implements IBatchSpout { | |
public static final int MAX_MESSAGE_PARTS = 3; | |
Random randomGenerator; | |
@Override | |
public void open(Map map, TopologyContext topologyContext) { | |
this.randomGenerator = new Random(); | |
} | |
@Override | |
public void emitBatch(long batchId, TridentCollector collector) { | |
Util.sleep(100); | |
collector.emit(new Values( | |
new Date(), | |
randomChannel(), | |
randomGateway(), | |
randomMessageParts(), | |
randomStatus() | |
)); | |
} | |
@Override | |
public void ack(long batchId) { | |
} | |
@Override | |
public void close() { | |
} | |
@Override | |
public Map getComponentConfiguration() { | |
Config conf = new Config(); | |
conf.setMaxTaskParallelism(1); | |
return conf; | |
} | |
@Override | |
public Fields getOutputFields() { | |
return new Fields( | |
"sentDateTime", | |
"smsChannelId", | |
"gatewayId", | |
"messageParts", | |
"statusId" | |
); | |
} | |
private int randomStatus() { | |
int[] statuses = new int[] { 1, 2, 3, 4, 5 }; | |
return statuses[randomGenerator.nextInt(statuses.length)]; | |
} | |
private int randomChannel() { | |
int[] channels = new int[] { 101, 102, 103, 104, 105 }; | |
return channels[randomGenerator.nextInt(channels.length)]; | |
} | |
private int randomGateway() { | |
int[] gateways = new int[] { 10, 20, 30, 40, 50 }; | |
return gateways[randomGenerator.nextInt(gateways.length)]; | |
} | |
private int randomMessageParts() { | |
return randomGenerator.nextInt(MAX_MESSAGE_PARTS) + 1; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.testfu.storm; | |
import backtype.storm.Config; | |
import backtype.storm.LocalCluster; | |
import backtype.storm.LocalDRPC; | |
import backtype.storm.StormSubmitter; | |
import backtype.storm.generated.StormTopology; | |
import backtype.storm.tuple.Fields; | |
import storm.trident.TridentState; | |
import storm.trident.TridentTopology; | |
import storm.trident.operation.builtin.FilterNull; | |
import storm.trident.operation.builtin.Sum; | |
import storm.trident.operation.builtin.TupleCollectionGet; | |
import storm.trident.testing.MemoryMapState; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
/** | |
* Creates a separate stream and query state for each metric. | |
* Uses the same spout as the data source. | |
*/ | |
public class SeparateTopology { | |
public static final String MSG_COUNT_FIELD_NAME = "msgCountTotal"; | |
public static StormTopology buildTopology(LocalDRPC drpc) { | |
TridentTopology topology = new TridentTopology(); | |
RandomMessageSpout randomMessageSpout = new RandomMessageSpout(); | |
buildAllQuery(drpc, topology, randomMessageSpout); | |
buildByGatewayQuery(drpc, topology, randomMessageSpout); | |
buildByChannelQuery(drpc, topology, randomMessageSpout); | |
buildByStatusesQuery(drpc, topology, randomMessageSpout); | |
return topology.build(); | |
} | |
private static void buildAllQuery(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout) { | |
buildQueryFor(drpc, topology, randomMessageSpout, "total"); | |
} | |
private static void buildByGatewayQuery(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout) { | |
buildQueryFor(drpc, topology, randomMessageSpout, "byGateway", "gatewayId"); | |
} | |
private static void buildByChannelQuery(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout) { | |
buildQueryFor(drpc, topology, randomMessageSpout, "byChannel", "smsChannelId"); | |
} | |
private static void buildByStatusesQuery(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout) { | |
buildQueryFor(drpc, topology, randomMessageSpout, "byStatus", "statusId"); | |
} | |
private static void buildQueryFor(LocalDRPC drpc, TridentTopology topology, RandomMessageSpout randomMessageSpout, String name, String... groupBy) { | |
TridentState queryState = | |
topology.newStream(name + "Stream", randomMessageSpout) | |
.groupBy(new Fields(groupBy)) | |
.persistentAggregate(new MemoryMapState.Factory(), new Fields("msgCount"), new Sum(), new Fields(MSG_COUNT_FIELD_NAME)) | |
; | |
List<String> allFields = new ArrayList<String>(Arrays.asList(groupBy)); | |
allFields.add(MSG_COUNT_FIELD_NAME); | |
topology.newDRPCStream(name, drpc) | |
.stateQuery(queryState, new TupleCollectionGet(), new Fields(allFields)) | |
.each(new Fields(MSG_COUNT_FIELD_NAME), new FilterNull()) | |
.groupBy(new Fields(groupBy)) | |
.aggregate(new Fields(MSG_COUNT_FIELD_NAME), new Sum(), new Fields("total")) | |
; | |
} | |
public static void main(String[] args) throws Exception { | |
Config conf = new Config(); | |
conf.setMaxSpoutPending(20); | |
if (args.length == 0) { | |
LocalDRPC drpc = new LocalDRPC(); | |
LocalCluster cluster = new LocalCluster(); | |
cluster.submitTopology("separate", conf, buildTopology(drpc)); | |
for (int i = 0; i < 100; i++) { | |
// execute topology state query, passing in the name of the drpc stream | |
System.out.println("total: " + drpc.execute("total", "")); | |
System.out.println("all gateways: " + drpc.execute("byGateway", "")); | |
System.out.println("all channels: " + drpc.execute("byChannel", "")); | |
System.out.println("all statuses: " + drpc.execute("byStatus", "")); | |
Thread.sleep(1000); | |
} | |
} | |
else { | |
conf.setNumWorkers(3); | |
StormSubmitter.submitTopology(args[0], conf, buildTopology(null)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment