@nambrot
Created December 2, 2017 23:33
import akka.stream.scaladsl.Flow
import scala.concurrent.duration._

val eventsByWindowAndUser = Flow[Event]
  // Window the stream by turning each event into WindowCommands
  .statefulMapConcat { () =>
    val generator = new CommandGenerator()
    ev => generator.forEvent(ev, 10.seconds, 1.second)
  }
  // Group by window
  .groupBy(164, _.w)
  // Complete each window's substream once its CloseWindow command arrives
  .takeWhile(!_.isInstanceOf[CloseWindow])
  // Keep only the AddToWindow commands, as (window, event) pairs
  .collect { case AddToWindow(e, w) => (w, e) }
  // Within each window, group again by user id and aggregate that user's events
  .groupBy(64, _._2.userId)
  .fold(List[Event]()) { case (agg, ev) => ev._2 :: agg }
  // Merge the per-user substreams back into their window substream
  .mergeSubstreams
  .async
  // Merge the per-window substreams back into a single stream
  .mergeSubstreams
  .async

val complexAlerts =
  Flow[List[Event]]
    // Keep windows that contain both a low systolic reading and a high heart rate
    .filter { events =>
      events.exists { case bp: BloodPressureEvent => bp.systolic < 100; case _ => false } &&
      events.exists { case hr: HeartRateEvent => hr.heartRate > 100; case _ => false }
    }
    // The filter guarantees a non-empty list, so matching on the head cannot fail
    .map { case event :: _ => (event.userId, s"User ${event.userId} has a complex alert") }
...
merger ~> eventsByWindowAndUser ~> complexAlerts ~> Sink.foreach(println)
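
The gist does not show `CommandGenerator` or the window command types, so the following is only a minimal sketch of how such a generator might work, not the author's implementation. It assumes sliding windows aligned to multiples of the step, a window length that is a multiple of the step, non-negative millisecond timestamps, and a watermark equal to the highest timestamp seen so far. `Window`, `Windows.windowsFor`, the `timestampOf` parameter, and the generic type `E` are hypothetical names introduced for illustration; only `forEvent`, `AddToWindow`, `CloseWindow`, and the `w` field come from the snippets above.

```scala
import scala.collection.mutable
import scala.concurrent.duration._

// Hypothetical window type: the half-open interval [start, end) in milliseconds.
final case class Window(start: Long, end: Long)

// Command shapes inferred from how the gist uses them; the real definitions are not shown.
sealed trait WindowCommand { def w: Window }
final case class AddToWindow[E](e: E, w: Window) extends WindowCommand
final case class CloseWindow(w: Window) extends WindowCommand

object Windows {
  // All sliding windows that contain `ts`.
  // Assumes ts >= 0 and that `length` is a multiple of `step`.
  def windowsFor(ts: Long, length: FiniteDuration, step: FiniteDuration): List[Window] = {
    val len = length.toMillis
    val st = step.toMillis
    val lastStart = ts - ts % st          // latest aligned start that is <= ts
    val firstStart = lastStart - len + st // earliest start whose window still covers ts
    (firstStart to lastStart by st).map(s => Window(s, s + len)).toList
  }
}

// Assumption: the caller supplies `timestampOf`, because the gist's Event fields are not shown.
class CommandGenerator[E](timestampOf: E => Long) {
  private val openWindows = mutable.Set.empty[Window]
  private var watermark = 0L // highest timestamp seen so far

  def forEvent(ev: E, length: FiniteDuration, step: FiniteDuration): List[WindowCommand] = {
    watermark = math.max(watermark, timestampOf(ev))
    // Close every open window that ended at or before the watermark.
    val closed = openWindows.filter(_.end <= watermark).toList.sortBy(_.start)
    openWindows --= closed
    // Late events are only added to windows that have not been closed yet.
    val live = Windows.windowsFor(timestampOf(ev), length, step).filter(_.end > watermark)
    openWindows ++= live
    val closes: List[WindowCommand] = closed.map(CloseWindow(_))
    closes ++ live.map(w => AddToWindow(ev, w))
  }
}
```

With a 10-second window and a 1-second step, each event belongs to ten overlapping windows, so a single call to `forEvent` emits up to ten `AddToWindow` commands plus a `CloseWindow` for every window the watermark has passed. Emitting the close commands keyed by window is what lets the `takeWhile` stage in `eventsByWindowAndUser` complete the matching substream.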