Created
May 14, 2018 11:46
-
-
Save notxcain/3af0d2f01decdf85c1e2c262d74664e1 to your computer and use it in GitHub Desktop.
Effect Polymorphic Push-based Observable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import cats.data.EitherT | |
| import cats.effect.Timer | |
| import cats.implicits._ | |
| import cats.{ApplicativeError, Foldable, Functor, Monad, MonadError} | |
| import scala.concurrent.duration.FiniteDuration | |
/** A source of `A` values that is consumed by handing it an [[Observer]].
  *
  * @tparam F effect type in which subscription results are expressed
  * @tparam A element type offered to observers
  */
trait Observable[F[_], A] { outer =>

  /** Connects `observer` to this source and yields, inside `F`, a value of the
    * observer's result type `O`.
    */
  def subscribe[O](observer: Observer[F, A, O]): F[O]

  /** Subscribes the observer produced by `Observer.foldLeft(o, f)`.
    *
    * @param o initial accumulator
    * @param f combines the accumulator with an element
    */
  final def foldLeft[O](o: O)(f: (O, A) => O)(
      implicit F: MonadError[F, Throwable]): F[O] = {
    val accumulating: Observer[F, A, O] = Observer.foldLeft[F, A, O](o, f)
    subscribe(accumulating)
  }

  /** Builds an observable that subscribes to this one with the downstream
    * observer adapted through `contramapEval(f)`.
    */
  final def mapEval[B](f: A => F[B])(implicit F: Monad[F]): Observable[F, B] =
    new Observable[F, B] {
      override def subscribe[O](observer: Observer[F, B, O]): F[O] = {
        val adapted: Observer[F, A, O] = observer.contramapEval(f)
        outer.subscribe(adapted)
      }
    }

  /** Builds an observable that subscribes to this one with the downstream
    * observer wrapped by `Observer#take(n)`.
    */
  final def take(n: Int)(implicit F: Monad[F]): Observable[F, A] =
    new Observable[F, A] {
      override def subscribe[O](observer: Observer[F, A, O]): F[O] = {
        val limited: Observer[F, A, O] = observer.take(n)
        outer.subscribe(limited)
      }
    }

  /** Builds an observable that subscribes to this one through a [[FlatMapper]],
    * which in turn subscribes the downstream observer to `f(a)` for each element.
    */
  final def flatMap[B](f: A => Observable[F, B])(
      implicit F: Monad[F]): Observable[F, B] =
    new Observable[F, B] {
      override def subscribe[O](observer: Observer[F, B, O]): F[O] = {
        val bridge: Observer[F, A, O] = new FlatMapper[F, A, B, O](f, observer)
        outer.subscribe(bridge)
      }
    }

  /** Subscribes `Observer.drain`, discarding every element. */
  final def drain(implicit F: ApplicativeError[F, Throwable]): F[Unit] =
    subscribe[Unit](Observer.drain[F, A])
}
/** Observer subscribed to each inner observable produced inside a `flatMap`.
  *
  * Its result type is `Either[O, Observer[F, A, O]]`:
  *  - `Left(o)` when the downstream observer finished with `o` while consuming
  *    inner elements;
  *  - `Right(next)` when the inner observable completed and the outer stream
  *    should continue with `next`.
  *
  * @param f        function producing the inner observable for each outer element
  * @param observer downstream observer currently receiving the `B` values
  */
final class InnerFlatMapperObserver[F[_], A, B, O](
    f: A => Observable[F, B],
    observer: Observer[F, B, O])(implicit F: Monad[F])
    extends Observer[F, B, Either[O, Observer[F, A, O]]] {

  /** Forwards `b` downstream and re-wraps the outcome in this observer's result shape. */
  override def onNext(
      b: B): F[Either[Either[O, Observer[F, A, O]],
                      Observer[F, B, Either[O, Observer[F, A, O]]]]] =
    F.map(observer.onNext(b)) {
      case Left(result) =>
        // Downstream is done: surface its result as this observer's final (Left) value.
        Left(Left(result))
      case Right(next) =>
        // Downstream wants more: keep relaying inner elements to its continuation.
        Right(new InnerFlatMapperObserver[F, A, B, O](f, next))
    }

  /** Inner stream ended: hand back a [[FlatMapper]] so the outer stream can continue. */
  override def onComplete: F[Either[O, Observer[F, A, O]]] =
    F.pure[Either[O, Observer[F, A, O]]](
      Right(new FlatMapper[F, A, B, O](f, observer)))

  /** Delegates the error to the downstream observer. */
  override def onError(e: Throwable): F[Unit] =
    observer.onError(e)
}
/** Observer of outer `A` elements used to implement `Observable#flatMap`.
  *
  * For every element it subscribes an [[InnerFlatMapperObserver]] to `f(a)`;
  * the result of that subscription is exactly this observer's next step.
  *
  * @param f             function producing the inner observable for each element
  * @param outerObserver downstream observer that receives the `B` values
  */
class FlatMapper[F[_], A, B, O](
    f: A => Observable[F, B],
    outerObserver: Observer[F, B, O])(implicit F: Monad[F])
    extends Observer[F, A, O] {

  override def onNext(a: A): F[Either[O, Observer[F, A, O]]] = {
    val inner: Observable[F, B] = f(a)
    val relay = new InnerFlatMapperObserver[F, A, B, O](f, outerObserver)
    inner.subscribe[Either[O, Observer[F, A, O]]](relay)
  }

  /** Completion of the outer stream is passed straight to the downstream observer. */
  override def onComplete: F[O] =
    outerObserver.onComplete

  /** Errors are passed straight to the downstream observer. */
  override def onError(e: Throwable): F[Unit] =
    outerObserver.onError(e)
}
| /* | |
| import io.evotor.webhooks.common._ | |
| import cats.implicits._ | |
| import cats.effect.IO | |
| Observable[IO].of(List(1, 2, 3, 4)) | |
| */ | |
/** Constructors for [[Observable]]; `Observable[F]` fixes the effect type first. */
object Observable {

  /** Holder for constructors that share the effect type `F`. */
  final class Builder[F[_]] {

    /** Wraps the elements of `ca` in a [[FoldableObservable]]. */
    def of[C[_]: Foldable, A](ca: C[A])(
        implicit F: Monad[F]): Observable[F, A] = {
      val source: Observable[F, A] = new FoldableObservable[F, C, A](ca)
      source
    }

    /** Creates a [[PeriodicObservable]] with the given `period`. */
    def periodic(period: FiniteDuration)(implicit timer: Timer[F],
                                         F: Monad[F]): Observable[F, Unit] = {
      val ticks: Observable[F, Unit] = new PeriodicObservable[F](period)
      ticks
    }
  }

  /** Entry point: `Observable[F].of(...)` / `Observable[F].periodic(...)`. */
  def apply[F[_]]: Builder[F] =
    new Builder[F]
}
/** Observable over the elements of a `Foldable` container.
  *
  * @param ca container whose elements are offered to the observer in fold order
  */
final class FoldableObservable[F[_]: Monad, C[_]: Foldable, A](ca: C[A])
    extends Observable[F, A] {

  override def subscribe[B](observer: Observer[F, A, B]): F[B] = {
    // Left carries the observer's early result and short-circuits the fold;
    // Right carries the observer to use for the next element.
    type Step[X] = EitherT[F, B, X]

    val afterAllElements: Step[Observer[F, A, B]] =
      ca.foldM[Step, Observer[F, A, B]](observer) { (current, element) =>
        EitherT(current.onNext(element))
      }

    // If every element was accepted, ask the last observer for its final result,
    // then collapse "stopped early" and "completed" into a single F[B].
    afterAllElements
      .semiflatMap(last => last.onComplete)
      .merge
  }
}
/** Observable that offers `()` to its observer after every `period`, until the
  * observer answers with a final result.
  *
  * @param period delay slept (via `timer`) before each element is offered
  * @param timer  provides the `sleep` effect
  */
final class PeriodicObservable[F[_]: Monad](period: FiniteDuration)(
    implicit timer: Timer[F])
    extends Observable[F, Unit] {

  /** Repeatedly sleeps for `period` and offers the tick to the current observer.
    *
    * The loop is driven by `tailRecM` instead of calling `subscribe` recursively
    * inside `flatMap`, so stack safety does not depend on how `F` implements
    * `flatMap`.
    */
  override def subscribe[O](observer: Observer[F, Unit, O]): F[O] =
    Monad[F].tailRecM[Observer[F, Unit, O], O](observer) { current =>
      timer
        .sleep(period)
        .flatMap(tick => current.onNext(tick))
        // onNext: Left = final result, Right = next observer.
        // tailRecM: Left = keep looping, Right = done. Hence the swap.
        .map(_.swap)
    }
}
/** Consumer of `A` values that eventually produces a result `O` inside `F`.
  *
  * @tparam F effect type
  * @tparam A element type accepted by `onNext`
  * @tparam O result type produced when the observer finishes
  */
trait Observer[F[_], A, O] { outer =>

  /** Handles one element.
    *
    * @return `Left(o)` when this observer is finished with result `o`;
    *         `Right(next)` with the observer to use for the following element
    */
  def onNext(a: A): F[Either[O, Observer[F, A, O]]]

  /** Produces the result when no further elements will be offered. */
  def onComplete: F[O]

  /** Handles an error `e`. */
  def onError(e: Throwable): F[Unit]

  /** Adapts this observer to accept `B` by applying `f` before each `onNext`. */
  final def contramap[B](f: B => A)(implicit F: Functor[F]): Observer[F, B, O] =
    new Observer[F, B, O] {
      override def onNext(b: B): F[Either[O, Observer[F, B, O]]] =
        outer.onNext(f(b)).map {
          // The continuation must be adapted too, otherwise it would still expect `A`.
          case Right(ob) => Right(ob.contramap(f))
          case Left(o)   => Left(o)
        }
      override def onComplete: F[O] = outer.onComplete
      override def onError(e: Throwable): F[Unit] = outer.onError(e)
    }

  /** Adapts this observer to accept `B` by running the effectful `f` before each `onNext`. */
  final def contramapEval[B](f: B => F[A])(
      implicit F: Monad[F]): Observer[F, B, O] =
    new Observer[F, B, O] {
      override def onNext(a: B): F[Either[O, Observer[F, B, O]]] =
        f(a).flatMap(outer.onNext).map {
          case Right(observer) =>
            Right(observer.contramapEval(f))
          case Left(o) => Left(o)
        }
      override def onComplete: F[O] = outer.onComplete
      override def onError(e: Throwable): F[Unit] = outer.onError(e)
    }

  /** Limits this observer to at most `n` elements.
    *
    * Once the budget is used up, the next offered element is not forwarded;
    * instead the underlying observer is completed and its result returned.
    * Note that this happens when that next element arrives (or when
    * `onComplete` is called), not immediately after the `n`-th element.
    *
    * A non-positive `n` forwards nothing. The previous guard was `n == 0`,
    * which a negative `n` never reached, so the limit was never applied.
    */
  final def take(n: Int)(implicit F: Functor[F]): Observer[F, A, O] =
    new Observer[F, A, O] {
      override def onNext(a: A): F[Either[O, Observer[F, A, O]]] =
        if (n <= 0) {
          outer.onComplete.map(_.asLeft[Observer[F, A, O]])
        } else {
          outer
            .onNext(a)
            .map {
              case Right(cont) => cont.take(n - 1).asRight
              case Left(o)     => o.asLeft
            }
        }
      override def onComplete: F[O] = outer.onComplete
      override def onError(e: Throwable): F[Unit] = outer.onError(e)
    }
}
/** Ready-made [[Observer]] implementations. */
object Observer {

  /** Observer that ignores every element and completes with `()`.
    *
    * Errors passed to `onError` are raised in `F`.
    */
  def drain[F[_], A](
      implicit F: ApplicativeError[F, Throwable]): Observer[F, A, Unit] =
    new Observer[F, A, Unit] {
      // Every element yields the same "continue with this observer" step, so it
      // is built once. It must not be named `onNext`: a value and a method with
      // the same name cannot coexist in one class.
      private val continue: F[Either[Unit, Observer[F, A, Unit]]] =
        (this: Observer[F, A, Unit]).asRight[Unit].pure[F]

      override def onNext(a: A): F[Either[Unit, Observer[F, A, Unit]]] =
        continue
      override def onComplete: F[Unit] = ().pure[F]
      override def onError(e: Throwable): F[Unit] = F.raiseError(e)
    }

  /** Observer that folds elements into an accumulator.
    *
    * Each `onNext` returns a new observer holding `f(zero, a)`; `onComplete`
    * yields the accumulator held by the current observer.
    *
    * @param zero current accumulator value
    * @param f    combines the accumulator with an element
    */
  def foldLeft[F[_]: MonadError[?[_], Throwable], A, O](
      zero: O,
      f: (O, A) => O): Observer[F, A, O] =
    new Observer[F, A, O] {
      override def onNext(a: A): F[Either[O, Observer[F, A, O]]] =
        Observer.foldLeft[F, A, O](f(zero, a), f).asRight[O].pure[F]
      override def onComplete: F[O] = zero.pure[F]
      override def onError(e: Throwable): F[Unit] = e.raiseError[F, Unit]
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment