Created
February 16, 2018 06:29
-
-
Save pRoy24/e91f8e34b3f9f8cb563379afde63b8ef to your computer and use it in GitHub Desktop.
Creating a data processor configuration for Spark
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//Conf File for Kafka | |
public static void main(String[] args) throws Exception { | |
//read config file | |
Properties prop = PropertyFileReader.readPropertyFile(); | |
String zookeeper = prop.getProperty("com.iot.app.kafka.zookeeper"); | |
String brokerList = prop.getProperty("com.iot.app.kafka.brokerlist"); | |
String topic = prop.getProperty("com.iot.app.kafka.topic"); | |
logger.info("Using Zookeeper=" + zookeeper + " ,Broker-list=" + brokerList + " and topic " + topic); | |
// set producer properties | |
Properties properties = new Properties(); | |
properties.put("zookeeper.connect", zookeeper); | |
properties.put("metadata.broker.list", brokerList); | |
properties.put("request.required.acks", "1"); | |
properties.put("serializer.class", "com.iot.app.kafka.util.IoTDataEncoder"); | |
//generate event | |
Producer<String, IoTData> producer = new Producer<String, IoTData>(new ProducerConfig(properties)); | |
IoTDataProducer iotProducer = new IoTDataProducer(); | |
iotProducer.generateIoTEvent(producer,topic); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Code sourced from https://github.com/pRoy24/tangled-economy | |
//read Spark and Cassandra properties and create SparkConf | |
Properties prop = PropertyFileReader.readPropertyFile(); | |
SparkConf conf = new SparkConf() | |
.setAppName(prop.getProperty("com.iot.app.spark.app.name")) | |
.setMaster(prop.getProperty("com.iot.app.spark.master")) | |
.set("spark.cassandra.connection.host", prop.getProperty("com.iot.app.cassandra.host")) | |
.set("spark.cassandra.connection.port", prop.getProperty("com.iot.app.cassandra.port")) | |
.set("spark.cassandra.connection.keep_alive_ms", prop.getProperty("com.iot.app.cassandra.keep_alive")); | |
//batch interval of 5 seconds for incoming stream | |
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); | |
//add check point directory | |
jssc.checkpoint(prop.getProperty("com.iot.app.spark.checkpoint.dir")); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment