Created
May 1, 2017 18:55
-
-
Save tsusanto/d9e9c9ff06cffa355710c00c9ca360f9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.common.serialization.StringDeserializer | |
import org.apache.spark.SparkConf | |
import org.apache.spark.streaming.{Seconds, StreamingContext} | |
import org.apache.spark.streaming.kafka010._ | |
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent | |
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe | |
/** | |
* Consumes messages from one or more topics in Kafka and does wordcount. | |
* export SPARK_KAFKA_VERSION=0.10 | |
* spark2-submit --files jaas.conf --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --class Spark2Kafka Spark2Kafka-assembly-1.0.jar | |
*/ | |
object Spark2Kafka { | |
def main(args: Array[String]) { | |
val kafkaParams = Map[String, Object]( | |
"bootstrap.servers" -> "broker1:9092,broker26:9092,broker3:9092", | |
"key.deserializer" -> classOf[StringDeserializer], | |
"security.protocol" -> "SASL_PLAINTEXT", | |
"sasl.mechanism" -> "PLAIN", | |
"value.deserializer" -> classOf[StringDeserializer], | |
"auto.offset.reset" -> "latest", | |
"enable.auto.commit" -> (false: java.lang.Boolean) | |
) | |
val sparkConf = new SparkConf().setAppName("Spark2_KafkaSASL") | |
val streamingContext = new StreamingContext(sparkConf, Seconds(5)) | |
val topics = Array("poslog") | |
val stream = KafkaUtils.createDirectStream[String, String]( | |
streamingContext, | |
PreferConsistent, | |
Subscribe[String, String](topics, kafkaParams) | |
) | |
// Get the lines, split them into words, count the words and print | |
stream.map(record => (record.key, record.value)) | |
stream.print() | |
// Start the computation | |
streamingContext.start() | |
streamingContext.awaitTermination() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment