Last active
November 14, 2018 21:55
-
-
Save orium/2f4d342cb8f2c8fa6a4b3a7ce8443282 to your computer and use it in GitHub Desktop.
Par actor - red book - 20181002
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch07 | |
import java.util.UUID | |
import java.util.concurrent.{Callable, CountDownLatch, ExecutorService, Executors} | |
import java.util.concurrent.atomic.AtomicReference | |
import ch07.Par.Par | |
import ch07.ParImpl.Par | |
object ParActor { | |
sealed trait Future[+A] { | |
private[ch07] def onComplete(cb: A => Unit): Unit | |
} | |
object Future { | |
private[ch07] def onComplete[A](f: (A => Unit) => Unit): Future[A] = new Future[A] { | |
override private[ch07] def onComplete(cb: A => Unit): Unit = | |
f(cb) | |
} | |
} | |
type Par[+A] = ExecutorService => Future[A] | |
object Par { | |
def unit[A](v: A): Par[A] = | |
es => Future.onComplete[A](cb => cb(v)) | |
def fork[A](par: => Par[A]): Par[A] = | |
es => Future.onComplete[A] { cb => | |
eval(es) { | |
par(es).onComplete(cb) | |
} | |
} | |
// Runs something in another thread. | |
private def eval(es: ExecutorService)(r: => Unit): Unit = | |
es.submit(new Callable[Unit] { | |
def call(): Unit = r | |
}) | |
def run[A](es: ExecutorService)(p: Par[A]): A = { | |
val ref = new AtomicReference[A] | |
val latch = new CountDownLatch(1) | |
p(es).onComplete { r => | |
ref.set(r) | |
latch.countDown() | |
} | |
latch.await() | |
ref.get() | |
} | |
def map2[A, B, C](pa: Par[A], pb: Par[B])(f: (A, B) => C): Par[C] = es => Future.onComplete[C] { cb => | |
// val mapId = UUID.randomUUID() | |
val ar: AtomicReference[Option[A]] = new AtomicReference(None) | |
val br: AtomicReference[Option[B]] = new AtomicReference(None) | |
val combiner = Actor[Either[A, B]](es) { msg => | |
// println(s"in map2 $mapId with thread ${Thread.currentThread()}") | |
msg match { | |
case Left(a) => ar.set(Some(a)) | |
case Right(b) => br.set(Some(b)) | |
} | |
(ar.get(), br.get()) match { | |
case (Some(av), Some(bv)) => | |
eval(es) { | |
cb(f(av, bv)) | |
} | |
case _ => () | |
} | |
} | |
pa(es).onComplete(v => combiner ! Left(v)) | |
pb(es).onComplete(v => combiner ! Right(v)) | |
} | |
} | |
} | |
object Main extends App { | |
import ParActor.Par | |
def `give me a number n of threads and I shall give you a Par that deadlocks with an executor service with n thread but does not in an executor service with n+1 threads!`(nThreads: Int): Par[Int] = | |
nThreads match { | |
case 0 => Par.fork(Par.unit(0)) | |
case n => Par.fork(`give me a number n of threads and I shall give you a Par that deadlocks with an executor service with n thread but does not in an executor service with n+1 threads!`(n - 1)) | |
} | |
val par = `give me a number n of threads and I shall give you a Par that deadlocks with an executor service with n thread but does not in an executor service with n+1 threads!`(3) | |
val es3 = Executors.newFixedThreadPool(3) | |
println(Par.run(es3)(par)) | |
def sum(ints: IndexedSeq[Int]): Par[Int] = | |
if (ints.length <= 1) | |
Par.unit(ints.headOption.getOrElse(0)) | |
else { | |
val (l, r) = ints.splitAt(ints.length / 2) | |
Par.map2(sum(l), sum(r))(_ + _) | |
} | |
val summedNumbers = Par.run(es3)(sum(IndexedSeq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))) | |
println(summedNumbers) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment