Skip to content

Instantly share code, notes, and snippets.

@oxbowlakes
Created January 28, 2014 11:45
Show Gist options
  • Save oxbowlakes/8666295 to your computer and use it in GitHub Desktop.
Save oxbowlakes/8666295 to your computer and use it in GitHub Desktop.
Construct a Scala Future on top of a standard Java ExecutorService
package fewchaz
import scala.util.{Failure, Success, Try}
import scala.concurrent.{CanAwait, ExecutionContext}
import java.util.concurrent._
import scala.util.control.NonFatal
/**
 * Adapts a `java.util.concurrent.Future` to the `scala.concurrent.Future` API.
 *
 * The instance does not detect completion on its own: some external party must
 * call `completed` once `futr` is done so that queued listeners are notified.
 *
 * Failures are whatever `futr.get()` throws, unchanged (for a failed task the
 * JDK reports the cause wrapped in an `ExecutionException`).
 *
 * @tparam T the type of the value produced by the underlying future
 */
trait Fewcha[T] extends scala.concurrent.Future[T] {

  /** The underlying Java future (`java.util.concurrent.Future`) being adapted. */
  def futr: Future[T]

  /**
   * Captures the outcome of `futr` as a `Try`. Calls `futr.get()`, so it blocks
   * unless `futr` is already done; every call site checks or establishes that first.
   */
  private[this] final def toTry: Try[T] =
    try Success(futr.get()) catch { case NonFatal(t) => Failure(t) }

  // Listeners still waiting for completion; backed by a concurrent map so that
  // listeners may remove themselves while `completed` is iterating.
  private final val ls = java.util.Collections.newSetFromMap(new ConcurrentHashMap[FutureListener[T], java.lang.Boolean]())

  /**
   * Registers `l` for notification. If `futr` is already done the listener is
   * notified immediately and NOT queued, so a later `completed` call cannot
   * notify it a second time.
   *
   * @param l the listener to notify
   * @return `true` if the listener was queued for later notification,
   *         `false` if it was notified immediately
   */
  private[fewchaz] final def addListener(l: FutureListener[T]): Boolean = synchronized {
    if (futr.isDone) {
      l.onComplete(toTry)
      false
    }
    else ls.add(l)
  }

  /**
   * Removes `l` from the pending listeners.
   *
   * @return `true` if the listener was present
   */
  private[fewchaz] final def removeListener(l: FutureListener[T]): Boolean = ls.remove(l)

  /**
   * Notifies every pending listener with the outcome of `futr`. Must only be
   * called once `futr` is done, otherwise it blocks in `futr.get()`.
   *
   * @param f the completed Java future; not used, the outcome is read from `futr`
   */
  private[fewchaz] final def completed(f: Future[T]): Unit = synchronized {
    val outcome = toTry
    val pending = ls.iterator()
    while (pending.hasNext) pending.next().onComplete(outcome)
  }

  /**
   * Schedules `f` to run on `executor` with the outcome of this future, either
   * when `completed` is called or immediately if `futr` is already done.
   *
   * @param f        callback receiving the outcome
   * @param executor execution context on which the callback is run
   */
  def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
    val listener: FutureListener[T] = new FutureListener[T] {
      def onComplete(t: Try[T]): Unit =
        try executor.execute(new Runnable { def run(): Unit = { f(t) } })
        // One-shot: drop the registration even if the executor rejects the task.
        finally removeListener(this)
    }
    addListener(listener)
  }

  /** `true` once the underlying Java future reports that it is done. */
  def isCompleted: Boolean = futr.isDone

  /** The outcome if the underlying future is done, otherwise `None`. Never blocks. */
  def value: Option[Try[T]] = if (futr.isDone) Some(toTry) else None

  import concurrent.duration._

  /**
   * Blocks until this future is completed or `atMost` elapses.
   *
   * @param atMost maximum time to wait; `Duration.Inf` waits without a timeout.
   *               Other non-finite durations are rejected by `toNanos` with an
   *               `IllegalArgumentException`.
   * @return this future, now completed
   * @throws TimeoutException     if the future did not complete within `atMost`
   * @throws InterruptedException if the waiting thread is interrupted
   */
  def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
    val latch = new CountDownLatch(1)
    val listener: FutureListener[T] = new FutureListener[T] {
      def onComplete(t: Try[T]): Unit = latch.countDown()
    }
    // The latch can only open if the listener is actually registered; an
    // already-done future opens it right here.
    addListener(listener)
    try {
      val done = atMost match {
        case Duration.Inf =>
          latch.await()
          true
        case bounded =>
          latch.await(bounded.toNanos, NANOSECONDS)
      }
      if (!done)
        throw new TimeoutException
      this
    }
    finally removeListener(listener)
  }

  /**
   * Blocks like `ready` and then returns the value, or throws the captured failure.
   *
   * @param atMost maximum time to wait, see `ready`
   * @return the successful value
   * @throws TimeoutException if the future did not complete within `atMost`
   */
  def result(atMost: Duration)(implicit permit: CanAwait): T = {
    ready(atMost)
    // `ready` returned normally, so `futr` is done and `toTry` does not block.
    toTry.get
  }
}
/**
 * Callback interface for receiving the outcome of a computation as a `Try`.
 *
 * @tparam T the type of the successful value
 */
trait FutureListener[T] {
/**
 * Called with the outcome of the computation.
 *
 * @param f the outcome: a `Success` holding the value or a `Failure` holding the error
 */
def onComplete(f: Try[T]): Unit
}
/**
 * Submits work to a Java `ExecutorService` and hands back `scala.concurrent.Future`s.
 *
 * Tasks go through an `ExecutorCompletionService`; a dedicated background thread
 * takes finished tasks from it and notifies the matching [[Fewcha]], which in turn
 * fires the callbacks registered on it.
 *
 * The background thread is a daemon thread, so it never keeps the JVM alive by
 * itself; the executor's own threads are not affected.
 *
 * @param es the executor service that runs the submitted tasks
 */
class Fewchas(val es: ExecutorService) {
  private val cs: CompletionService[Any] = new ExecutorCompletionService[Any](es)

  // Java future -> Scala wrapper, for tasks that have been submitted but not yet
  // reported as complete. Guarded by `ls.synchronized`.
  private val ls = new java.util.HashMap[Future[Any], Fewcha[Any]]()

  private[this] val notifier = new Thread("fewchas-completion-notifier") {
    override def run(): Unit = {
      try {
        while (!isInterrupted) {
          val f = cs.take()
          // `submit` holds the same lock across cs.submit + ls.put, so the entry
          // is normally present; Option(...) only guards against a null lookup.
          Option(ls.synchronized(ls.remove(f))).foreach { wrapper =>
            try wrapper.completed(f)
            catch {
              // A failing listener must not kill this thread: every later future
              // would then never complete. Report it and keep going.
              case NonFatal(e) => getUncaughtExceptionHandler.uncaughtException(this, e)
            }
          }
        }
      }
      catch {
        case _: InterruptedException => /* exit loop */
      }
    }
  }
  notifier.setDaemon(true)
  notifier.start()

  /**
   * Shorthand for `submit(t)`.
   *
   * @param t the computation, evaluated on the executor
   * @return a future completed with the result of `t`
   */
  def apply[T](t: => T): scala.concurrent.Future[T] = submit(t)

  /**
   * Runs the by-name expression `t` on the executor.
   *
   * @param t the computation, evaluated on the executor
   * @return a future completed with the result of `t`
   */
  def submit[T](t: => T): scala.concurrent.Future[T] = submit(new Callable[T] { def call(): T = t })

  /**
   * Runs `c` on the executor.
   *
   * @param c the task to run
   * @return a future completed with the result of `c.call()`
   */
  def submit[T](c: Callable[T]): scala.concurrent.Future[T] = {
    ls.synchronized {
      // The completion service is typed on Any; the casts only adjust the
      // erased type parameter and are undone for the wrapper below.
      val f = cs.submit(c.asInstanceOf[Callable[Any]])
      val f2 = new Fewcha[T] { val futr: Future[T] = f.asInstanceOf[Future[T]] }
      ls.put(f, f2.asInstanceOf[Fewcha[Any]])
      f2
    }
  }

  /**
   * Runs `r` on the executor.
   *
   * @param r the task to run
   * @return a future completed with `()` once `r.run()` returns
   */
  def submit(r: Runnable): scala.concurrent.Future[Unit] = {
    submit(new Callable[Unit] {
      def call(): Unit = r.run()
    })
  }
}
/** Factory and shared default instance for `Fewchas`. */
object Fewchas {
  /**
   * Shared instance backed by a single-threaded executor.
   *
   * Lazy so that the executor and the instance's background thread are only
   * created when `global` is actually used, not merely because `apply` was called.
   */
  implicit lazy val global: Fewchas = new Fewchas(Executors.newSingleThreadExecutor())

  /**
   * Creates an instance that runs its tasks on `es`.
   *
   * @param es the executor service to submit tasks to
   * @return a new `Fewchas` bound to `es`
   */
  def apply(es: ExecutorService): Fewchas = new Fewchas(es)
}
/**
 * Adapts arbitrary Java futures to `scala.concurrent.Future` by polling.
 *
 * A single background daemon thread checks every wrapped future roughly every
 * 50 ms and notifies the wrapper of each one that has finished.
 */
object PollingFewchas {
  // Java future -> Scala wrapper, for futures not yet seen as done.
  // Guarded by `m.synchronized`.
  private final val m = new java.util.HashMap[Future[Any], Fewcha[Any]]

  private[this] val poller = new Thread("polling-fewchas") {
    override def run(): Unit = {
      try {
        while (!isInterrupted) {
          // Collect finished entries under the lock, but notify outside it so
          // that listeners never run while the map is locked and being iterated.
          val finished = m.synchronized {
            val done = List.newBuilder[(Future[Any], Fewcha[Any])]
            val itr = m.entrySet.iterator()
            while (itr.hasNext) {
              val e = itr.next()
              if (e.getKey.isDone) {
                done += ((e.getKey, e.getValue))
                itr.remove()
              }
            }
            done.result()
          }
          finished.foreach { case (javaFuture, wrapper) =>
            try wrapper.completed(javaFuture)
            catch {
              // A failing listener must not stop polling for the other futures.
              case NonFatal(t) => getUncaughtExceptionHandler.uncaughtException(this, t)
            }
          }
          Thread.sleep(50)
        }
      }
      catch {
        // sleep() clears the interrupt flag when it throws, so the loop
        // condition alone would never observe the interruption.
        case _: InterruptedException => /* exit loop */
      }
    }
  }
  poller.setDaemon(true)
  poller.start()

  /**
   * Wraps a Java future so that it can be used as a `scala.concurrent.Future`.
   *
   * @param f the Java future to observe
   * @return a Scala future completed once the poller sees that `f` is done
   */
  def wrap[T](f: Future[T]): scala.concurrent.Future[T] = {
    m.synchronized {
      val f2 = new Fewcha[T] { val futr: Future[T] = f }
      // The map is typed on Any; the casts only adjust the erased type parameter.
      m.put(f.asInstanceOf[Future[Any]], f2.asInstanceOf[Fewcha[Any]])
      f2
    }
  }
}
/** Demo: runs a slow task through `Fewchas.global` and prints its outcome. */
object Main extends App {
  import ExecutionContext.Implicits.global

  /** Simulated slow job: prints the thread it runs on, sleeps 20 seconds, yields "Work!". */
  def work: String = {
    val threadName = Thread.currentThread().getName
    println("Sleeping " + threadName)
    Thread.sleep(20000L)
    "Work!"
  }

  // `work` is passed by name, so it is evaluated by the Fewchas instance, not here.
  Fewchas.global(work).onComplete { outcome =>
    outcome match {
      case Failure(t) => t.printStackTrace()
      case Success(s) =>
        val threadName = Thread.currentThread().getName
        println("Complete! " + s + " on " + threadName)
    }
  }

  println("Submitted! " + Thread.currentThread().getName)
}
/** Demo: submits a slow task to a plain Java executor and observes it via `PollingFewchas.wrap`. */
object Main2 extends App {
  import ExecutionContext.Implicits.global

  /** Simulated slow job: prints the thread it runs on, sleeps 20 seconds, yields "Work!". */
  def work: String = {
    val threadName = Thread.currentThread().getName
    println("Sleeping on: " + threadName)
    Thread.sleep(20000L)
    "Work!"
  }

  val es = Executors.newSingleThreadExecutor()

  // Plain Java future; it is only adapted to a Scala future further down.
  val f = es.submit(new Callable[String] {
    def call() = work
  })

  println("Submitted on: " + Thread.currentThread().getName)

  PollingFewchas.wrap(f).onComplete { outcome =>
    outcome match {
      case Failure(t) => t.printStackTrace()
      case Success(s) =>
        val threadName = Thread.currentThread().getName
        println("Completed with [" + s + "] on: " + threadName)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment