Skip to content

Instantly share code, notes, and snippets.

@ShahOdin
Created November 11, 2019 12:25
Show Gist options
  • Save ShahOdin/4944bdbcfbd4e40cae1348d1e76e1ebc to your computer and use it in GitHub Desktop.
Save ShahOdin/4944bdbcfbd4e40cae1348d1e76e1ebc to your computer and use it in GitHub Desktop.
Playing around with cats-effect.
import cats.effect.{IO, IOApp}
object Demo extends IOApp {
import cats.effect._
import cats.effect.concurrent._
import cats.syntax.functor._
import cats.syntax.parallel._
import cats.instances.list._
import cats.syntax.flatMap._
/** Sample input for the demos: the integers 0 to 99 inclusive, in ascending order. */
val DemoNumbers: List[Int] = List.range(0, 100)
/**
 * Prints `published <n>` for every element of `numbers`, running the prints in parallel
 * while a single shared lock guards each one.
 *
 * @param numbers the values to publish
 * @return an effect that completes once every value has been printed
 */
def publishAndSum(numbers: List[Int]): IO[Unit] = {

  /**
   * A lock backed by an `MVar` holding a single unit token: `acquire` takes the token
   * and `release` puts it back.
   */
  final class MLock(mvar: MVar[IO, Unit]) {
    private def acquire: IO[Unit] =
      mvar.take

    private def release: IO[Unit] =
      mvar.put(())

    /** Runs `fa` between `acquire` and `release`; `bracket` pairs the release with the acquire. */
    def greenLight[A](fa: IO[A]): IO[A] =
      acquire.bracket(_ => fa)(_ => release)
  }

  object MLock {
    /** Allocates a new lock whose token is initially available. */
    def apply(): IO[MLock] =
      MVar[IO].of(()).map(mvar => new MLock(mvar))
  }

  // Takes an already-allocated lock. The print is suspended with IO.delay so that it
  // runs when the effect is executed, i.e. after the lock has been acquired, rather
  // than while the argument to greenLight is being constructed.
  def publish(i: Int, lock: MLock): IO[Unit] =
    lock.greenLight(IO.delay(println(s"published $i")))

  // Allocate the lock exactly once and share it between all parallel tasks. Running
  // MLock() once per task would give every task its own MVar and therefore no
  // mutual exclusion at all.
  MLock().flatMap(lock => numbers.parTraverse_(publish(_, lock)))
}
/**
 * Folds `numbers` into a running total kept in an `MVar`, printing every intermediate
 * addition as `<current> + <x> = <updated>`.
 *
 * @param numbers the values to add up
 * @return an effect that completes after the whole list has been processed; the final
 *         total is discarded
 */
def sumOfValues(numbers: List[Int]): IO[Unit] = {

  // Walks the list, replacing the MVar's content with the new running total at each
  // step. When the list is exhausted the final total is taken out of the MVar.
  def sum(state: MVar[IO, Int], list: List[Int]): IO[Int] = list match {
    case Nil => state.take
    case x :: xs =>
      for {
        current <- state.take
        updated = current + x
        _ <- state.put(updated)
        // IO.delay suspends the println; IO.pure is meant for already-computed values
        // and would run the side effect while the IO value is being built.
        _ <- IO.delay(println(s"$current + $x = $updated"))
        result <- sum(state, xs)
      } yield result
  }

  MVar.of[IO, Int](0).flatMap(sum(_, numbers)).void
}
/**
 * Producer/consumer demo over an `MVar` used as a one-slot channel.
 *
 * A producer puts every element of `numbers` into the channel as `Some(n)` followed by a
 * final `None`; a consumer takes values and adds them up until it sees `None`. Both are
 * started as separate fibers and then joined.
 *
 * @param numbers the values the producer sends through the channel
 * @return an effect that completes once both fibers have finished; the consumer's total
 *         is discarded
 */
def channels(numbers: List[Int]): IO[Unit] = {
  type Channel[A] = MVar[IO, Option[A]]

  // Sends each element wrapped in Some, then None as the end-of-stream marker.
  // Every println is suspended with IO.delay so it runs as part of the effect, not
  // when `producer` is merely called to build the IO value.
  def producer(ch: Channel[Int], list: List[Int]): IO[Unit] = list match {
    case Nil =>
      ch.put(None).flatMap(_ => IO.delay(println("we are done!")))
    case head :: tail =>
      for {
        _ <- IO.delay(println("About to put one!"))
        _ <- ch.put(Some(head))
        _ <- IO.delay(println("Just put one. gonna produce next!"))
        _ <- producer(ch, tail)
      } yield ()
  }

  // Takes values until None arrives, accumulating them into `sum`.
  def consumer(ch: Channel[Int], sum: Long): IO[Long] = ch.take.flatMap {
    case Some(x) =>
      // next please
      IO.delay(println("consume next!")).flatMap(_ => consumer(ch, sum + x))
    case None =>
      // we are done!
      IO.delay(println("no more to consume!")).map(_ => sum)
  }

  for {
    channel <- MVar[IO].empty[Option[Int]]
    fp <- producer(channel, numbers).start
    fc <- consumer(channel, 0L).start
    _ <- fp.join
    // The declared result is IO[Unit], so the consumer's total is dropped explicitly
    // here instead of relying on implicit value discarding.
    _ <- fc.join
  } yield ()
}
/** Entry point: runs the channel demo over [[DemoNumbers]] and reports success. */
override def run(args: List[String]): IO[ExitCode] =
  channels(DemoNumbers).map(_ => ExitCode.Success)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment