Created
January 28, 2014 11:45
-
-
Save oxbowlakes/8666295 to your computer and use it in GitHub Desktop.
Construct a Scala future on top of a standard Java ExecutorService
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fewchaz | |
import scala.util.{Failure, Success, Try} | |
import scala.concurrent.{CanAwait, ExecutionContext} | |
import java.util.concurrent._ | |
import scala.util.control.NonFatal | |
/**
 * Adapts a `java.util.concurrent.Future` (exposed through [[futr]]) to the
 * `scala.concurrent.Future` interface.
 *
 * Callbacks are kept as [[FutureListener]]s. A listener registered before the
 * underlying future is done is stored and fired when `completed` is called;
 * a listener registered afterwards is fired straight away and not stored.
 *
 * @tparam T the type of value produced by the underlying future
 */
trait Fewcha[T] extends scala.concurrent.Future[T] {

  /** The underlying Java future whose outcome this adapter exposes. */
  def futr: Future[T]

  /**
   * Captures the outcome of `futr.get()` as a `Try`. Blocks if the underlying
   * future is not done yet, so callers check `futr.isDone` (or wait) first.
   * Fatal throwables and `InterruptedException` are not matched by `NonFatal`
   * and therefore propagate.
   */
  private[this] final def toTry: Try[T] =
    try Success(futr.get()) catch { case NonFatal(t) => Failure(t) }

  /** Listeners waiting for completion; backed by a concurrent map so removal during iteration is safe. */
  private final val ls =
    java.util.Collections.newSetFromMap(new ConcurrentHashMap[FutureListener[T], java.lang.Boolean]())

  /**
   * Registers a listener. If the underlying future is already done the
   * listener is notified immediately and is NOT stored; storing it as well
   * would leak it and could notify it a second time from `completed`.
   *
   * @return `true` if the listener was stored for later notification
   */
  private[fewchaz] final def addListener(l: FutureListener[T]): Boolean = synchronized {
    if (futr.isDone) {
      l.onComplete(toTry)
      false
    } else ls.add(l)
  }

  /** Unregisters a listener; returns `true` if it was registered. */
  private[fewchaz] final def removeListener(l: FutureListener[T]): Boolean = ls.remove(l)

  /**
   * Notifies every stored listener with the outcome of the underlying future.
   *
   * @param f the completed Java future (unused; kept so existing callers compile)
   */
  private[fewchaz] final def completed(f: Future[T]): Unit = synchronized {
    val t = toTry
    val itr = ls.iterator()
    while (itr.hasNext) itr.next().onComplete(t)
  }

  /**
   * Runs `f` on `executor` with the outcome once the underlying future is done.
   * The internal listener unregisters itself after handing the callback to the executor.
   */
  def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
    val l: FutureListener[T] = new FutureListener[T] {
      def onComplete(t: Try[T]): Unit =
        try executor.execute(new Runnable { def run(): Unit = { f(t); () } })
        finally removeListener(this)
    }
    addListener(l)
  }

  /** `true` once the underlying Java future reports done. */
  def isCompleted: Boolean = futr.isDone

  /** The outcome if the underlying future is done, otherwise `None`. */
  def value: Option[Try[T]] = if (futr.isDone) Some(toTry) else None

  import scala.concurrent.duration._

  /**
   * Blocks until the underlying future is done or `atMost` elapses.
   *
   * @param atMost a finite duration, `Duration.Inf` (wait indefinitely) or
   *               `Duration.MinusInf` (do not wait)
   * @throws TimeoutException         if the future is not done within `atMost`
   * @throws IllegalArgumentException if `atMost` is `Duration.Undefined`
   */
  def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
    val cdl = new CountDownLatch(1)
    val l: FutureListener[T] = new FutureListener[T] {
      def onComplete(t: Try[T]): Unit = cdl.countDown()
    }
    try {
      // The listener must be registered, otherwise nothing ever releases the latch.
      addListener(l)
      val doneInTime = atMost match {
        case finite: FiniteDuration => cdl.await(finite.toNanos, NANOSECONDS)
        case Duration.Inf =>
          cdl.await()
          true
        case Duration.MinusInf => cdl.getCount == 0L
        case _ => throw new IllegalArgumentException("cannot wait for Undefined period")
      }
      if (!doneInTime)
        throw new TimeoutException
      this
    }
    finally removeListener(l)
  }

  /**
   * Blocks like [[ready]] and then returns the value, rethrowing the captured
   * failure if the underlying future did not complete normally.
   */
  def result(atMost: Duration)(implicit permit: CanAwait): T = {
    ready(atMost)
    toTry.get
  }
}
/**
 * Callback interface notified with the outcome of a future.
 *
 * @tparam T the type of the value carried by a successful outcome
 */
trait FutureListener[T] {

  /**
   * Invoked with the outcome of the future.
   *
   * @param f the outcome: a `Success` holding the value or a `Failure` holding the error
   */
  def onComplete(f: Try[T]): Unit

}
/**
 * Submits work to a Java `ExecutorService` and returns Scala futures for it.
 *
 * Tasks go through an `ExecutorCompletionService`; a background dispatcher
 * thread takes each finished Java future from it and calls `completed` on the
 * matching [[Fewcha]] so that registered callbacks fire.
 *
 * @param es the executor service that runs the submitted tasks
 */
class Fewchas(val es: ExecutorService) {
  private val cs: CompletionService[Any] = new ExecutorCompletionService[Any](es)

  /** In-flight Java futures mapped to their Scala adapters; always accessed under `ls.synchronized`. */
  private val ls = new java.util.HashMap[Future[Any], Fewcha[Any]]()

  private val dispatcher = new Thread("fewchas-completion-dispatcher") {
    override def run(): Unit = {
      try {
        while (!isInterrupted) {
          val f = cs.take()
          // `submit` registers the adapter under the same lock, so it is normally present here.
          val f2 = ls.synchronized(ls.remove(f))
          if (f2 != null) {
            // A throwing listener must not kill the dispatcher, otherwise no
            // later future would ever be reported as complete.
            try f2.completed(f)
            catch { case NonFatal(e) => e.printStackTrace() }
          }
        }
      }
      catch {
        case _: InterruptedException => /* exit loop */
      }
    }
  }
  // Daemon: this helper thread is never stopped explicitly, so it must not keep the JVM alive.
  dispatcher.setDaemon(true)
  dispatcher.start()

  /** Shorthand for `submit(t)`. */
  def apply[T](t: => T): scala.concurrent.Future[T] = submit(t)

  /** Runs the by-name expression `t` on the executor service. */
  def submit[T](t: => T): scala.concurrent.Future[T] =
    submit(new Callable[T] { def call(): T = t })

  /** Runs `c` on the executor service and returns a Scala future for its result. */
  def submit[T](c: Callable[T]): scala.concurrent.Future[T] = {
    // Hold the lock across submit + put so the dispatcher cannot look the
    // future up before its adapter has been registered.
    ls.synchronized {
      val f = cs.submit(c.asInstanceOf[Callable[Any]])
      val f2 = new Fewcha[T] { val futr: Future[T] = f.asInstanceOf[Future[T]] }
      ls.put(f, f2.asInstanceOf[Fewcha[Any]])
      f2
    }
  }

  /** Runs `r` on the executor service; the returned future completes with `()`. */
  def submit(r: Runnable): scala.concurrent.Future[Unit] = {
    submit(new Callable[Unit] {
      def call(): Unit = r.run()
    })
  }
}
/** Factory and default instance for [[Fewchas]]. */
object Fewchas {

  /**
   * Shared default instance backed by a single-thread executor.
   * The type is spelled out because implicit definitions without an explicit
   * type are fragile for implicit resolution (and rejected by Scala 3).
   */
  implicit val global: Fewchas = new Fewchas(Executors.newSingleThreadExecutor())

  /** Creates a [[Fewchas]] that runs its tasks on `es`. */
  def apply(es: ExecutorService): Fewchas = new Fewchas(es)
}
/**
 * Wraps arbitrary Java futures as Scala futures by polling.
 *
 * A background thread checks every registered Java future roughly every
 * 50 ms and calls `completed` on the matching [[Fewcha]] once it is done.
 */
object PollingFewchas {

  /** Java futures still being polled, mapped to their adapters; always accessed under `m.synchronized`. */
  private final val m = new java.util.HashMap[Future[Any], Fewcha[Any]]

  private val poller = new Thread("polling-fewchas") {
    override def run(): Unit = {
      try {
        while (!isInterrupted) {
          m synchronized {
            val itr = m.entrySet.iterator()
            while (itr.hasNext) {
              val e = itr.next
              // Read the entry before removing it: Map.Entry behaviour is
              // undefined once its mapping has been removed.
              val javaFuture = e.getKey
              val adapter = e.getValue
              if (javaFuture.isDone) {
                itr.remove()
                // A throwing listener must not kill the poller.
                try adapter.completed(javaFuture)
                catch { case NonFatal(t) => t.printStackTrace() }
              }
            }
          }
          Thread.sleep(50)
        }
      }
      catch {
        // Interruption during sleep is the signal to stop polling.
        case _: InterruptedException => /* exit loop */
      }
    }
  }
  // Daemon: the poller is never stopped explicitly, so it must not keep the JVM alive.
  poller.setDaemon(true)
  poller.start()

  /**
   * Returns a Scala future that completes when `f` does.
   *
   * Wrapping a future that is still being polled returns the adapter that is
   * already registered; replacing it would strand any callbacks registered on
   * the earlier adapter.
   */
  def wrap[T](f: Future[T]): scala.concurrent.Future[T] = {
    m synchronized {
      val key = f.asInstanceOf[Future[Any]]
      val existing = m.get(key)
      if (existing != null) existing.asInstanceOf[Fewcha[T]]
      else {
        val f2 = new Fewcha[T] { val futr: Future[T] = f }
        m.put(key, f2.asInstanceOf[Fewcha[Any]])
        f2
      }
    }
  }
}
/**
 * Demo: submits a slow job through `Fewchas.global` and prints the outcome
 * from a callback scheduled on the implicit global `ExecutionContext`.
 */
object Main extends App {
  import ExecutionContext.Implicits.global

  /** Simulated slow job: prints the current thread's name, sleeps 20 seconds, returns a marker string. */
  def work: String = {
    val worker = Thread.currentThread().getName
    println("Sleeping " + worker)
    Thread.sleep(20000L)
    "Work!"
  }

  Fewchas.global(work).onComplete { outcome =>
    outcome match {
      case Failure(t) => t.printStackTrace()
      case Success(s) =>
        val callbackThread = Thread.currentThread().getName
        println("Complete! " + s + " on " + callbackThread)
    }
  }

  println("Submitted! " + Thread.currentThread().getName)
}
/**
 * Demo: submits a slow job straight to a Java executor, wraps the resulting
 * Java future with `PollingFewchas.wrap`, and prints the outcome from a
 * callback scheduled on the implicit global `ExecutionContext`.
 */
object Main2 extends App {
  import ExecutionContext.Implicits.global

  /** Simulated slow job: prints the current thread's name, sleeps 20 seconds, returns a marker string. */
  def work: String = {
    val worker = Thread.currentThread().getName
    println("Sleeping on: " + worker)
    Thread.sleep(20000L)
    "Work!"
  }

  val es = Executors.newSingleThreadExecutor()

  val f = es.submit(new Callable[String] {
    def call() = work
  })

  println("Submitted on: " + Thread.currentThread().getName)

  PollingFewchas.wrap(f).onComplete { outcome =>
    outcome match {
      case Failure(t) => t.printStackTrace()
      case Success(s) =>
        val callbackThread = Thread.currentThread().getName
        println("Completed with [" + s + "] on: " + callbackThread)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment