Created
March 20, 2017 06:24
-
-
Save dgadiraju/6059be215e014ca78ac9bf85b057f282 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Created by itversity on 17/03/17.
 */
/* build.sbt
name := "retail"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-flume-sink_2.10" % "1.6.2"
*/
/* spark-submit command
// Build the jar file, ship it to the cluster, and make sure the jar files listed
// under --jars are available at the given paths. Then run spark-submit:
spark-submit --class FlumeSparkWordCount \
  --master yarn \
  --conf spark.ui.port=22231 \
  --jars "/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/flume/lib/commons-lang3-3.5.jar,/usr/hdp/2.5.0.0-1245/spark/lib/spark-streaming-flume-sink_2.10-1.6.2.jar,/usr/hdp/2.5.0.0-1245/flume/lib/flume-ng-sdk-1.5.2.2.5.0.0-1245.jar" \
  retail_2.10-1.0.jar
*/
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume._
/**
 * Streaming word count over web-server log events delivered by a Flume
 * polling sink. Every 30-second batch, counts requests per department
 * (the path segment after "GET /department/") and prints the counts.
 *
 * Usage: FlumeSparkWordCount [flumeHost [flumePort]]
 * Defaults preserve the original hard-coded endpoint.
 */
object FlumeSparkWordCount {
  def main(args: Array[String]): Unit = {
    // Flume polling-sink endpoint; overridable from the command line so the
    // jar is not tied to one gateway host (args(0) = host, args(1) = port).
    val host = if (args.length > 0) args(0) else "gw01.itversity.com"
    val port = if (args.length > 1) args(1).toInt else 19999

    val batchInterval = Seconds(30)
    // NOTE(review): setMaster("yarn-client") here overrides the --master flag
    // shown in the spark-submit command above; kept for compatibility, but it
    // is normally better to leave the master to spark-submit.
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount").setMaster("yarn-client")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Pull-based receiver: Spark polls the Flume agent's spark sink.
    val stream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, host, port)

    stream.
      // Flume event body -> raw log line (platform default charset, as before)
      map(e => new String(e.event.getBody.array())).
      filter(rec => rec.contains("GET /department/")).
      // Guard against malformed lines: without this, one short record would
      // throw ArrayIndexOutOfBoundsException and kill the streaming job.
      map(rec => rec.split(" ")).
      filter(fields => fields.length > 6 && fields(6).split("/").length > 2).
      // Key by department name (third path segment of the request URL).
      map(fields => (fields(6).split("/")(2), 1)).
      reduceByKey(_ + _).
      print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment