@AndreasKostler
Created May 26, 2015 16:57
lag function
/* The lag function is an analytic function that lets
 * you query more than one event at a time.
 *
 * Pairs each event with an earlier event of the same entity.
 * Events are grouped by entity and sorted by timestamp; each event is then
 * paired with the event `offset` positions before it in that order.
 * The first `offset` events of each entity have no predecessor and yield no pair.
 *
 * @param events Input data
 * @param offset Lag, counted in events (positions in the sorted stream), not in clock time
 * @return TypedPipe of tuples (E[t-offset], E[t])
 */
def lag[A: Ordering : Timestamped : EntityAware]
  (events: TypedPipe[A],
   offset: Int): TypedPipe[(A, A)] = {
  events
    .groupByEntity
    .sortBy(_.timestamp)
    .mapValueStream {
      events => {
        val (it1, it2) = events.duplicate
        it1.zip(it2.drop(offset))
      }
    }
    .values
}
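
The duplicate/zip/drop step above can be tried without Scalding. The following is a minimal sketch on plain Scala collections, not the gist's implementation: `Event` and `lagLocal` are hypothetical names introduced for illustration, and `groupBy` plus `sortBy` on a `Seq` stand in for `groupByEntity` and the sorted value stream.

```scala
// Hypothetical event type for illustration; not part of the original gist.
final case class Event(entity: String, timestamp: Long)

// Plain-collections analogue of lag (an assumption, not the Scalding code):
// group by entity, sort each group by timestamp, then pair every event
// with the event `offset` positions later in that order.
def lagLocal(events: Seq[Event], offset: Int): Seq[(Event, Event)] =
  events
    .groupBy(_.entity)
    .values
    .flatMap { group =>
      val sorted = group.sortBy(_.timestamp)
      // Two independent iterators over the same sorted events.
      val (it1, it2) = sorted.iterator.duplicate
      // Advancing the second iterator by `offset` yields (E[t-offset], E[t]).
      it1.zip(it2.drop(offset)).toList
    }
    .toSeq

val sample = Seq(
  Event("a", 3), Event("a", 1), Event("b", 5), Event("a", 2), Event("b", 4)
)

// Pairs for offset 1; the order across entities is not guaranteed.
val pairs = lagLocal(sample, 1)
```

With `offset = 1`, entity `a` contributes two pairs and entity `b` one. With `offset = 2`, entity `b` contributes nothing because it has only two events, which shows that the lag is counted in positions within the sorted stream rather than in units of time.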