Skip to content

Instantly share code, notes, and snippets.

@allwefantasy
Created June 6, 2016 03:54
Show Gist options
  • Save allwefantasy/7303c21c57c0d8864892ffca8547cd9a to your computer and use it in GitHub Desktop.
Save allwefantasy/7303c21c57c0d8864892ffca8547cd9a to your computer and use it in GitHub Desktop.
import _root_.kafka.serializer.StringDecoder
import com.stuq.nginx.parser.NginxParser
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * Example Spark Streaming job demonstrating `mapWithState`: it extracts a key
 * (named `domain` below) from each nginx log line and keeps a running count per
 * key as checkpointed streaming state.
 *
 * When `isDebug` is true the job runs locally (`local[2]`) against an in-memory
 * `TestInputStream` fed with `Mock.items`; otherwise it reads message values from
 * Kafka through the direct-stream API.
 *
 * 4/25/16 WilliamZhu([email protected])
 */
object ExampleMapWithState {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("测试Streaming应用")
    val isDebug = true
    // Batch interval, in seconds.
    val duration = 5
    // Single source of truth for the checkpoint location: it is used both to recover
    // a previous context (getActiveOrCreate) and as the checkpoint target of a new
    // one (ssc.checkpoint). The two must match or recovery silently never happens.
    val checkpointDir = "file:///tmp/mapwithstate/"

    if (isDebug) {
      conf.setMaster("local[2]")
    }

    /**
     * Builds a brand-new StreamingContext with the whole pipeline wired up.
     * Passed to `StreamingContext.getActiveOrCreate`, which only calls it when there
     * is neither an active context nor checkpoint data to recover from.
     */
    def createContext: StreamingContext = {
      val ssc = new StreamingContext(conf, Seconds(duration))

      val input =
        if (isDebug) {
          new TestInputStream[String](ssc, Mock.items, 1)
        } else {
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc,
            Map("metadata.broker.list" -> "broker1"),
            Set("topic")
          ).map(f => f._2) // keep only the message value, drop the Kafka key
        }

      // Take the third parsed field and its third '/'-separated segment. For a value
      // like "http://host/path" that segment is "host", which matches the `domain`
      // name used below. NOTE(review): confirm field 2 of NginxParser's output really
      // is a full URL; malformed lines will throw here.
      val result = input.map { nginxLogLine =>
        val items = NginxParser.parse(nginxLogLine)
        items(2).split("/")(2)
      }

      /*
       See: https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html
       */
      /**
       * State-update function for `mapWithState`: adds this batch's value for `key`
       * to the stored running total and emits `(key, newTotal)`.
       *
       * @param batchTime time of the batch being processed (unused)
       * @param key       the tracked key
       * @param value     the new value for `key`; `None` when Spark calls this because
       *                  the key's state is timing out
       * @param state     the running total stored for `key`
       * @return `Some((key, total))`, including on the final timeout call
       */
      def trackStateFunc(batchTime: Time,
                         key: String,
                         value: Option[Int],
                         state: State[Long]): Option[(String, Long)] = {
        val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
        // With a StateSpec timeout configured, an idle key gets one last call with
        // isTimingOut() == true. Spark removes that state itself, and State.update
        // throws IllegalArgumentException on a timing-out state. Only live state is
        // updated; the final total is still emitted.
        if (!state.isTimingOut()) {
          state.update(sum)
        }
        Some((key, sum))
      }

      // Seed state so these keys start from a non-zero count.
      val initialRDD = ssc.sparkContext.parallelize(List(("abc", 100L), ("bbc", 32L)))
      val stateSpec = StateSpec.function(trackStateFunc _)
        .initialState(initialRDD)
        .numPartitions(2)
        .timeout(Seconds(60))

      // mapWithState is only defined on key-value streams (hence the name),
      // so each key is first paired with a count of 1.
      val newResult = result.map(domain => (domain, 1)).mapWithState(stateSpec)

      // Snapshot of every tracked (key, total) pair, emitted each batch.
      val stateSnapshotStream = newResult.stateSnapshots()
      // rdd.foreach runs on executors; the output only reaches this console in local mode.
      stateSnapshotStream.foreachRDD { rdd =>
        rdd.foreach(k => println(s"计数:${k._1} -> ${k._2}"))
      }
      result.foreachRDD { rdd =>
        rdd.foreach(line => println(line))
      }

      // mapWithState requires checkpointing to be enabled.
      ssc.checkpoint(checkpointDir)
      ssc
    }

    val ssc = StreamingContext.getActiveOrCreate(checkpointDir, () => createContext)
    ssc.start()
    ssc.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment