@nambrot
Created December 2, 2017 23:29
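import org.apache.spark.streaming.Minutes

// Merge two per-user accumulators by concatenating their event lists.
def reducer(
  a: (List[BloodPressureEvent], List[HeartRateEvent]),
  b: (List[BloodPressureEvent], List[HeartRateEvent])
): (List[BloodPressureEvent], List[HeartRateEvent]) = (a, b) match {
  case ((bp1, hr1), (bp2, hr2)) => (bp1 ++ bp2, hr1 ++ hr2)
}

// Key both streams by userId, join them, and collect each user's events
// over a sliding 60-minute window that advances every minute.
// Note: the join pairs every left value with every right value for a key,
// so a user with several events of both kinds in one batch yields duplicates.
val combinedStream =
  bloodPressureStream
    .map(event => (event.userId, List(event)))
    .fullOuterJoin(heartRateStream.map(event => (event.userId, List(event))))
    // A side that is missing from the join becomes an empty list.
    .map {
      case (id, (bp, hr)) => (id, (bp.getOrElse(Nil), hr.getOrElse(Nil)))
    }
    .reduceByKeyAndWindow(reducer(_, _), Minutes(60), Minutes(1))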
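
The merge logic can be checked without a Spark cluster. The sketch below is only an illustration on plain Scala collections: the fields of `BloodPressureEvent` and `HeartRateEvent`, the `Acc` alias, the `normalize` helper, and the `WindowMergeSketch` object are all assumptions made for the example, since the gist does not show the real event definitions. It mimics what happens when one batch carries only a blood pressure reading for a user and a later batch carries only a heart rate reading.

```scala
object WindowMergeSketch {
  // Hypothetical event shapes; the gist does not show the real definitions.
  case class BloodPressureEvent(userId: Int, systolic: Int, diastolic: Int)
  case class HeartRateEvent(userId: Int, bpm: Int)

  // Hypothetical alias for the per-user accumulator.
  type Acc = (List[BloodPressureEvent], List[HeartRateEvent])

  // Same merge as the gist's reducer: concatenate each side.
  def reducer(a: Acc, b: Acc): Acc = (a._1 ++ b._1, a._2 ++ b._2)

  // Stand-in for the step after fullOuterJoin: a missing side becomes Nil.
  def normalize(
    bp: Option[List[BloodPressureEvent]],
    hr: Option[List[HeartRateEvent]]
  ): Acc = (bp.getOrElse(Nil), hr.getOrElse(Nil))

  val bp = BloodPressureEvent(userId = 1, systolic = 120, diastolic = 80)
  val hr = HeartRateEvent(userId = 1, bpm = 72)

  // First batch: blood pressure only. Second batch: heart rate only.
  val merged: Acc =
    reducer(normalize(Some(List(bp)), None), normalize(None, Some(List(hr))))
}
```

Because the reducer only concatenates lists, it is associative and has `(Nil, Nil)` as an identity, which is what lets the windowed reduce combine batches in any grouping.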