Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Created December 20, 2016 21:27
Show Gist options
  • Select an option

  • Save calvinlfer/cba627d3dca769d760710ff8a2f98b15 to your computer and use it in GitHub Desktop.

Select an option

Save calvinlfer/cba627d3dca769d760710ff8a2f98b15 to your computer and use it in GitHub Desktop.
Monadic parallel foldMap
import cats.syntax.applicative._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monoid._
import cats.{Applicative, Id, Monad, Monoid, ~>}
import scala.concurrent._
import scala.language.higherKinds
implicit val ec = scala.concurrent.ExecutionContext.Implicits.global
def foldMapM[A, M[_] : Monad, B: Monoid](iter: Iterable[A])(f: A => M[B] = (a: A) => a.pure[Id]): M[B] =
iter.foldLeft(Monoid[B].empty.pure[M]) {
(acc: M[B], next: A) =>
for {
b1 <- acc
b2 <- f(next)
} yield b1 |+| b2
}
def foldMapPM[A, B: Monoid, F[_] : Monad, G[_] : Applicative](values: Iterable[A])(f: A => F[B])(nat: F ~> G)
(implicit ec: ExecutionContext): Future[G[B]] = {
val numCores = Runtime.getRuntime.availableProcessors
val chunkSize = (1.0 * values.size / numCores).ceil.toInt
val chunks = values.grouped(chunkSize)
val results: Iterator[Future[F[B]]] = chunks.map(eachIter => Future {
foldMapM(eachIter)(f)
})
val futureIter: Future[Iterator[F[B]]] = Future.sequence(results)
val resFutureFB = futureIter.map(iter => iter.foldLeft(Monoid[B].empty.pure[F])((Fb1, Fb2) =>
for {
b1 <- Fb1
b2 <- Fb2
} yield b1 |+| b2
))
resFutureFB.map(fb => nat(fb))
}
type Error = String
type ErrorOr[A] = Either[Error, A]
import cats.instances.int._
foldMapPM(List("1", "2"))(each => stringToErrorOrInt(each))(new (ErrorOr ~> ErrorOr) {
override def apply[A](fa: ErrorOr[A]): ErrorOr[A] = fa
})
// Usage
type Error = String
type ErrorOr[A] = Either[Error, A]
import scala.util.Try
import cats.instances.int._
import cats.instances.either._
def stringToErrorOrInt(input: String): ErrorOr[Int] =
Try(input.toInt).toEither.left.map(throwable => throwable.getMessage)
val result = foldMapPM(List("1", "2"))(each => stringToErrorOrInt(each))(new (ErrorOr ~> ErrorOr) {
override def apply[A](fa: ErrorOr[A]): ErrorOr[A] = fa
})
import duration._
Await.result(result, 10 seconds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment