Skip to content

Instantly share code, notes, and snippets.

@alexandru
Last active January 28, 2016 22:55
Show Gist options
  • Select an option

  • Save alexandru/b63c5dcaba0665422b90 to your computer and use it in GitHub Desktop.

Select an option

Save alexandru/b63c5dcaba0665422b90 to your computer and use it in GitHub Desktop.
import scala.concurrent.duration.FiniteDuration
import monix.streams._
/**
  * Rate-limits `source`: a signal that arrives while no cool-down is active is
  * passed through at once, signals that arrive before the current cool-down
  * deadline are held back, and the most recent held-back signal is released
  * when the `debounce(timespan)` marker fires.
  *
  * Implementation outline:
  *  1. `throttleFirst` tags every signal as `Emit` (no cool-down active, a new
  *     deadline `timespan` ahead is started) or `Delay` (arrived before the
  *     deadline of the current cool-down).
  *  1. `endOfDelay` turns every `Emit` into a `StartOrDebounce` marker and runs
  *     those markers through `debounce(timespan)`.
  *  1. `merged` interleaves both streams; when a marker directly follows a
  *     `Delay`, the delayed signal is re-published as `EmitExtra`.
  *  1. Only the payloads of `Emit` and `EmitExtra` reach the result.
  *
  * Deadlines are taken from `System.nanoTime`, so they are unaffected by
  * wall-clock adjustments and keep sub-millisecond `timespan` values intact.
  *
  * NOTE(review): `throttleFirst` is consumed both directly by `merge` and
  * through `endOfDelay`, so `source` looks like it is subscribed twice per
  * downstream subscriber -- confirm that is acceptable for cold or
  * side-effecting sources.
  *
  * @param source   the observable whose signals are throttled
  * @param timespan length of the cool-down started by every emitted signal;
  *                 also used as the `debounce` period
  * @tparam T       the type of the signals
  * @return an observable carrying the immediately emitted signals plus the
  *         trailing signal of each cool-down that held something back
  */
def throttleX[T](source: Observable[T], timespan: FiniteDuration): Observable[T] = {
  // Nanosecond resolution: `toMillis` would truncate short timespans to zero.
  val timespanNanos = timespan.toNanos

  sealed trait State
  // Doubles as the seed of both scans and as the "debounce elapsed" marker.
  case object StartOrDebounce extends State
  // `delayUntil` is a `System.nanoTime` based deadline in both classes below.
  case class Emit(startSignal: T, delayUntil: Long) extends State
  case class Delay(lastSignal: T, delayUntil: Long) extends State
  case class EmitExtra(signal: T) extends State

  // Extracts the cool-down deadline from the states that carry one.
  object CoolDown {
    def unapply(ref: State): Option[Long] =
      ref match {
        case Emit(_, delayUntil) => Some(delayUntil)
        case Delay(_, delayUntil) => Some(delayUntil)
        case _ => None
      }
  }

  val throttleFirst = source.scan(StartOrDebounce: State) { (state, signal) =>
    val now = System.nanoTime()
    state match {
      // `nanoTime` values may wrap around, so compare through the difference.
      case CoolDown(delayUntil) if delayUntil - now > 0 =>
        Delay(signal, delayUntil)
      // Initial state or expired cool-down: emit and start a new cool-down.
      case _ =>
        Emit(signal, now + timespanNanos)
    }
  }

  val endOfDelay = throttleFirst
    .collect { case Emit(_, _) => (StartOrDebounce: State) }
    .debounce(timespan)

  val merged = Observable.merge(throttleFirst, endOfDelay)
    .scan(StartOrDebounce: State) { (lastState, currentState) =>
      (lastState, currentState) match {
        // The marker follows a held-back signal: release that signal.
        case (Delay(lastSignal, _), StartOrDebounce) =>
          EmitExtra(lastSignal)
        // Everything else (including a marker with nothing pending) passes as is.
        case (_, other) =>
          other
      }
    }

  merged.collect {
    case Emit(signal, _) => signal
    case EmitExtra(signal) => signal
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment