@akhld
Created December 11, 2014 11:13
LowLevelKafkaConsumer
import consumer.kafka.client.KafkaReceiver
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by akhld on 11/12/14.
 */
object LowLevelKafkaConsumer {

  def main(arg: Array[String]): Unit = {

    // Silence Spark and Akka logging so the batch counts printed below stay readable
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    // Create the SparkContext
    val conf = new SparkConf()
      .setMaster("spark://akhldz:7077")
      .setAppName("LowLevelKafka")
      .set("spark.executor.memory", "1g")
      .set("spark.rdd.compress", "true")
      .set("spark.storage.memoryFraction", "1")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.streaming.unpersist", "true")
      .set("spark.streaming.blockInterval", "200")

    val sc = new SparkContext(conf)
sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")
sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar")
sc.addJar("/home/akhld/benchmark/jar/target/scala-2.10/pubmatic_2.10-1.0.jar")
sc.addJar("/home/akhld/benchmark/kafka-spark-consumer/target/kafka-spark-consumer-0.0.1-SNAPSHOT-jar-with-dependencies.jar")
    val ssc = new StreamingContext(sc, Seconds(10))

    val topic = "partitions3"
    // Unused here; a topic -> thread-count map like this would feed the
    // high-level KafkaUtils consumer instead (see the sketch after this object)
    val topics_map = Map(topic -> 10)

    // ZooKeeper ensemble where the Kafka brokers are registered
    val zkhosts = "10.67.122.211"
    val zkports = "2181"
    val brokerPath = "/broker"

    // Low-level consumer configuration: broker discovery, plus a separate
    // ZooKeeper connection and path for this consumer's own offset tracking
    val kafkaProperties: Map[String, String] = Map(
      "zookeeper.hosts" -> zkhosts,
      "zookeeper.port" -> zkports,
      "zookeeper.broker.path" -> brokerPath,
      "kafka.topic" -> topic,
      "zookeeper.consumer.connection" -> "localhost:2182",
      "zookeeper.consumer.path" -> "/spark-kafka",
      "kafka.consumer.id" -> "12345")
    val props = new java.util.Properties()
    kafkaProperties.foreach { case (key, value) => props.put(key, value) }

    // One receiver per Kafka partition. Kafka partition ids are 0-based, so
    // (assuming KafkaReceiver's second argument is the partition id) a
    // 3-partition topic needs receivers for partitions 0, 1 and 2
    val partitions = 3
    val kafkaStreams = (0 until partitions).map { i =>
      ssc.receiverStream(new KafkaReceiver(props, i))
    }
    // Union the per-partition streams into a single DStream
    val tmp_stream = ssc.union(kafkaStreams)
    tmp_stream.foreachRDD(rdd => println("\n\nNumber of records in this batch : " + rdd.count()))
    tmp_stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
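
For comparison, and because spark-streaming-kafka_2.10 is already shipped to the executors above, here is a minimal sketch of reading the same topic through Spark's built-in high-level consumer, KafkaUtils.createStream, which is what a topic map like topics_map would feed. The object name, the group id "spark-kafka-test", and the ZooKeeper quorum (built from the zkhosts and zkports values above) are illustrative assumptions, not part of the original gist.

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HighLevelKafkaConsumer {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("spark://akhldz:7077", "HighLevelKafka", Seconds(10))

    // Illustrative values, reusing the addresses from the gist above
    val zkQuorum = "10.67.122.211:2181"
    val groupId = "spark-kafka-test"
    val topics_map = Map("partitions3" -> 10) // topic -> number of consumer threads

    // createStream returns a DStream of (key, message) string pairs
    val stream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics_map)
    stream.foreachRDD(rdd => println("Number of records in this batch : " + rdd.count()))

    ssc.start()
    ssc.awaitTermination()
  }
}

The high-level consumer commits its offsets to ZooKeeper under the consumer group automatically, whereas the low-level consumer above is configured with its own zookeeper.consumer.path, presumably so it can manage offsets itself.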