Created
December 2, 2017 23:33
-
-
Save nambrot/db47bc9a005b5c7501c648f64337b367 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Splits the event stream into windows and, inside every window, aggregates the events per user.
  *
  * Pipeline stages:
  *  1. each incoming `Event` is turned into window commands by a `CommandGenerator`;
  *  2. the commands are grouped into one substream per window (`_.w`);
  *  3. a window substream is completed as soon as a `CloseWindow` command is seen;
  *  4. within a window, the added events are grouped per `userId` and folded into a list;
  *  5. both levels of substreams are merged back into a single stream.
  *
  * The result emits one `List[Event]` per (window, user) pair. Because the fold prepends,
  * each list is in reverse arrival order.
  */
val eventsByWindowAndUser = Flow[Event]
  // Windowing the stream by creating window commands. The factory is invoked per
  // materialization, so the generator's state is not shared between runs.
  // NOTE(review): 10.seconds / 1.second are presumably window length and step — confirm
  // against CommandGenerator.forEvent.
  .statefulMapConcat { () =>
    val generator = new CommandGenerator()
    ev => generator.forEvent(ev, 10.seconds, 1.second)
  }
  // Group by the windows (at most 164 substreams).
  .groupBy(164, _.w)
  // "Finish" each window group when we detect a close-window command.
  .takeWhile {
    case _: CloseWindow => false
    case _              => true
  }
  // Only collect the events that were added to a window, paired with that window.
  .collect { case AddToWindow(e, w) => (w, e) }
  // Further group each window substream by the user id and aggregate the events.
  .groupBy(64, _._2.userId)
  .fold(List.empty[Event]) { case (agg, (_, event)) => event :: agg }
  // Merge the per-user substreams back into their window substream.
  .mergeSubstreams
  .async
  // Merge the per-window substreams back into the main stream.
  .mergeSubstreams
  .async
/** Detects "complex alerts" in a list of events.
  *
  * A list passes when it contains at least one `BloodPressureEvent` with `systolic < 100`
  * AND at least one `HeartRateEvent` with `heartRate > 100`. For every passing list the flow
  * emits a pair of the first event's `userId` and a human-readable alert message.
  */
val complexAlerts =
  Flow[List[Event]]
    .filter { events =>
      events.exists {
        case bp: BloodPressureEvent => bp.systolic < 100
        case _                      => false
      } && events.exists {
        case hr: HeartRateEvent => hr.heartRate > 100
        case _                  => false
      }
    }
    // The filter above only passes lists with at least one matching event, so the list is
    // never empty here and the head pattern always matches.
    .map { case event :: _ => (event.userId, s"User ${event.userId} has a complex alert") }
// ... (code omitted in the original snippet)
// Route everything coming out of `merger` through windowing/aggregation and alert detection,
// then print every resulting alert.
// NOTE(review): `~>` presumably comes from the Akka GraphDSL implicits and `merger` from the
// omitted graph-builder code — confirm in the full source.
merger ~> eventsByWindowAndUser ~> complexAlerts ~> Sink.foreach(println)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment