// These are my two event classes...
case class Event1(id: Int, eventStartTime: DateTime, entityRoles: EntityRoles)
object Event1 {
  override def toString = "ev1"
  implicit val isTimeStamped: Timestamped[Event1] =
    Timestamped.from(_.eventStartTime)
  implicit val isEntityRoleAware: EntityRoleAware[Event1] =
    EntityRoleAware.from(_.entityRoles.roles)
  implicit val hasOrdering: Ordering[Event1] =
    Ordering.by(e => (e.eventStartTime, e.entityRoles))
def lag[A: Ordering : Timestamped : EntityAware]
       (events: TypedPipe[A],
        offset: Int): TypedPipe[(A, A)] = {
  events
    .groupByEntity
    .sortBy(_.timestamp)
    .mapValueStream {
      events => {
        // sliding(size, step): windows of 2 consecutive events,
        // advancing `offset` positions between windows
        events.sliding(2, offset)
/* The lag function is an analytic function that lets
 * you query more than one event at a time.
 *
 * Finds events associated with previous events of the same type.
 * For each item in the `events` pipe, finds all `events` that occurred `offset` time steps
 * in the past.
 *
 * @param events Input data
 * @param offset Time lag
 * @return TypedPipe of tuples (E[t-offset], E[t])
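The lag semantics described in this comment can be sketched on plain Scala collections, without Scalding. This is a minimal illustration, not the gist's implementation: `Ev`, `LagSketch`, and `lagLocal` are hypothetical names, and it assumes that "time steps" means positions in each entity's timestamp-sorted sequence.

```scala
object LagSketch {
  // Hypothetical stand-in for the gist's event types: an entity key plus a timestamp.
  final case class Ev(entity: String, t: Int)

  // For each entity, pair every event with the one `offset` positions earlier.
  def lagLocal(events: Seq[Ev], offset: Int): Map[String, List[(Ev, Ev)]] = {
    require(offset >= 1, "offset must be at least 1")
    events
      .groupBy(_.entity)
      .map { case (entity, es) =>
        val sorted = es.sortBy(_.t)
        // A window of offset + 1 consecutive events spans exactly `offset` steps;
        // its first and last elements form the pair (E[t-offset], E[t]).
        // Windows shorter than offset + 1 (too few events) are dropped.
        val pairs = sorted
          .sliding(offset + 1)
          .collect { case w if w.size == offset + 1 => (w.head, w.last) }
          .toList
        entity -> pairs
      }
  }
}
```

Under that assumption, an entity with events at t = 1, 2, 3 and `offset = 1` yields the pairs (1, 2) and (2, 3); with `offset = 2` it yields only (1, 3). An entity with a single event yields no pairs.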
package au.com.cba.omnia.eventually.ops

import au.com.cba.omnia.eventually.schema.{Entity, EntityAware}
import SelectEntitySyntax._
import com.twitter.scalding.typed.{ TypedPipe, Grouped }
import scala.reflect.ClassTag

trait GroupByEntity[C] {