Skip to content

Instantly share code, notes, and snippets.

@tango238
Created March 6, 2016 02:00
Show Gist options
  • Save tango238/07702bf665bf7ed0da24 to your computer and use it in GitHub Desktop.
Save tango238/07702bf665bf7ed0da24 to your computer and use it in GitHub Desktop.
parMapの実装
import java.util.concurrent._
object Par {
type Par[A] = ExecutorService => Future[A]
def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a)
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
private case class UnitFuture[A](get: A) extends Future[A] {
def isDone = true
def get(timeout: Long, units: TimeUnit) = get
def isCancelled = false
def cancel(evenIfRunning: Boolean): Boolean = false
}
def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] =
(es: ExecutorService) => {
val af = a(es)
val bf = b(es)
UnitFuture(f(af.get, bf.get))
}
def fork[A](a: => Par[A]): Par[A] =
es => es.submit(new Callable[A]{
def call = a(es).get
})
def asyncF[A,B](f: A => B): A => Par[B] = a => lazyUnit(f(a))
def map[A,B](pa: Par[A])(f: A => B): Par[B] =
map2(pa, unit(()))((a, _) => f(a))
def sequence[A](as: List[Par[A]]): Par[List[A]] =
map(sequenceBalanced(as.toIndexedSeq))(_.toList)
def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = fork {
if (as.isEmpty) unit(Vector())
else if (as.length == 1) map(as.head)(a => Vector(a))
else {
val (l,r) = as.splitAt(as.length/2)
map2(sequenceBalanced(l), sequenceBalanced(r))(_ ++ _)
}
}
def parMap[A,B](as: List[A])(f: A => B): Par[List[B]] = sequence(as.map(asyncF(f)))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment