@AndreasKostler
Created May 27, 2015 09:20
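import com.twitter.scalding.typed.TypedPipe

// Timestamped and EntityAware are type classes defined elsewhere in the author's code base;
// they supply `.timestamp` and `.groupByEntity` respectively.
def lag[A: Ordering : Timestamped : EntityAware](
    events: TypedPipe[A],
    offset: Int): TypedPipe[(A, A)] = {
  events
    .groupByEntity
    .sortBy(_.timestamp)
    // Windows of two adjacent events, advancing `offset` events between windows.
    .mapValueStream { group => group.sliding(2, offset) }
    .values
    // The last window of a group may hold a single event; `collect` skips it,
    // whereas `map` with this partial pattern would throw a MatchError.
    .collect { case Seq(a, b) => (a, b) }
}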
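
The windowing in `lag` can be tried without a Scalding job. The following is a minimal sketch on plain Scala collections, not the author's implementation: `Event`, `LagSketch`, and `lagLocal` are hypothetical names introduced for illustration, and an in-memory `groupBy` stands in for `groupByEntity`. It shows that `sliding(2, offset)` can emit a final window shorter than two elements, which this sketch drops.

```scala
// Hypothetical event type, used only for this illustration.
final case class Event(entity: String, timestamp: Long)

object LagSketch {
  // Local analogue of `lag`: per entity, sort by timestamp, then take
  // windows of two adjacent events, advancing `offset` events per step.
  def lagLocal(events: Seq[Event], offset: Int): Seq[(Event, Event)] =
    events
      .groupBy(_.entity) // stands in for groupByEntity
      .values
      .toSeq
      .flatMap { group =>
        group
          .sortBy(_.timestamp)
          .sliding(2, offset)
          // Keep only full windows; a trailing one-element window is skipped.
          .collect { case Seq(a, b) => (a, b) }
      }
}
```

With `offset = 1` every consecutive pair within an entity is produced; with a larger `offset` the windows skip ahead, so some events never appear in a pair. An entity with a single event yields no pairs at all.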