@alexandru
Created August 14, 2016 11:36
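import monix.reactive._
import monix.eval.{Callback, Task}
import monix.execution.Ack.{Continue, Stop}
import scala.util.control.NonFatal

/** Builds an `Observable` by repeatedly applying an asynchronous
  * state action: `f` receives the current state and returns a `Task`
  * that produces the next element together with the next state.
  */
def fromAsyncStateAction[S, A](seed: => S)(f: S => Task[(A, S)]): Observable[A] =
  Observable.unsafeCreate[A] { subscriber =>
    // Brings the subscriber's Scheduler into implicit scope for `runAsync`.
    import subscriber.scheduler

    // Evaluates `f`, pushes the element downstream, then either continues
    // with the new state or stops, depending on the subscriber's Ack.
    def loop(state: S): Task[Unit] =
      try f(state).flatMap { case (a, s) =>
        Task.fromFuture(subscriber.onNext(a)).flatMap {
          case Continue => loop(s)
          case Stop => Task.unit
        }
      } catch {
        // `f` may throw while building the Task; turn that into a failed Task.
        case NonFatal(ex) =>
          Task.raiseError(ex)
      }

    // Starts the loop; the value returned by `runAsync` is what
    // `unsafeCreate` hands back for canceling the subscription.
    // Note: nothing here forwards errors to `subscriber.onError`;
    // a failed Task only reaches the empty callback.
    loop(seed).runAsync(Callback.empty)
  }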