Skip to content

Instantly share code, notes, and snippets.

@Fristi
Last active August 31, 2016 22:07
Show Gist options
  • Save Fristi/032e206fe473336f929c to your computer and use it in GitHub Desktop.
Save Fristi/032e206fe473336f929c to your computer and use it in GitHub Desktop.
Monad/Applicative implementation for Akka-stream's Source
package nl.mdj.fpinscala
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.{ActorFlowMaterializer, FlattenStrategy}
import akka.stream.scaladsl.{Source => S, Flow}
import akka.stream.scaladsl.FlowGraph.Implicits._
import scalaz._
import scalaz.Scalaz._
object SourceMonad extends App {
implicit val sys = ActorSystem()
implicit val ec = sys.dispatcher
implicit val materializer = ActorFlowMaterializer()
implicit val monad = new Monad[({type f[x] = S[x, Unit]})#f] {
override def point[A](a: => A) = S(List(a))
override def bind[A, B](fa: S[A, Unit])(f: (A) => S[B, Unit]): S[B, Unit] = {
fa.map(f).flatten(FlattenStrategy.concat)
}
override def apply2[A, B, C](fa: => S[A, Unit], fb: => S[B, Unit])(f: (A, B) => C): S[C, Unit] = {
S() { implicit b =>
val zip = b.add(akka.stream.scaladsl.Zip[A, B]())
fa ~> zip.in0
fb ~> zip.in1
zip.out
}.map { case (a,b) => f(a, b)}
}
}
val test = for {
multiplied <- S(List(1,2,3)).map(_ * 2)
plusOne <- monad.point(multiplied).map(_ + 1)
} yield plusOne
val app = for {
result <- (S(1 to 1000) |@| S(1 to 1000) |@| S(1 to 1000))(_ * _ * _)
plusOne <- monad.point(result).mapAsync(x => after(1.second, sys.scheduler)(Future.successful(x))).map(_ + 1)
} yield plusOne
app.runForeach(println)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment