Skip to content

Instantly share code, notes, and snippets.

@samuela
Last active January 2, 2016 15:08
Show Gist options
  • Save samuela/8321140 to your computer and use it in GitHub Desktop.
Save samuela/8321140 to your computer and use it in GitHub Desktop.
/** Groups consecutive elements of `o` that share the same key into buffers.
  *
  * A buffer (a "run") is emitted, in arrival order, as soon as an element with a
  * different key shows up, and once more when `o` completes so that the trailing
  * run is not lost. Keys are compared with `==`. Only adjacent elements are
  * grouped: a key that reappears after a different key starts a new run.
  *
  * An empty source produces no buffers. If `o` terminates with an error, the
  * error is propagated and the pending run is not flushed.
  *
  * @param o       source observable
  * @param keyFunc computes the grouping key of an element
  * @tparam T      element type
  * @tparam K      key type
  * @return an observable of non-empty runs of same-keyed consecutive elements
  */
def bufferByKey[T, K](o: Observable[T], keyFunc: T => K): Observable[Seq[T]] = {
  // State: (key of the current run, current run in reverse order, run to emit on this step).
  type S = (Option[K], List[T], Option[List[T]])
  val initial: S = (None, Nil, None)

  // Wrap every element in Some and append a single None as an end-of-stream
  // marker, so the scan below gets a chance to flush the last run on completion.
  val terminated: Observable[Option[T]] =
    o.map[Option[T]](v => Some(v)) ++ Observable[Option[T]](None)

  terminated.scan[S](initial) { (state: S, next: Option[T]) =>
    val (lastKey, acc, _) = state
    next match {
      case Some(newVal) =>
        val nk = keyFunc(newVal)
        lastKey match {
          // Key changed: hand the finished run over and start a new one.
          case Some(k) if k != nk => (Some(nk), List(newVal), Some(acc))
          // First element, or same key as before: keep accumulating.
          case _ => (lastKey.orElse(Some(nk)), newVal :: acc, None)
        }
      case None =>
        // End of stream: flush whatever is pending (nothing for an empty source).
        (lastKey, Nil, if (acc.isEmpty) None else Some(acc))
    }
  } flatMap {
    case (_, _, None) => Observable()
    // The run was built by prepending, so reverse it to restore arrival order.
    case (_, _, Some(run)) => Observable(run.reverse)
  }
}
// Usage sketch (illustrative only: `ticks` has no initializer and `ticksToBar`
// has a placeholder body).
// NOTE(review): `ticks` is typed Signal[Tick] while bufferByKey takes an
// Observable[T]; presumably Signal is (or converts to) an Observable — confirm.
val ticks: Signal[Tick]
// Turns one run of ticks into a Bar; the body is elided here.
def ticksToBar(ts: Seq[Tick]): Bar = ...
// Consecutive ticks with an equal `timestamp.minute` form one run, and every run
// emitted by bufferByKey is mapped through ticksToBar.
// NOTE(review): if `minute` is minute-of-hour, adjacent ticks exactly one hour
// apart would share a key and land in the same run — verify against the Tick type.
val bars = bufferByKey(ticks, { tick => tick.timestamp.minute }) map ticksToBar
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment