Skip to content

Instantly share code, notes, and snippets.

val program: Env[Any, Unit] = for {
_ <- EnvCtx(println("Starting"))
_ <- EnvCtx(println("Finished"))
} yield ()
program.unsafeRunSync(())
trait Context[Ctx] {
def ctx: Context.Service[Ctx]
}
object Context {
trait Service[Ctx] {
val read: UIO[Ctx]
def set[R, E, A](value: Ctx): UIO[Unit]
def update[R, E, A](f: Ctx => Ctx): UIO[Ctx]
}
}
def init[Ctx](value: Ctx): UIO[Context[Ctx]] = {
makeService(value).map {service =>
new Context[Ctx] {
override def ctx: Service[Ctx] = service
}
}
}
def makeService[Ctx](value: Ctx): UIO[Context.Service[Ctx]] =
FiberRef.make(value).map { fiberRef =>
// helper methods
def read[Ctx]: ZIO[Context[Ctx], Nothing, Ctx] =
ZIO.accessM[Context[Ctx]](_.ctx.read)
def update[Ctx, R <: Context[Ctx], E, A](f: Ctx => Ctx): ZIO[R, Nothing, Ctx] =
ZIO.accessM[R](_.ctx.update(f))
def set[Ctx, R <: Context[Ctx]](value: Ctx): ZIO[R, Nothing, Unit] =
ZIO.accessM[R](_.ctx.set(value))
trait Context[Ctx] {
def ctx: Context.Service[Ctx]
}
object Context {
trait Service[Ctx] {
val read: UIO[Ctx]
def set[R, E, A](value: Ctx): UIO[Unit]
def update[R, E, A](f: Ctx => Ctx): UIO[Ctx]
}
object EnvCtx {
def apply[R, A](body: => A): Env[R, A] = ofZIO(ZIO(body))
def pure[R, A](x: A): Env[R, A] = ofZIO(ZIO.succeed(x))
def delay[R, A](x: A): Env[R, A] = ofZIO(ZIO.effect(x))
def context[R]: Env[R, R] = ofZIO(Context.read[R])
def updateContext[Ctx, A](f: Ctx => Ctx): Env[Ctx, Ctx] = ofZIO(Context.update(f))
def setContext[Ctx, A](value: Ctx): Env[Ctx, Unit] = ofZIO(Context.set(value))
private def ofZIO[R, A](zio: RIO[Context[R], A]): Env[R, A] = new EnvCtx(zio)
}
object EnvTest extends App {
val program: Env[ExampleEnv, Unit] = for {
_ <- EnvCtx(println("Starting"))
_ <- EnvCtx.setContext(ExampleEnv("request B"))
newCtx <- EnvCtx.context
_ <- EnvCtx(println(s"Read $newCtx"))
_ <- EnvCtx(println("Finished"))
} yield ()
program.unsafeRunSync(ExampleEnv("request A"))
implicit def concurrentEffect[Ctx](implicit runtime: Runtime[Context[Ctx]]): effect.ConcurrentEffect[Env[Ctx, ?]] =
new EnvConcurrentEffect[Ctx]
object EnvFinalTaglessTest extends App {
import com.example.functional.env.catz._
def buildProgram[F[_] : Sync]: F[Unit] = {
import cats.implicits._
for {
_ <- Sync[F].delay(println("Starting"))
_ <- Sync[F].delay(println("Finished"))
} yield ()
}