@mannharleen
Created September 4, 2017 16:03
Two Spark Streaming receivers read from one Kafka topic that has two partitions. Both receivers share one consumer group, so each ends up reading a single partition.
/* Pre steps
//start ZooKeeper
> zookeeper-server-start.bat C:\kafka_2.11-0.11.0.0\config\zookeeper.properties
//start kafka server
> kafka-server-start.bat C:\kafka_2.11-0.11.0.0\config\server.properties
//create a topic with 2 partitions
> kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test-2partitions
//run a console based kafka producer
> kafka-console-producer.bat --broker-list localhost:9092 --topic test-2partitions
*/
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
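// KafkaUtils.createStream belongs to the receiver-based Kafka 0.8 integration (spark-streaming-kafka-0-8 in Spark 2.x)
import org.apache.spark.streaming.kafka.KafkaUtils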
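// each receiver permanently occupies one core, so local[4] leaves two cores for processing the 10-second batches
val ssc = new StreamingContext(new SparkConf().setAppName("sbtapp").setMaster("local[4]"), Seconds(10))
ssc.sparkContext.setLogLevel("ERROR")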
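//create 2 receivers in the same consumer group
//the Int in the topic map is the number of consumer threads per receiver, not a partition id,
//so use 1 thread per receiver and let Kafka's group rebalancing give each receiver one partition
val streams = (1 to 2).map( _ => KafkaUtils.createStream(ssc, "localhost:2181", "test-group-1", Map("test-2partitions" -> 1)))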
val unionOfstreams = ssc.union(streams)
streams.foreach(_.print()) //print each stream
unionOfstreams.print() //prints the union of the 2 streams
ssc.start() //nothing is consumed until the context is started
ssc.awaitTermination()
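/* output for one batch: first stream, second stream, then their union
   (keys are null because the console producer sends messages without a key by default)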
-------------------------------------------
Time: 1504538460000 ms
-------------------------------------------
(null,1)
(null,3)
(null,5)
-------------------------------------------
Time: 1504538460000 ms
-------------------------------------------
(null,2)
(null,4)
(null,6)
-------------------------------------------
Time: 1504538460000 ms
-------------------------------------------
(null,1)
(null,3)
(null,5)
(null,2)
(null,4)
(null,6)
*/
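// Why the Int in the topic map is a thread count and not a partition id: a rough, self-contained
// sketch of how partitions get spread across the consumer threads of one group. It ASSUMES the
// old high-level consumer's default "range" assignment strategy. RangeAssignmentSketch, assign
// and the thread ids used below are hypothetical names for illustration, not part of the Spark
// or Kafka APIs.
object RangeAssignmentSketch {
  // Returns, for every consumer thread id, the partition numbers that thread would own.
  def assign(numPartitions: Int, threadIds: Seq[String]): Map[String, List[Int]] = {
    require(threadIds.nonEmpty, "need at least one consumer thread")
    val sorted = threadIds.sorted                   // threads are ordered by id before assignment
    val perThread = numPartitions / sorted.size     // every thread gets at least this many partitions
    val extra = numPartitions % sorted.size         // the first `extra` threads get one more
    sorted.zipWithIndex.map { case (id, i) =>
      val start = perThread * i + math.min(i, extra)
      val count = perThread + (if (i < extra) 1 else 0)
      id -> (start until start + count).toList
    }.toMap
  }
}
// Usage (hypothetical thread ids):
//   RangeAssignmentSketch.assign(2, Seq("receiver1-0", "receiver2-0"))
//     two threads, two partitions: each thread owns exactly one partition
//   RangeAssignmentSketch.assign(2, Seq("receiver1-0", "receiver2-0", "receiver2-1"))
//     three threads, two partitions: the last thread in sort order stays idle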