Skip to content

Instantly share code, notes, and snippets.

@dmgcodevil
Created September 13, 2020 19:00
Show Gist options
  • Select an option

  • Save dmgcodevil/ecb63d1a90792ffb24b8354253e7b407 to your computer and use it in GitHub Desktop.

Select an option

Save dmgcodevil/ecb63d1a90792ffb24b8354253e7b407 to your computer and use it in GitHub Desktop.
Effect wrapper
package io.parapet.v2
import cats.effect.{Concurrent, ContextShift, IO, Timer}
import cats.free.Free
import cats.free.Free.liftF
import cats.{Monad, ~>}
import scala.concurrent.ExecutionContext
object Core {
case class Event(data: Any)
//------------------------------------------------------
sealed trait Op[A]
type Flow[A] = Free[Op, A]
case class Eval[A](body: () => A) extends Op[A]
case class Send(body: () => Event, p: String) extends Op[Unit]
case class Blocking[A](body: () => Flow[A]) extends Op[A]
def eval[A](thunk: => A): Flow[A] = liftF[Op, A](Eval[A](() => thunk))
def send(thunk: => Event, p: String): Flow[Unit] = liftF[Op, Unit](Send(() => thunk, p))
def blocking[A](body: => Flow[A]): Flow[A] = liftF[Op, A](Blocking(() => body))
//------------------------------------------------------
//------------------------------------------------------
sealed abstract class Effect[A, F[_]](val fa: F[A])
final case class Async[A, F[_]](override val fa: F[A]) extends Effect[A, F](fa)
final case class Sync[A, F[_]](override val fa: F[A]) extends Effect[A, F](fa)
//------------------------------------------------------
implicit def effectMonad[F[_] : Concurrent]: Monad[Effect[*, F]] = new Monad[Effect[*, F]] {
val ct = implicitly[Concurrent[F]]
override def pure[A](x: A): Effect[A, F] = Sync(ct.pure(x))
override def flatMap[A, B](fa: Effect[A, F])(f: A => Effect[B, F]): Effect[B, F] = {
// if `f` is producing Async , how to switch the context Sync -> Async?
Sync(ct.flatMap(fa.fa)(a => f(a).fa)) // ?
}
override def tailRecM[A, B](a: A)(f: A => Effect[Either[A, B], F]): Effect[B, F] = {
f(a) match {
case ef: Effect[Either[A, B], F] =>
Sync(ct.flatMap(ef.fa) {
case Left(next) => tailRecM(next)(f).fa
case Right(b) => ct.pure(b)
})
}
}
}
def opToIO[F[_] : Concurrent]: Op ~> Effect[*, F] = new (Op ~> Effect[*, F]) {
override def apply[A](fa: Op[A]): Effect[A, F] = {
val ct = implicitly[Concurrent[F]]
fa match {
case Eval(body) => Sync(ct.delay(body()))
case Send(body, p) => Sync(ct.delay(println(s"send '${body()}' to $p")))
case b: Blocking[A] => Async(ct.suspend(b.body().foldMap[Effect[*, F]](opToIO[F]).fa))
}
}
}
def main(args: Array[String]): Unit = {
lazy val ec: ExecutionContext = scala.concurrent.ExecutionContext.global
implicit lazy val contextShift: ContextShift[IO] = IO.contextShift(ec)
implicit lazy val timer: Timer[IO] = IO.timer(ec)
var x = 0
val program = for {
v <- eval {
x = x + 1
"point"
}
_ <- eval(println(v))
_ <- eval(println(x))
_ <- blocking(send(Event(v + x), "p1"))
} yield ()
val effect = program.foldMap[Effect[*, IO]](opToIO[IO])
val io = effect match {
case Sync(fa) => IO(println("sync->")).flatMap(_ => fa)
case Async(fa) => IO(println("async->")).flatMap(_ => fa)
}
io.unsafeRunSync()
}
}
@dmgcodevil
Copy link
Author

prints:

sync->
point
1
send 'Event(point1)' to p1

should be: async->

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment