Spark Streaming - quick tips
#Analysing data from a Kafka topic:
#The script below can be saved as, e.g., name.scala and run from spark-shell
#It can also be built as a Scala project: uncomment the object/main wrapper lines, add the dependencies from the build.sbt section below, and compile
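#to try it interactively, spark-shell can preload the script with -i (a sketch; the jar paths assume the same Cloudera sandbox layout as the spark-submit example below, so adjust to your environment):
spark-shell --jars "/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming-kafka_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/kafka_2.10-0.10.2.1.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/metrics-core-2.2.0.jar" -i name.scala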
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.sql._
//object FlumeToKafkaToSpark {
case class Word(word: String)
//def main(args: Array[String])
//{
println("hello testing");
val sparkConf = new SparkConf().setAppName("ftokafkatospark").setMaster("local[2]");
val topicSet = "ftokafkaTopicNew".split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> "localhost:9092")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
#To run as a windowed stream, replace the line above with the line below:
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).window(Seconds(10), Seconds(5))
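#Note: window(Seconds(10), Seconds(5)) keeps the last 10 seconds of data and recomputes every 5 seconds; both the window length and the slide interval must be multiples of the batch interval (5 seconds here)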
messages.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val wordDF = rdd.map(x => Word(x._2)).toDF()  // x._2 is the Kafka message value
  wordDF.registerTempTable("word_table")
  val wordCountResult = sqlContext.sql("select word, count(*) from word_table group by word")
  wordCountResult.show()
}
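#Alternatively, a minimal sketch of the same word count done directly on the DStream, without SQL (register it before ssc.start(), like the block above; counts are per batch or per window):
val wordCounts = messages.map(_._2).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()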
ssc.start()
ssc.awaitTermination()
//wordCounts.saveAsTextFiles(args(0))  // optionally persist the per-batch counts when run as a job
// }
//}
==========================run as job using spark-submit===================
spark-submit --class FlumeToKafkaToSpark --master local[2] --jars "/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/spark-streaming-kafka_2.10-1.6.2.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/kafka_2.10-0.10.2.1.jar,/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/metrics-core-2.2.0.jar" /home/cloudera/ftokafkatospark_2.10-1.0.jar
#if the output is too verbose, set the root category in spark/conf/log4j.properties to:
log4j.rootCategory=WARN, console
#if the file does not exist, create it from the template:
cp /etc/spark/conf/log4j.properties.template /etc/spark/conf/log4j.properties
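#alternatively, set the log level programmatically from spark-shell:
sc.setLogLevel("WARN")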
==============================build.sbt===================================
name := "ftokafkatospark"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.10.2.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"
//libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.2"
//libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.10" % "1.6.2"
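#to build the jar used in the spark-submit command above (produced at target/scala-2.10/ftokafkatospark_2.10-1.0.jar), run from the project root:
sbt package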
=============================Kafka source/producer script================================
import java.util.Properties;
//import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.*;
//import org.apache.kafka.clients.producer.ProducerRecord;
//import org.apache.kafka.clients.producer.RecordMetadata;
public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        /* if (args.length < 1) {
               System.out.println("Please enter values for topic ");
               return;
           }
        */
        String topicName = "ftokafkaTopicNew";
        String key = null; //"Key1";
        String value = "value"; //args[0];
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.236.130:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // keep both loops below while testing plain streaming
        // to test windowing, keep the first loop, comment out the second, use a window of length 60s with a 5s interval, and run the producer a few times
        // each run adds 5 records that stay in the 60-second window, so the count climbs 5 - 10 - 15 - 20 - 25, plateaus at 25, then falls 20 - 15 - 10 - 5 as older runs age out of the window
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), "Value1 "));
        }
        for (int j = 0; j < 10; j++) {
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(j), "Value2 "));
        }
        // producer.flush();
        // RecordMetadata metaData = producer.send(record).get();
        producer.close();
        System.out.println("SimpleProducer Completed.");
    }
}
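#a sketch of compiling and running the producer, assuming the standard Kafka download layout where libs/ holds kafka-clients and its logging dependencies:
javac -cp "/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/*" SimpleProducer.java
java -cp ".:/home/cloudera/Downloads/kafka_2.10-0.10.2.1/libs/*" SimpleProducer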
================================pom==========================
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
========================================================