Created
January 8, 2016 18:21
-
-
Save nmadhire/054ca8aa3f18e5526028 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.opensoc.topology.runner; | |
import backtype.storm.tuple.Tuple; | |
import com.opensoc.filters.GenericMessageFilter; | |
import com.opensoc.parser.interfaces.MessageParser; | |
import com.opensoc.parsing.AbstractParserBolt; | |
import com.opensoc.parsing.TelemetryParserBolt; | |
import com.opensoc.test.bolts.PrintingBolt; | |
import com.opensoc.test.spouts.GenericInternalTestSpout; | |
import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.SystemPropertyUtil; | |
import org.apache.flink.storm.util.BoltFileSink; | |
import org.apache.flink.storm.util.BoltPrintSink; | |
import org.apache.flink.storm.util.OutputFormatter; | |
import java.io.Serializable; | |
public class BroRunner extends TopologyRunner implements Serializable{ | |
static String test_file_path = "SampleInput/BroExampleOutput"; | |
@Override | |
public boolean initializeParsingBolt(String topology_name, | |
String name) { | |
try { | |
String messageUpstreamComponent = messageComponents.get(messageComponents.size()-1); | |
System.out.println("[OpenSOC] ------" + name + " is initializing from " + messageUpstreamComponent); | |
String class_name = config.getString("bolt.parser.adapter"); | |
if(class_name == null) | |
{ | |
System.out.println("[OpenSOC] Parser adapter not set. Please set bolt.indexing.adapter in topology.conf"); | |
throw new Exception("Parser adapter not set"); | |
} | |
/* | |
Class loaded_class = Class.forName(class_name); | |
MessageParser parser = (MessageParser) loaded_class.newInstance(); | |
AbstractParserBolt parser_bolt = new TelemetryParserBolt() | |
.withMessageParser(parser) | |
.withOutputFieldName(topology_name) | |
.withMessageFilter(new GenericMessageFilter()) | |
.withMetricConfig(config); | |
builder.setBolt(name, parser_bolt, | |
config.getInt("bolt.parser.parallelism.hint")) | |
.shuffleGrouping(messageUpstreamComponent) | |
.setNumTasks(config.getInt("bolt.parser.num.tasks")); | |
*/ | |
//builder.setBolt("Printing Bolt", new PrintingBolt(), 1).shuffleGrouping(name, "message"); | |
builder.setBolt("PrintBoltUsingFlinkClass", new BoltPrintSink(new OutputFormatter() { | |
public String format(Tuple tuple) { | |
return tuple.toString(); | |
} | |
}), 1).shuffleGrouping(messageUpstreamComponent); | |
builder.setBolt("Write to File", new BoltFileSink("/tmp/out.txt", new OutputFormatter() { | |
public String format(Tuple tuple) { | |
return tuple.toString(); | |
} | |
}), 1).shuffleGrouping(messageUpstreamComponent); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
System.exit(0); | |
} | |
return true; | |
} | |
@Override | |
public boolean initializeTestingSpout(String name) { | |
try { | |
System.out.println("[OpenSOC] Initializing Test Spout"); | |
GenericInternalTestSpout testSpout = new GenericInternalTestSpout() | |
.withFilename(test_file_path).withRepeating( | |
config.getBoolean("spout.test.parallelism.repeat")); | |
builder.setSpout(name, testSpout, | |
config.getInt("spout.test.parallelism.hint")).setNumTasks( | |
config.getInt("spout.test.num.tasks")); | |
System.out.println("Printing config:" + config.getInt("spout.test.parallelism.hint")); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
System.exit(0); | |
} | |
return true; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.opensoc.test.spouts; | |
import backtype.storm.spout.SpoutOutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.base.BaseRichSpout; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
import backtype.storm.utils.Utils; | |
import com.opensoc.test.filereaders.FileReader; | |
import java.io.IOException; | |
import java.util.List; | |
import java.util.Map; | |
public class GenericInternalTestSpout extends BaseRichSpout { | |
/** | |
* | |
*/ | |
private static final long serialVersionUID = -2379344923143372543L; | |
List<String> jsons; | |
private String _filename; | |
private int _delay = 100; | |
private boolean _repeating = true; | |
private SpoutOutputCollector _collector; | |
private FileReader Reader; | |
private int cnt = 0; | |
public GenericInternalTestSpout withFilename(String filename) | |
{ | |
_filename = filename; | |
return this; | |
} | |
public GenericInternalTestSpout withMilisecondDelay(int delay) | |
{ | |
_delay = delay; | |
return this; | |
} | |
public GenericInternalTestSpout withRepeating(boolean repeating) | |
{ | |
_repeating = repeating; | |
return this; | |
} | |
@SuppressWarnings("rawtypes") | |
public void open(Map conf, TopologyContext context, | |
SpoutOutputCollector collector) { | |
_collector = collector; | |
try { | |
Reader = new FileReader(); | |
jsons = Reader.readFromFile(_filename); | |
} catch (IOException e) | |
{ | |
System.out.println("Could not read sample JSONs"); | |
e.printStackTrace(); | |
} | |
} | |
public void nextTuple() { | |
Utils.sleep(_delay); | |
if(cnt < jsons.size()) | |
{ | |
_collector.emit(new Values(jsons.get(cnt))); | |
} | |
cnt ++; | |
if(_repeating && cnt == jsons.size() -1 ) | |
cnt = 0; | |
} | |
@Override | |
public void ack(Object id) { | |
} | |
@Override | |
public void fail(Object id) { | |
} | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("message")); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment