Created
November 11, 2019 12:25
-
-
Save ShahOdin/4944bdbcfbd4e40cae1348d1e76e1ebc to your computer and use it in GitHub Desktop.
playing around with cats-effect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.{IO, IOApp} | |
/** Small cats-effect 2.x experiments built around `MVar`: a mutex, a running sum
  * threaded through an `MVar`, and a producer/consumer channel.
  *
  * Only [[channels]] is wired into [[run]]; the other two demos are kept for reference.
  */
object Demo extends IOApp {

  import cats.effect._
  import cats.effect.concurrent._
  import cats.syntax.functor._
  import cats.syntax.parallel._
  import cats.instances.list._
  import cats.syntax.flatMap._

  /** Input shared by the demos: the integers 0 to 99. */
  val DemoNumbers: List[Int] = (0 until 100).toList

  /** Prints every number in parallel, with the printing serialised by one shared lock.
    *
    * Despite the name, nothing is summed here.
    *
    * @param numbers values to publish
    * @return an effect that completes once every number has been printed
    */
  def publishAndSum(numbers: List[Int]): IO[Unit] = {

    /** Mutex backed by an `MVar`: a full `MVar` means "unlocked", an empty one "held". */
    final class MLock(mvar: MVar[IO, Unit]) {
      private def acquire: IO[Unit] =
        mvar.take

      private def release: IO[Unit] =
        mvar.put(())

      /** Runs `fa` while holding the lock; `bracket` hands the lock back afterwards. */
      def greenLight[A](fa: IO[A]): IO[A] =
        acquire.bracket(_ => fa)(_ => release)
    }

    object MLock {
      /** Allocates a fresh, initially unlocked lock. */
      def apply(): IO[MLock] =
        MVar[IO].of(()).map(new MLock(_))
    }

    // Takes the already-allocated lock. Passing the `IO[MLock]` recipe instead would
    // allocate a separate lock per call, which gives no mutual exclusion at all.
    def publish(i: Int, lock: MLock): IO[Unit] =
      // IO.delay suspends the println so it really runs inside the critical section;
      // IO.pure would print eagerly, before the lock is even acquired.
      lock.greenLight(IO.delay(println(s"published $i")))

    for {
      lock <- MLock() // allocated exactly once and shared by all parallel publishers
      _    <- numbers.parTraverse_(publish(_, lock))
    } yield ()
  }

  /** Folds `numbers` into a running total held in an `MVar`, printing each step.
    *
    * @param numbers values to add up, starting from 0
    * @return an effect that performs the additions; the final total is discarded
    */
  def sumOfValues(numbers: List[Int]): IO[Unit] = {
    // Takes the current total, puts back total + head, then recurses on the tail.
    // On an empty list the remaining value in the MVar is the result.
    def sum(state: MVar[IO, Int], list: List[Int]): IO[Int] = list match {
      case Nil => state.take
      case x :: xs =>
        for {
          current <- state.take
          _       <- state.put(current + x)
          _       <- IO.delay(println(s"$current + $x = ${current + x}"))
          result  <- sum(state, xs)
        } yield result
    }

    MVar.of[IO, Int](0).flatMap(sum(_, numbers)).void
  }

  /** Producer/consumer demo over an `MVar` used as a one-slot channel.
    *
    * The producer puts each number wrapped in `Some` and finally a `None` as the
    * end-of-stream marker; the consumer adds up what it takes until it sees `None`.
    * Both run on their own fibers and are joined before this effect completes.
    *
    * @param numbers values the producer sends through the channel
    * @return an effect that completes when both fibers have finished; the consumer's
    *         total is discarded
    */
  def channels(numbers: List[Int]): IO[Unit] = {
    type Channel[A] = MVar[IO, Option[A]]

    def producer(ch: Channel[Int], list: List[Int]): IO[Unit] = list match {
      case Nil =>
        // None tells the consumer that no more values will arrive.
        ch.put(None).flatTap(_ => IO.delay(println("we are done!")))
      case head :: tail =>
        for {
          _ <- IO.delay(println("About to put one!"))
          _ <- ch.put(Some(head))
          _ <- IO.delay(println("Just put one. gonna produce next!"))
          _ <- producer(ch, tail)
        } yield ()
    }

    def consumer(ch: Channel[Int], sum: Long): IO[Long] = ch.take.flatMap {
      case Some(x) =>
        // next please
        IO.delay(println("consume next!")).flatMap(_ => consumer(ch, sum + x))
      case None =>
        // we are done!
        IO.delay(println("no more to consume!")).map(_ => sum)
    }

    for {
      channel <- MVar[IO].empty[Option[Int]]
      fp      <- producer(channel, numbers).start
      fc      <- consumer(channel, 0L).start
      _       <- fp.join
      _       <- fc.join // the total is intentionally dropped: this demo returns Unit
    } yield ()
  }

  /** Entry point: runs the channel demo over [[DemoNumbers]] and exits successfully. */
  override def run(args: List[String]): IO[ExitCode] =
    channels(DemoNumbers).as(ExitCode.Success)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment