Created May 26, 2015 16:57
Gist: AndreasKostler/82c2e99e84984aa7743f
lag function
/* The lag function is an analytic function that lets you query more
 * than one event at a time.
 *
 * Finds events associated with earlier events of the same entity.
 * For each item in the `events` pipe, it pairs the item with the event
 * of the same entity that occurred `offset` positions earlier in
 * timestamp order.
 *
 * @param events Input data
 * @param offset Time lag, counted in events (positions in each entity's
 *               timestamp-sorted sequence), not in clock time
 * @return TypedPipe of tuples (E[t-offset], E[t])
 */
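def lag[A: Ordering : Timestamped : EntityAware]
       (events: TypedPipe[A],
        offset: Int): TypedPipe[(A, A)] = {
  events
    .groupByEntity          // key each event by its entity
    .sortBy(_.timestamp)    // order each entity's events by time
    .mapValueStream { sorted =>
      // The value stream is a single-pass Iterator, so duplicate it into
      // two independent views. Shifting the second view by `offset` makes
      // zip pair event i with event i + offset: (E[t-offset], E[t]).
      val (it1, it2) = sorted.duplicate
      it1.zip(it2.drop(offset))
    }
    .values                 // drop the entity key, keep the pairs
}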
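The lag logic itself does not depend on Scalding. As a minimal sketch, assuming plain Scala collections in place of `TypedPipe` and a hypothetical `Event` case class standing in for the `Timestamped` and `EntityAware` type classes, the same group, sort and shifted zip look like this. Because a `Seq` can be traversed twice, the sketch zips the sorted sequence with a shifted copy of itself instead of calling `duplicate`; the `require` check is an addition of this sketch, not part of the original.

```scala
// Hypothetical event type for illustration only: `entity` stands in for
// what EntityAware extracts, `timestamp` for what Timestamped provides.
final case class Event(entity: String, timestamp: Long, name: String)

object LagSketch {
  /** Pairs each event with the event of the same entity that occurred
    * `offset` positions earlier in timestamp order: (E[t-offset], E[t]).
    */
  def lag(events: Seq[Event], offset: Int): Seq[(Event, Event)] = {
    require(offset >= 0, "offset must be non-negative")
    events
      .groupBy(_.entity)                      // analogous to groupByEntity
      .values
      .toList
      .flatMap { group =>
        val sorted = group.sortBy(_.timestamp) // analogous to sortBy(_.timestamp)
        // Pair element i with element i + offset within this entity.
        sorted.zip(sorted.drop(offset))
      }
  }
}
```

For a single entity with events at timestamps 1, 2 and 3 and `offset = 1`, the result is the pairs (e1, e2) and (e2, e3). Entities with no more than `offset` events contribute no pairs, and the order of entities in the output is unspecified because `groupBy` returns a `Map`.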