I've just started looking for a solution for stateful computation with Spark Streaming, and I came across the updateStateByKey() function. The problem I'm trying to solve: 10,000 sensors each produce a binary value every minute. If two consecutive values a sensor reports differ from each other, I would like to flag that and send the flag down Kafka.
// Stream of (sensorId, binaryValue) pairs
val sensorData: DStream[(String, Int)] = ???

// State kept per sensor id: the last value that sensor reported.
val state: DStream[(String, Int)] = sensorData.updateStateByKey[Int](updateFunction _)

// updateStateByKey calls this once per key per batch: `newValues` holds the
// values that arrived for this sensor in the batch, `lastValue` its previous state.
def updateFunction(newValues: Seq[Int], lastValue: Option[Int]): Option[Int] = {
  // Fold over the batch in order, flagging every change from the previous value.
  newValues.foldLeft(lastValue) { (prev, value) =>
    if (prev.exists(_ != value)) {
      // consecutive values differ: send a flag to Kafka here
    }
    Some(value)
  }
  // The fold's result (the last value seen, or the old state if the batch
  // was empty) becomes the new state for this sensor.
}
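The change-detection step itself is plain Scala, so it can be sanity-checked without a Spark cluster. A minimal sketch, where `ChangeDetector` and `changes` are illustrative names (not part of Spark's API) and the returned sequence stands in for the values you would flag to Kafka:

```scala
object ChangeDetector {
  // Returns the values at which a change from the previous value occurred,
  // mirroring the comparison done inside updateFunction above.
  def changes(lastValue: Option[Int], newValues: Seq[Int]): Seq[Int] = {
    newValues.foldLeft((lastValue, Seq.empty[Int])) {
      case ((prev, flagged), value) =>
        // Flag the value only if we have a previous value and it differs.
        val next = if (prev.exists(_ != value)) flagged :+ value else flagged
        (Some(value), next)
    }._2
  }

  def main(args: Array[String]): Unit = {
    // 0 -> 1 and 1 -> 0 are changes; the repeated 1 is not.
    assert(changes(Some(0), Seq(0, 1, 1, 0)) == Seq(1, 0))
    // With no prior state, the first value is never flagged.
    assert(changes(None, Seq(1, 1)) == Seq.empty)
    println("ok")
  }
}
```

Threading the "previous value" through a fold keeps the comparison purely per-key, which is exactly the shape updateStateByKey expects.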