Created
February 2, 2014 21:34
-
-
Save pchiusano/8775287 to your computer and use it in GitHub Desktop.
A Par type for representing parallel computations, without blocking, developed in chapter 7 of FP in Scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fpinscala.parallelism | |
import java.util.concurrent.{Callable, CountDownLatch, ExecutorService} | |
import java.util.concurrent.atomic.AtomicReference | |
object Nonblocking { | |
trait Future[+A] { | |
private[parallelism] def apply(k: A => Unit): Unit | |
} | |
type Par[+A] = ExecutorService => Future[A] | |
object Par { | |
def run[A](es: ExecutorService)(p: Par[A]): A = { | |
val ref = new java.util.concurrent.atomic.AtomicReference[A] // A mutable, threadsafe reference, to use for storing the result | |
val latch = new CountDownLatch(1) // A latch which, when decremented, implies that `ref` has the result | |
p(es) { a => ref.set(a); latch.countDown } // Asynchronously set the result, and decrement the latch | |
latch.await // Block until the `latch.countDown` is invoked asynchronously | |
ref.get // Once we've passed the latch, we know `ref` has been set, and return its value | |
} | |
def unit[A](a: A): Par[A] = | |
es => new Future[A] { | |
def apply(cb: A => Unit): Unit = | |
cb(a) | |
} | |
/** A non-strict version of `unit` */ | |
def delay[A](a: => A): Par[A] = | |
es => new Future[A] { | |
def apply(cb: A => Unit): Unit = | |
cb(a) | |
} | |
def fork[A](a: => Par[A]): Par[A] = | |
es => new Future[A] { | |
def apply(cb: A => Unit): Unit = | |
eval(es)(a(es)(cb)) | |
} | |
/** | |
* Helper function for constructing `Par` values out of calls to non-blocking continuation-passing-style APIs. | |
* This will come in handy in Chapter 13. | |
*/ | |
def async[A](f: (A => Unit) => Unit): Par[A] = es => new Future[A] { | |
def apply(k: A => Unit) = f(k) | |
} | |
/** | |
* Helper function, for evaluating an action | |
* asynchronously, using the given `ExecutorService`. | |
*/ | |
def eval(es: ExecutorService)(r: => Unit): Unit = | |
es.submit(new Callable[Unit] { def call = r }) | |
def map2[A,B,C](p: Par[A], p2: Par[B])(f: (A,B) => C): Par[C] = | |
es => new Future[C] { | |
def apply(cb: C => Unit): Unit = { | |
var ar: Option[A] = None | |
var br: Option[B] = None | |
val combiner = Actor[Either[A,B]](es) { | |
case Left(a) => | |
if (br.isDefined) eval(es)(cb(f(a,br.get))) | |
else ar = Some(a) | |
case Right(b) => | |
if (ar.isDefined) eval(es)(cb(f(ar.get,b))) | |
else br = Some(b) | |
} | |
p(es)(a => combiner ! Left(a)) | |
p2(es)(b => combiner ! Right(b)) | |
} | |
} | |
// specialized version of `map` | |
def map[A,B](p: Par[A])(f: A => B): Par[B] = | |
es => new Future[B] { | |
def apply(cb: B => Unit): Unit = | |
p(es)(a => eval(es) { cb(f(a)) }) | |
} | |
def lazyUnit[A](a: => A): Par[A] = | |
fork(unit(a)) | |
def asyncF[A,B](f: A => B): A => Par[B] = | |
a => lazyUnit(f(a)) | |
def sequenceRight[A](as: List[Par[A]]): Par[List[A]] = | |
as match { | |
case Nil => unit(Nil) | |
case h :: t => map2(h, fork(sequence(t)))(_ :: _) | |
} | |
def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork { | |
if (as.isEmpty) unit(Vector()) | |
else if (as.length == 1) map(as.head)(a => Vector(a)) | |
else { | |
val (l,r) = as.splitAt(as.length/2) | |
map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _) | |
} | |
} | |
def sequence[A](as: List[Par[A]]): Par[List[A]] = | |
map(sequenceBalanced(as.toIndexedSeq))(_.toList) | |
def parMap[A,B](as: List[A])(f: A => B): Par[List[B]] = | |
sequence(as.map(asyncF(f))) | |
def parMap[A,B](as: IndexedSeq[A])(f: A => B): Par[IndexedSeq[B]] = | |
sequenceBalanced(as.map(asyncF(f))) | |
// exercise answers | |
/* | |
* We can implement `choice` as a new primitive. | |
* | |
* `p(es)(result => ...)` for some `ExecutorService`, `es`, and | |
* some `Par`, `p`, is the idiom for running `p`, and registering | |
* a callback to be invoked when its result is available. The | |
* result will be bound to `result` in the function passed to | |
* `p(es)`. | |
* | |
* If you find this code difficult to follow, you may want to | |
* write down the type of each subexpression and follow the types | |
* through the implementation. What is the type of `p(es)`? What | |
* about `t(es)`? What about `t(es)(cb)`? | |
*/ | |
def choice[A](p: Par[Boolean])(t: Par[A], f: Par[A]): Par[A] = | |
es => new Future[A] { | |
def apply(cb: A => Unit): Unit = | |
p(es) { b => | |
if (b) eval(es) { t(es)(cb) } | |
else eval(es) { f(es)(cb) } | |
} | |
} | |
/* The code here is very similar. */ | |
def choiceN[A](p: Par[Int])(ps: List[Par[A]]): Par[A] = | |
es => new Future[A] { | |
def apply(cb: A => Unit): Unit = | |
p(es) { ind => eval(es) { ps(ind)(es)(cb) }} | |
} | |
def choiceViaChoiceN[A](a: Par[Boolean])(ifTrue: Par[A], ifFalse: Par[A]): Par[A] = | |
choiceN(map(a)(b => if (b) 0 else 1))(List(ifTrue, ifFalse)) | |
def choiceMap[K,V](p: Par[K])(ps: Map[K,Par[V]]): Par[V] = | |
es => new Future[V] { | |
def apply(cb: V => Unit): Unit = | |
p(es)(k => ps(k)(es)(cb)) | |
} | |
/* `chooser` is usually called `flatMap` or `bind`. */ | |
def chooser[A,B](p: Par[A])(f: A => Par[B]): Par[B] = | |
flatMap(p)(f) | |
def flatMap[A,B](p: Par[A])(f: A => Par[B]): Par[B] = | |
es => new Future[B] { | |
def apply(cb: B => Unit): Unit = | |
p(es)(a => f(a)(es)(cb)) | |
} | |
def choiceViaFlatMap[A](p: Par[Boolean])(f: Par[A], t: Par[A]): Par[A] = | |
flatMap(p)(b => if (b) t else f) | |
def choiceNViaFlatMap[A](p: Par[Int])(choices: List[Par[A]]): Par[A] = | |
flatMap(p)(i => choices(i)) | |
def join[A](p: Par[Par[A]]): Par[A] = | |
es => new Future[A] { | |
def apply(cb: A => Unit): Unit = | |
p(es)(p2 => eval(es) { p2(es)(cb) }) | |
} | |
def joinViaFlatMap[A](a: Par[Par[A]]): Par[A] = | |
flatMap(a)(x => x) | |
def flatMapViaJoin[A,B](p: Par[A])(f: A => Par[B]): Par[B] = | |
join(map(p)(f)) | |
/* Gives us infix syntax for `Par`. */ | |
implicit def toParOps[A](p: Par[A]): ParOps[A] = new ParOps(p) | |
// infix versions of `map`, `map2` and `flatMap` | |
class ParOps[A](p: Par[A]) { | |
def map[B](f: A => B): Par[B] = Par.map(p)(f) | |
def map2[B,C](b: Par[B])(f: (A,B) => C): Par[C] = Par.map2(p,b)(f) | |
def flatMap[B](f: A => Par[B]): Par[B] = Par.flatMap(p)(f) | |
def zip[B](b: Par[B]): Par[(A,B)] = p.map2(b)((_,_)) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment