Skip to content

Instantly share code, notes, and snippets.

@nambrot
Created December 2, 2017 23:31
Show Gist options
  • Save nambrot/7c719f4215f35dd375701d7b8eb62c12 to your computer and use it in GitHub Desktop.
Save nambrot/7c719f4215f35dd375701d7b8eb62c12 to your computer and use it in GitHub Desktop.
def updateFunction(alertStrings: Seq[String], stateOption: Option[(Boolean, Date, String)]): Option[(Boolean, Date, String)] = {
(alertStrings, stateOption) match {
case (_, Some((_, triggerTime, string))) => {
val cal = Calendar.getInstance()
cal.add(Calendar.Minute, -60)
if (triggerTime.before(cal.getTime)) None else Some((false, triggerTime, string))
}
case (Nil, None) => None
case (Seq(string, _*), None) => Some((true, Calendar.getInstance().getTime(), string))
}
}
val alertState =
alertStream
.updateStateByKey(updateFunction)
.filter {
case (_, (bool, _, _)) => bool
}.map {
case (_, (_, _, string)) => string
}
alertState.print
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment