Created
September 13, 2020 19:00
-
-
Save dmgcodevil/ecb63d1a90792ffb24b8354253e7b407 to your computer and use it in GitHub Desktop.
Effect wrapper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package io.parapet.v2 | |
| import cats.effect.{Concurrent, ContextShift, IO, Timer} | |
| import cats.free.Free | |
| import cats.free.Free.liftF | |
| import cats.{Monad, ~>} | |
| import scala.concurrent.ExecutionContext | |
| object Core { | |
| case class Event(data: Any) | |
| //------------------------------------------------------ | |
| sealed trait Op[A] | |
| type Flow[A] = Free[Op, A] | |
| case class Eval[A](body: () => A) extends Op[A] | |
| case class Send(body: () => Event, p: String) extends Op[Unit] | |
| case class Blocking[A](body: () => Flow[A]) extends Op[A] | |
| def eval[A](thunk: => A): Flow[A] = liftF[Op, A](Eval[A](() => thunk)) | |
| def send(thunk: => Event, p: String): Flow[Unit] = liftF[Op, Unit](Send(() => thunk, p)) | |
| def blocking[A](body: => Flow[A]): Flow[A] = liftF[Op, A](Blocking(() => body)) | |
| //------------------------------------------------------ | |
| //------------------------------------------------------ | |
| sealed abstract class Effect[A, F[_]](val fa: F[A]) | |
| final case class Async[A, F[_]](override val fa: F[A]) extends Effect[A, F](fa) | |
| final case class Sync[A, F[_]](override val fa: F[A]) extends Effect[A, F](fa) | |
| //------------------------------------------------------ | |
| implicit def effectMonad[F[_] : Concurrent]: Monad[Effect[*, F]] = new Monad[Effect[*, F]] { | |
| val ct = implicitly[Concurrent[F]] | |
| override def pure[A](x: A): Effect[A, F] = Sync(ct.pure(x)) | |
| override def flatMap[A, B](fa: Effect[A, F])(f: A => Effect[B, F]): Effect[B, F] = { | |
| // if `f` is producing Async , how to switch the context Sync -> Async? | |
| Sync(ct.flatMap(fa.fa)(a => f(a).fa)) // ? | |
| } | |
| override def tailRecM[A, B](a: A)(f: A => Effect[Either[A, B], F]): Effect[B, F] = { | |
| f(a) match { | |
| case ef: Effect[Either[A, B], F] => | |
| Sync(ct.flatMap(ef.fa) { | |
| case Left(next) => tailRecM(next)(f).fa | |
| case Right(b) => ct.pure(b) | |
| }) | |
| } | |
| } | |
| } | |
| def opToIO[F[_] : Concurrent]: Op ~> Effect[*, F] = new (Op ~> Effect[*, F]) { | |
| override def apply[A](fa: Op[A]): Effect[A, F] = { | |
| val ct = implicitly[Concurrent[F]] | |
| fa match { | |
| case Eval(body) => Sync(ct.delay(body())) | |
| case Send(body, p) => Sync(ct.delay(println(s"send '${body()}' to $p"))) | |
| case b: Blocking[A] => Async(ct.suspend(b.body().foldMap[Effect[*, F]](opToIO[F]).fa)) | |
| } | |
| } | |
| } | |
| def main(args: Array[String]): Unit = { | |
| lazy val ec: ExecutionContext = scala.concurrent.ExecutionContext.global | |
| implicit lazy val contextShift: ContextShift[IO] = IO.contextShift(ec) | |
| implicit lazy val timer: Timer[IO] = IO.timer(ec) | |
| var x = 0 | |
| val program = for { | |
| v <- eval { | |
| x = x + 1 | |
| "point" | |
| } | |
| _ <- eval(println(v)) | |
| _ <- eval(println(x)) | |
| _ <- blocking(send(Event(v + x), "p1")) | |
| } yield () | |
| val effect = program.foldMap[Effect[*, IO]](opToIO[IO]) | |
| val io = effect match { | |
| case Sync(fa) => IO(println("sync->")).flatMap(_ => fa) | |
| case Async(fa) => IO(println("async->")).flatMap(_ => fa) | |
| } | |
| io.unsafeRunSync() | |
| } | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
prints:
should be:
async->