Skip to content

Instantly share code, notes, and snippets.

@Hungsiro506
Forked from koen-dejonghe/SlidingWindow.scala
Created July 29, 2017 10:54
Show Gist options
  • Save Hungsiro506/3ebf01035ee2affdab68a30f1ed3307c to your computer and use it in GitHub Desktop.
Save Hungsiro506/3ebf01035ee2affdab68a30f1ed3307c to your computer and use it in GitHub Desktop.
case class SomeEvent(value: Long)
val events = Source
.tick(0 seconds, 250 millis, "")
.zipWithIndex
.map { case (_, l) =>
SomeEvent(l)
}
val group = Flow[SomeEvent].groupedWithin(100, 500 millis) // +/- 2 events per group
val slide = Flow[Seq[SomeEvent]].sliding(3, 1) // 3 groups per slide, step size = 1
val exe = events
.via(group)
.via(slide)
.map(_.flatten)
.runWith(Sink.foreach(println))
try Await.result(exe, 6 seconds)
catch { case _: Throwable => }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment