Skip to content

Instantly share code, notes, and snippets.

// These are my two event classes...
case class Event1(id: Int, eventStartTime: DateTime, entityRoles: EntityRoles)
object Event1 {
override def toString = "ev1"
implicit val isTimeStamped: Timestamped[Event1] =
Timestamped.from(_.eventStartTime)
implicit val isEntityRoleAware: EntityRoleAware[Event1] =
EntityRoleAware.from(_.entityRoles.roles)
implicit val hasOrdering: Ordering[Event1] =
Ordering.by(e => (e.eventStartTime, e.entityRoles))
def lag[A: Ordering : Timestamped : EntityAware]
(events: TypedPipe[A],
offset: Int): TypedPipe[(A, A)] = {
events
.groupByEntity
.sortBy(_.timestamp)
.mapValueStream {
events => {
events.sliding(2,offset)
/* The lag function is an analytic function that lets
* you query more than one event at a time.
*
* Finds events associated with previous events of the same type.
* For each item in the `events` pipe, find all `events` which occurred `offset` time steps
* in the past
*
* @param events Input data
* @param offset Time lag
* @return TypedPipe of Tupels (E[t-offset],E[t])
package au.com.cba.omnia.eventually.ops
import au.com.cba.omnia.eventually.schema.{Entity, EntityAware}
import SelectEntitySyntax._
import com.twitter.scalding.typed.{ TypedPipe, Grouped }
import scala.reflect.ClassTag
trait GroupByEntity[C] {