Spark Streaming - quick tips
// Analysing data from a Kafka topic
// The script below can be saved as name.scala and run from spark-shell
// It can also be built as a Scala project: uncomment the object/main lines, use the dependencies from the build.sbt section below, and compile
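// Note: spark-shell already provides a SparkContext named sc, so when running there it is safer to build the
// StreamingContext from it, e.g. val ssc = new StreamingContext(sc, Seconds(5)), instead of creating a second
// context from a new SparkConf. The file can be loaded with spark-shell -i name.scala, or with :load name.scala
// from inside the shell.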
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.sql._
//object FlumeToKafkaToSpark {
case class Word(word: String)
//def main(args: Array[String])
//{
println("hello testing"); | |
val sparkConf = new SparkConf().setAppName("ftokafkatospark").setMaster("local[2]"); | |
val topicSet = "ftokafkaTopicNew".split(",").toSet | |
val kafkaParams = Map[String,String]("bootstrap.servers" -> "localhost:9092") | |
val ssc = new StreamingContext(sparkConf, Seconds(5)) | |
val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String,StringDecoder, StringDecoder](ssc,kafkaParams,topicSet) | |
// To run as a windowed stream, comment out the line above and use the line below instead
//val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).window(Seconds(10), Seconds(5))
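// Note: the window length (10s) and the slide interval (5s) must both be multiples of the batch interval (Seconds(5) above)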
messages.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val wordsDF = rdd.map(x => Word(x._2)).toDF()   // x._2 is the Kafka message value
  wordsDF.registerTempTable("word_table")
  val wordCountResult = sqlContext.sql("select word, count(*) from word_table group by word")
  wordCountResult.show()
}
ssc.start()
ssc.awaitTermination()
//countByDept.saveAsTextFiles(args(0))
// }
//}
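As an alternative to the SQL block above, the same per-batch counts can be computed with plain DStream transformations. A minimal sketch (illustrative only; it would replace the foreachRDD block and must be defined before ssc.start()):

// count how many times each message value appears in the current batch
val counts = messages
  .map { case (_, value) => (value, 1L) }   // key by the Kafka message value
  .reduceByKey(_ + _)
counts.print()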
==========================run as job using spark-submit=================== | |
spark-submit --class FlumeToKafkaToSpark --master local[2] --jars "/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming-kafka_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/kafka_2.10-0.10.2.1.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/metrics-core-2.2.0.jar" /home/cloudera/ftokafkatospark_2.10-1.0.jar
#If the output is too verbose, set the root logger in spark/conf/log4j.properties to:
log4j.rootCategory=WARN, console
#If that file does not exist, create it from the template:
cp /etc/spark/conf/log4j.properties.template /etc/spark/conf/log4j.properties
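Alternatively, the log level can be lowered without touching any file, from spark-shell or from the script above (SparkContext.setLogLevel, available in this Spark 1.6 line):

sc.setLogLevel("WARN")                 // in spark-shell, where sc already exists
ssc.sparkContext.setLogLevel("WARN")   // equivalent call from the streaming script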
==============================build.sbt=================================== | |
name := "ftokafkatospark"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.2.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"
//libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.2"
//libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.10" % "1.6.2"
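#To produce the jar used in the spark-submit line above, run sbt package from the project root; with the name, version and scalaVersion settings above, the standard sbt layout puts it at target/scala-2.10/ftokafkatospark_2.10-1.0.jar.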
=============================Kafka source/producer script================================ | |
import java.util.Properties;
//import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.*;
//import org.apache.kafka.clients.producer.ProducerRecord;
//import org.apache.kafka.clients.producer.RecordMetadata;
public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        /* if (args.length < 1)
        {
            System.out.println("Please enter a value for the topic");
            return;
        }
        */
        String topicName = "ftokafkaTopicNew";
        String key = null; //"Key1";
        String value = "value"; //args[0];
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.236.130:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "username" and "password" are not Kafka producer configs and only trigger "unknown configuration" warnings
        //props.put("username", "root");
        //props.put("password", "cloudera");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Keep both loops below while testing the basic stream.
        // To test windowing, keep the first loop, comment out the second, use a window of length 60s with a 5s slide, and run the producer a few times.
        // Each run's 5 records stay in the 60s window for a minute, so the displayed count climbs by 5 per run, plateaus, and then falls off,
        // e.g. 5 - 10 - 15 - 20 - 25 - 25 - 25 - 25 - 20 - 15 - 10 - 5.
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), "Value1 "));
        }
        for (int j = 0; j < 10; j++) {
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(j), "Value2 "));
        }
        // producer.flush();
        // RecordMetadata metaData = producer.send(record).get();
        producer.close();
        System.out.println("SimpleProducer Completed.");
    }
}
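Note that the topic must exist before the streaming job and the producer run, unless the broker auto-creates topics; with the Kafka 0.10.2.1 distribution referenced above it can be created with the usual bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ftokafkaTopicNew (adjust hosts to your setup: the producer above uses 192.168.236.130:9092 while the Spark script uses localhost:9092). For a quick manual test, bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ftokafkaTopicNew can stand in for SimpleProducer.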
================================pom========================== | |
<dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka_2.10</artifactId> | |
<version>0.10.2.1</version> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka-clients</artifactId> | |
<version>0.10.2.1</version> | |
</dependency> | |
========================================================