package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
/**
 * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
 * network every second.
 *
 * Usage: SqlNetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999`
 */
object SqlNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the words in the
    // input stream of \n delimited text (e.g. generated by 'nc').
    // A storage level without replication is fine only when running locally;
    // replication is necessary in a distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
/** Case class for converting RDD to DataFrame */
case class Record(word: String)
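// Note: Record is kept at top level (not nested inside main), matching the
// upstream example; nested case classes can trip up Spark's reflection-based
// encoders when toDF() infers the schema.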
/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
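// Design note: instantiating the SparkSession lazily inside foreachRDD, rather
// than once in main, follows the Spark Streaming programming guide's pattern
// so the session can be recreated when the driver restarts from a checkpoint,
// where the code in main is not re-executed for recovered batches.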
// courtesy: https://github.com/apache/spark/blob/v2.1.1/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
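The file above calls StreamingExamples.setStreamingLogLevels(), a helper that lives elsewhere in the Spark examples package and is not included in this gist. If you want to run the gist as a single file, a minimal stand-in, assuming log4j 1.x (which Spark 2.1 bundles), could look like this:

import org.apache.log4j.{Level, Logger}

object StreamingExamples {
  // Quiet Spark's console output to WARN, but only when the user has not
  // already configured log4j with appenders of their own.
  def setStreamingLogLevels(): Unit = {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}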