Created
June 24, 2018 02:23
-
-
Save changlinli/0594a96d69556cb7844563de1992f46c to your computer and use it in GitHub Desktop.
Example of ReaderT with fs2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Example of running fs2 streams whose effect is a `ReaderT` over `IO`
 * carrying a shared `Ref`, and of moving between that representation and
 * `StateT`.
 */
object Example {
  import fs2._
  // NOTE(review): `Ref` is assumed to be fs2's `fs2.async.Ref` (the code below
  // uses `setSync` and a one-argument `modify`) — confirm against the fs2
  // version in use.
  import fs2.async.Ref
  import fs2.async.mutable._
  import cats._
  import cats.data._
  import cats.effect._
  import cats.implicits._

  /** A computation in `IO` that reads a shared `Ref` holding `State`. */
  type RefIO[State, A] = ReaderT[IO, Ref[IO, State], A]

  /** `StateT` over `IO` specialised to a `List[String]` state. */
  type StateIO[A] = StateT[IO, List[String], A]

  /** [[RefIO]] specialised to a `List[String]` state. */
  type RefIOStr[A] = ReaderT[IO, Ref[IO, List[String]], A]

  /**
   * Builds an `Effect` instance for `Kleisli[G, A, ?]` by fixing the
   * environment to `state`.
   *
   * This exists only because cats-effect keeps the traits expressing its
   * Kleisli instances package private, otherwise we could just directly
   * extend such a trait instead of needing to reimplement each of the
   * methods of our super classes.
   *
   * Every method except `runAsync` is forwarded to the `Async` instance that
   * cats-effect derives for `Kleisli[G, A, ?]`; `Effect[G]` is enough to
   * obtain it, whereas the `Concurrent` instance would require
   * `Concurrent[G]`, which is not available here.
   *
   * NOTE(review): depending on the cats-effect version, `Effect` may declare
   * further abstract members that would also need an implementation —
   * confirm against the version in use.
   *
   * @tparam G the underlying effect type
   * @tparam A the environment type read by the `Kleisli`
   * @param state the environment supplied to a `Kleisli` when `runAsync`
   *              runs it
   * @return an `Effect` for `Kleisli[G, A, ?]`
   */
  def readerTEffectInstance[G[_]: Effect, A](
      state: A
  ): Effect[({ type L[X] = Kleisli[G, A, X] })#L] =
    new Effect[({ type L[X] = Kleisli[G, A, X] })#L] {
      // The method-level type parameters below are deliberately NOT called
      // `A`: that name is the environment type of the enclosing method and
      // must stay visible in every signature.
      private val underlying: Async[({ type L[X] = Kleisli[G, A, X] })#L] =
        Async.catsKleisliAsync[G, A]

      // Supplies the fixed environment, then defers to G's own runAsync.
      override def runAsync[B](fa: Kleisli[G, A, B])(
          cb: Either[Throwable, B] => IO[Unit]): IO[Unit] =
        Effect[G].runAsync(fa.run(state))(cb)

      override def async[B](k: (Either[Throwable, B] => Unit) => Unit): Kleisli[G, A, B] =
        underlying.async(k)

      override def suspend[B](thunk: => Kleisli[G, A, B]): Kleisli[G, A, B] =
        underlying.suspend(thunk)

      override def flatMap[B, C](fa: Kleisli[G, A, B])(
          f: B => Kleisli[G, A, C]): Kleisli[G, A, C] =
        underlying.flatMap(fa)(f)

      override def tailRecM[B, C](a: B)(
          f: B => Kleisli[G, A, Either[B, C]]): Kleisli[G, A, C] =
        underlying.tailRecM(a)(f)

      override def raiseError[B](e: Throwable): Kleisli[G, A, B] =
        underlying.raiseError(e)

      override def handleErrorWith[B](fa: Kleisli[G, A, B])(
          f: Throwable => Kleisli[G, A, B]): Kleisli[G, A, B] =
        underlying.handleErrorWith(fa)(f)

      override def pure[B](x: B): Kleisli[G, A, B] =
        underlying.pure(x)
    }

  /**
   * Converts a [[RefIO]] computation into a `StateT` by running it against a
   * freshly created `Ref` seeded with the incoming state.
   *
   * This is unsafe because it only works if you're sure nothing else is
   * concurrently running against the same Ref. Otherwise, between when we
   * run our ReaderT computation against the ref and when we read the ref
   * again to produce the state of our StateT, the state of the ref might
   * have been mutated by something else.
   *
   * Note as well that unsafeToStateT is NOT the inverse of fromStateT. The
   * latter is described by the original author as completely safe even in
   * the face of concurrency. This creates a new Ref which will override the
   * ref in the reader generated by fromStateT.
   *
   * NOTE(review): fromStateT reads the ref with `get` and writes it back
   * with `setSync` as two separate steps — confirm that the "completely
   * safe" claim holds when something else updates the ref in between.
   *
   * @param value the reader computation to run
   * @return a `StateT` yielding the ref's final contents and the result
   */
  def unsafeToStateT[State, A](value: RefIO[State, A]): StateT[IO, State, A] =
    StateT[IO, State, A] { state =>
      for {
        ref <- Ref[IO, State](state)
        result <- value.run(ref)
        currentState <- ref.get
      } yield (currentState, result)
    }

  /**
   * Converts a `StateT` into a [[RefIO]]: reads the current contents of the
   * ref, runs `stateT` from that state, stores the resulting state back into
   * the ref and yields the computed value.
   *
   * The `Monoid` bound is not used by the body; it is kept so the signature
   * stays unchanged for existing callers.
   *
   * @param stateT the stateful computation to run against the ref
   * @return a reader that threads the ref's contents through `stateT`
   */
  def fromStateT[State: Monoid, A](stateT: StateT[IO, State, A]): RefIO[State, A] =
    Kleisli[IO, Ref[IO, State], A] { ref =>
      for {
        currentState <- ref.get
        stateAndValue <- stateT.run(currentState)
        // Bound with `=` rather than destructured in the generator above, so
        // the for-comprehension does not need `withFilter` on IO.
        (newState, value) = stateAndValue
        _ <- ref.setSync(newState)
      } yield value
    }

  /**
   * A one-element stream in [[StateIO]] that prepends "b" to the state and
   * then prints the state, prefixed with `label`.
   *
   * @param label text printed in front of the state
   */
  def stream(label: String): Stream[StateIO, Unit] =
    Stream.eval(Effect[StateIO].pure(())).evalMap[Unit] { _ =>
      for {
        _ <- StateT.modify[IO, List[String]]("b" :: _)
        _ <- StateT
          .inspect[IO, List[String], String](_.toString)
          .map(list => println(s"$label: $list"))
      } yield ()
    }

  /** Natural transformation from [[StateIO]] to [[RefIOStr]] via [[fromStateT]]. */
  val stateIOToRefIOStr: StateIO ~> RefIOStr = new (StateIO ~> RefIOStr) {
    override def apply[A](fa: StateIO[A]): RefIOStr[A] = fromStateT(fa)
  }

  /**
   * Creates an empty shared ref, prepends "a" to it, then runs two
   * translated streams one after the other against that same ref.
   */
  val exampleIO: IO[Unit] = {
    val initialStateAction = Ref[IO, List[String]](List.empty)
    initialStateAction.flatMap { initialState =>
      implicit val effectInstance: Effect[RefIOStr] =
        readerTEffectInstance[IO, Ref[IO, List[String]]](initialState)
      val io = for {
        currentState <- Kleisli.ask[IO, Ref[IO, List[String]]]
        _ <- Kleisli.liftF(currentState.modify("a" :: _))
        // The signal starts as false and is never set in this example.
        interrupt <- Signal[RefIOStr, Boolean](false)
        _ <- stream("with interrupt")
          .translate(stateIOToRefIOStr)
          .interruptWhen(interrupt)
          .compile
          .drain
        // Note that this stream will continue adding to the same state as the
        // with interrupt state stream
        _ <- stream("without interrupt")
          .translate(stateIOToRefIOStr)
          .compile
          .drain
      } yield ()
      io.run(initialState)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment