Skip to content

Instantly share code, notes, and snippets.

@notxcain
Created May 14, 2018 11:46
Show Gist options
  • Select an option

  • Save notxcain/3af0d2f01decdf85c1e2c262d74664e1 to your computer and use it in GitHub Desktop.

Select an option

Save notxcain/3af0d2f01decdf85c1e2c262d74664e1 to your computer and use it in GitHub Desktop.
Effect Polymorphic Push-based Observable
import cats.data.EitherT
import cats.effect.Timer
import cats.implicits._
import cats.{ApplicativeError, Foldable, Functor, Monad, MonadError}
import scala.concurrent.duration.FiniteDuration
trait Observable[F[_], A] { outer =>
def subscribe[O](observer: Observer[F, A, O]): F[O]
final def foldLeft[O](o: O)(f: (O, A) => O)(
implicit F: MonadError[F, Throwable]): F[O] =
subscribe(Observer.foldLeft(o, f))
final def mapEval[B](f: A => F[B])(implicit F: Monad[F]): Observable[F, B] =
new Observable[F, B] {
override def subscribe[O](observer: Observer[F, B, O]): F[O] =
outer.subscribe(observer.contramapEval(f))
}
final def take(n: Int)(implicit F: Monad[F]): Observable[F, A] =
new Observable[F, A] { self =>
override def subscribe[O](observer: Observer[F, A, O]): F[O] = {
outer.subscribe(observer.take(n))
}
}
final def flatMap[B](f: A => Observable[F, B])(
implicit F: Monad[F]): Observable[F, B] = new Observable[F, B] {
override def subscribe[O](observer: Observer[F, B, O]): F[O] =
outer.subscribe(new FlatMapper(f, observer))
}
final def drain(implicit F: ApplicativeError[F, Throwable]): F[Unit] =
subscribe(Observer.drain)
}
final class InnerFlatMapperObserver[F[_], A, B, O](
f: A => Observable[F, B],
observer: Observer[F, B, O])(implicit F: Monad[F])
extends Observer[F, B, Either[O, Observer[F, A, O]]] {
override def onNext(
b: B): F[Either[Either[O, Observer[F, A, O]],
Observer[F, B, Either[O, Observer[F, A, O]]]]] =
observer.onNext(b).map {
case Right(ob) =>
(new InnerFlatMapperObserver(f, ob): Observer[
F,
B,
Either[O, Observer[F, A, O]]])
.asRight[Either[O, Observer[F, A, O]]]
case Left(o) =>
o.asLeft[Observer[F, A, O]]
.asLeft[Observer[F, B, Either[O, Observer[F, A, O]]]]
}
override def onComplete: F[Either[O, Observer[F, A, O]]] =
(new FlatMapper[F, A, B, O](f, observer): Observer[F, A, O])
.asRight[O]
.pure[F]
override def onError(e: Throwable): F[Unit] = observer.onError(e)
}
class FlatMapper[F[_], A, B, O](
f: A => Observable[F, B],
outerObserver: Observer[F, B, O])(implicit F: Monad[F])
extends Observer[F, A, O] {
override def onNext(a: A): F[Either[O, Observer[F, A, O]]] =
f(a).subscribe(new InnerFlatMapperObserver(f, outerObserver))
override def onComplete: F[O] = outerObserver.onComplete
override def onError(e: Throwable): F[Unit] = outerObserver.onError(e)
}
/*
import io.evotor.webhooks.common._
import cats.implicits._
import cats.effect.IO
Observable[IO].of(List(1, 2, 3, 4))
*/
object Observable {
final class Builder[F[_]] {
def of[C[_]: Foldable, A](ca: C[A])(
implicit F: Monad[F]): Observable[F, A] =
new FoldableObservable[F, C, A](ca)
def periodic(period: FiniteDuration)(implicit timer: Timer[F],
F: Monad[F]): Observable[F, Unit] =
new PeriodicObservable[F](period)
}
def apply[F[_]]: Builder[F] = new Builder[F]
}
final class FoldableObservable[F[_]: Monad, C[_]: Foldable, A](ca: C[A])
extends Observable[F, A] {
override def subscribe[B](observer: Observer[F, A, B]): F[B] =
ca.foldM[EitherT[F, B, ?], Observer[F, A, B]](observer) { (acc, i) =>
EitherT(acc.onNext(i))
}
.semiflatMap(_.onComplete)
.merge
}
final class PeriodicObservable[F[_]: Monad](period: FiniteDuration)(
implicit timer: Timer[F])
extends Observable[F, Unit] {
override def subscribe[O](observer: Observer[F, Unit, O]): F[O] =
timer.sleep(period).flatMap { x =>
observer.onNext(x).flatMap {
case Right(nextObserver) =>
subscribe(nextObserver)
case Left(o) =>
o.pure[F]
}
}
}
trait Observer[F[_], A, O] { outer =>
def onNext(a: A): F[Either[O, Observer[F, A, O]]]
def onComplete: F[O]
def onError(e: Throwable): F[Unit]
final def contramap[B](f: B => A)(implicit F: Functor[F]): Observer[F, B, O] =
new Observer[F, B, O] {
override def onNext(b: B): F[Either[O, Observer[F, B, O]]] =
outer.onNext(f(b)).map {
case Right(ob) => Right(ob.contramap(f))
case Left(o) => Left(o)
}
override def onComplete: F[O] = outer.onComplete
override def onError(e: Throwable): F[Unit] = outer.onError(e)
}
final def contramapEval[B](f: B => F[A])(
implicit F: Monad[F]): Observer[F, B, O] =
new Observer[F, B, O] {
override def onNext(a: B): F[Either[O, Observer[F, B, O]]] =
f(a).flatMap(outer.onNext).map {
case Right(observer) =>
Right(observer.contramapEval(f))
case Left(o) => Left(o)
}
override def onComplete: F[O] = outer.onComplete
override def onError(e: Throwable): F[Unit] = outer.onError(e)
}
final def take(n: Int)(implicit F: Functor[F]): Observer[F, A, O] =
new Observer[F, A, O] {
override def onNext(a: A): F[Either[O, Observer[F, A, O]]] =
if (n == 0) {
outer.onComplete.map(_.asLeft)
} else {
outer
.onNext(a)
.map {
case Right(cont) => cont.take(n - 1).asRight
case Left(o) => o.asLeft
}
}
override def onComplete: F[O] = outer.onComplete
override def onError(e: Throwable): F[Unit] = outer.onError(e)
}
}
object Observer {
def drain[F[_], A](
implicit F: ApplicativeError[F, Throwable]): Observer[F, A, Unit] =
new Observer[F, A, Unit] {
private val onNext = (this: Observer[F, A, Unit]).asRight[Unit].pure[F]
override def onNext(a: A): F[Either[Unit, Observer[F, A, Unit]]] =
onNext
override def onComplete: F[Unit] = ().pure[F]
override def onError(e: Throwable): F[Unit] = F.raiseError(e)
}
def foldLeft[F[_]: MonadError[?[_], Throwable], A, O](
zero: O,
f: (O, A) => O): Observer[F, A, O] =
new Observer[F, A, O] {
override def onNext(a: A): F[Either[O, Observer[F, A, O]]] =
Observer.foldLeft[F, A, O](f(zero, a), f).asRight[O].pure[F]
override def onComplete: F[O] = zero.pure[F]
override def onError(e: Throwable): F[Unit] = e.raiseError[F, Unit]
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment