Skip to content

Instantly share code, notes, and snippets.

@nalingarg2
Last active January 7, 2020 06:40
Show Gist options
  • Save nalingarg2/eb852f77193325202fa4 to your computer and use it in GitHub Desktop.
Save nalingarg2/eb852f77193325202fa4 to your computer and use it in GitHub Desktop.
kafka with Spark Streaming
#
# Hadoop:: KafkaSpark
# Recipe:: Kafka and Spark
#
# Copyright (C) 2015 Cloudwick labs
# Contact :: [email protected]
# All rights reserved - Do Not Redistribute
#
#One machine is required as below mentioned steps are for POC purpose only.
#install scala
wget http://downloads.typesafe.com/scala/2.11.6/scala-2.11.6.tgz
tar xvf scala-2.11.6.tgz
sudo mv scala-2.11.6 /usr/lib
sudo ln -s /usr/lib/scala-2.11.6 /usr/lib/scala
# Put scala on the PATH for this session.
# (To make it permanent, add this line to a file under /etc/profile.d/.)
# NOTE: the original had the parenthetical inline on the command line,
# which is a shell syntax error — it must be a comment.
export PATH=$PATH:/usr/lib/scala/bin
scala -version
#java installation
# NOTE: the original had this wget collapsed onto one line with literal "\ "
# sequences; backslash-space escapes the space and corrupts the arguments.
# Proper line continuations are restored below.
wget --no-check-certificate \
  --no-cookies \
  --header "Cookie: oraclelicense=accept-securebackup-cookie" \
  http://download.oracle.com/otn-pub/java/jdk/7u45-b18/jdk-7u45-linux-x64.rpm \
  -O jdk-7u45-linux-x64.rpm
rpm -ivh jdk-7u45-linux-x64.rpm
# update the installed java as the latest version using alternatives
alternatives --install /usr/bin/java java /usr/java/jdk1.7.0_45/bin/java 200000
#install git (used to fetch sources; -y answers yum prompts automatically)
yum -y install git
#install sbt
# NOTE: sbt must be installed BEFORE building Spark — the original ran
# "sbt update/package" first and only then installed sbt.
curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum -y install sbt
#install spark (source tarball; built with sbt below)
wget http://apache.mirrors.hoobly.com/spark/spark-1.3.0/spark-1.3.0.tgz
tar -xzvf spark-1.3.0.tgz
cd spark-1.3.0
sbt update
sbt package
#zookeeper installation (standalone)
#Download
wget http://mirror.tcpdiag.net/apache/zookeeper/stable/zookeeper-3.4.6.tar.gz
tar -xzvf zookeeper-3.4.6.tar.gz
# NOTE: the original omitted this cd; the conf/ and bin/ paths below are
# relative to the extracted zookeeper directory.
cd zookeeper-3.4.6
nano conf/zoo.cfg
#add below content (zoo.cfg contents, not shell commands)
tickTime=2000
dataDir=/var/zookeeper
clientPort=2181
#start zookeeper
bin/zkServer.sh start
#Smoke test: connection to zookeeper
bin/zkCli.sh 127.0.0.1:2181
#Kafka installation
#make sure you have the right version of scala
#make sure to download the "binary version" of Kafka for your Scala version
#Below is for scala 2.11.6 and kafka 0.8.2.1
wget http://mirror.olnevhost.net/pub/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
tar -xzvf kafka_2.11-0.8.2.1.tgz
cd kafka_2.11-0.8.2.1
sbt update
sbt package
# The parenthetical below was inline on the command in the original,
# which is a shell syntax error — don't worry if this step fails.
sbt assembly-package-dependency
#change the data dir in config/zookeeper.properties (the file actually
#used by zookeeper-server-start.sh below; the original said zookeeper.conf)
dataDir=/var/zookeeper
#also change clientPort from 2181 to 2182 (or anything unused) so it does
#not clash with the standalone zookeeper started earlier
#start the kafka server (bundled zookeeper first, then the broker)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
#create Topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1
#Test: list topics
#(kafka-list-topic.sh does not exist in 0.8.2; use kafka-topics.sh --list)
bin/kafka-topics.sh --list --zookeeper localhost:2181
#run Producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
#Start Consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
#bin/spark-shell
#example API usage — the bracketed tokens are placeholders, replace them
#with real values before running:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
  [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
#full example below
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.streaming
import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 * <zkQuorum> is a list of one or more zookeeper servers that make up the quorum
 * <group> is the name of the kafka consumer group
* <topics> is a list of one or more kafka topics to consume from
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
* `$ bin/run-example \
* org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
* my-consumer-group topic1,topic2 1`
*/
object KafkaWordCount {
  def main(args: Array[String]) {
    // Require all four positional arguments before doing any Spark setup.
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val conf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))
    // Checkpointing is required for the inverse-function form of
    // reduceByKeyAndWindow used below.
    ssc.checkpoint("checkpoint")

    // Every topic is consumed with the same number of receiver threads.
    val threadsPerTopic = numThreads.toInt
    val topicMap = topics.split(",").map(topic => topic -> threadsPerTopic).toMap
    // createStream yields (key, message) pairs; keep only the message text.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Maintain a 10-minute sliding word count, refreshed every 2 seconds,
    // using the incremental add/subtract reducer (2 output partitions).
    val wordCounts = lines
      .flatMap(line => line.split(" "))
      .map(word => (word, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
// Produces messages whose "words" are random single digits (0-9).
object KafkaWordCountProducer {
  def main(args: Array[String]) {
    // All four positional arguments are mandatory.
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Zookeeper connection properties
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))

    // Emit batches forever: <messagesPerSec> messages per batch, each made of
    // <wordsPerMessage> random digit "words", pausing 100ms between batches.
    while (true) {
      val batch = (1 to messagesPerSec.toInt).map { _ =>
        val payload = (1 to wordsPerMessage.toInt)
          .map(_ => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        new KeyedMessage[String, String](topic, payload)
      }.toArray
      producer.send(batch: _*)
      Thread.sleep(100)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment