Created
December 13, 2011 00:55
-
-
Save derekjw/1469887 to your computer and use it in GitHub Desktop.
Dispatcher Future
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala | |
index b7f2afc..1663db6 100644 | |
--- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala | |
+++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala | |
@@ -7,8 +7,9 @@ package akka.dispatch | |
import java.util.concurrent._ | |
import akka.event.Logging.Error | |
import akka.util.{ Duration, Switch, ReentrantGuard } | |
-import atomic.{ AtomicInteger, AtomicLong } | |
+import atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicLong } | |
import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy } | |
+import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } | |
import akka.actor._ | |
import akka.actor.ActorSystem | |
import locks.ReentrantLock | |
@@ -16,6 +17,7 @@ import scala.annotation.tailrec | |
import akka.event.EventStream | |
import akka.actor.ActorSystem.Settings | |
import com.typesafe.config.Config | |
+import akka.dispatch.Await.CanAwait | |
/** | |
* @author <a href="http://jonasboner.com">Jonas Bonér</a> | |
@@ -90,6 +92,7 @@ object MessageDispatcher { | |
* @author <a href="http://jonasboner.com">Jonas Bonér</a> | |
*/ | |
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable { | |
+ self ⇒ | |
import MessageDispatcher._ | |
import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater } | |
@@ -259,6 +262,127 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext | |
* Returns the "current" emptiness status of the mailbox for the specified actor | |
*/ | |
def mailboxIsEmpty(actor: ActorCell): Boolean = !actor.mailbox.hasMessages | |
+ | |
+ /** | |
+ * The default concrete Future implementation. | |
+ */ | |
+ final class DefaultPromise[T]() extends AbstractPromise with Promise[T] { | |
+ | |
+ import DefaultPromise.{ FState, Success, Failure, Pending } | |
+ | |
+ implicit def dispatcher: MessageDispatcher = self | |
+ | |
+ protected final def tryAwait(atMost: Duration): Boolean = { | |
+ Future.blocking | |
+ | |
+ @tailrec | |
+ def awaitUnsafe(waitTimeNanos: Long): Boolean = { | |
+ if (value.isEmpty && waitTimeNanos > 0) { | |
+ val ms = NANOSECONDS.toMillis(waitTimeNanos) | |
+ val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec | |
+ val start = System.nanoTime() | |
+ try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } | |
+ | |
+ awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) | |
+ } else | |
+ value.isDefined | |
+ } | |
+ awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue) | |
+ } | |
+ | |
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type = | |
+ if (value.isDefined || tryAwait(atMost)) this | |
+ else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") | |
+ | |
+ def result(atMost: Duration)(implicit permit: CanAwait): T = | |
+ ready(atMost).value.get match { | |
+ case Left(e) ⇒ throw e | |
+ case Right(r) ⇒ r | |
+ } | |
+ | |
+ def value: Option[Either[Throwable, T]] = getState.value | |
+ | |
+ @inline | |
+ private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] | |
+ | |
+ @inline | |
+ protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) | |
+ | |
+ @inline | |
+ protected final def getState: FState[T] = updater.get(this) | |
+ | |
+ def tryComplete(value: Either[Throwable, T]): Boolean = { | |
+ val callbacks: List[Future[T] ⇒ Unit] = { | |
+ try { | |
+ @tailrec | |
+ def tryComplete: List[Future[T] ⇒ Unit] = { | |
+ val cur = getState | |
+ | |
+ cur match { | |
+ case Pending(listeners) ⇒ | |
+ if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners | |
+ else tryComplete | |
+ case _ ⇒ null | |
+ } | |
+ } | |
+ tryComplete | |
+ } finally { | |
+ synchronized { notifyAll() } //Notify any evil blockers | |
+ } | |
+ } | |
+ | |
+ callbacks match { | |
+ case null ⇒ false | |
+ case cs if cs.isEmpty ⇒ true | |
+ case cs ⇒ Future.dispatchTask(() ⇒ cs foreach notifyCompleted); true | |
+ } | |
+ } | |
+ | |
+ def onComplete(func: Future[T] ⇒ Unit): this.type = { | |
+ @tailrec //Returns whether the future has already been completed or not | |
+ def tryAddCallback(): Boolean = { | |
+ val cur = getState | |
+ cur match { | |
+ case _: Success[_] | _: Failure[_] ⇒ true | |
+ case p: Pending[_] ⇒ | |
+ val pt = p.asInstanceOf[Pending[T]] | |
+ if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() | |
+ } | |
+ } | |
+ | |
+ if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func)) | |
+ | |
+ this | |
+ } | |
+ | |
+ private def notifyCompleted(func: Future[T] ⇒ Unit) { | |
+ // TODO FIXME catching all and continue isn't good for OOME, ticket #1418 | |
+ try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } | |
+ } | |
+ } | |
+ | |
+ /** | |
+ * An already completed Future is seeded with its result at creation; it is useful when you are participating in | |
+ * a Future-composition but you already have a value to contribute. | |
+ */ | |
+ final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { | |
+ val value = Some(suppliedValue) | |
+ | |
+ implicit def dispatcher: MessageDispatcher = self | |
+ | |
+ def tryComplete(value: Either[Throwable, T]): Boolean = true | |
+ def onComplete(func: Future[T] ⇒ Unit): this.type = { | |
+ Future dispatchTask (() ⇒ func(this)) | |
+ this | |
+ } | |
+ | |
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this | |
+ def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { | |
+ case Left(e) ⇒ throw e | |
+ case Right(r) ⇒ r | |
+ } | |
+ } | |
+ | |
} | |
/** | |
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala | |
index 6e0691b..01e6f55 100644 | |
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala | |
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala | |
@@ -581,17 +581,17 @@ object Promise { | |
* | |
* Scala API | |
*/ | |
- def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]() | |
+ def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new dispatcher.DefaultPromise[A] | |
/** | |
* Creates an already completed Promise with the specified exception | |
*/ | |
- def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Left(exception)) | |
+ def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new dispatcher.KeptPromise[T](Left(exception)) | |
/** | |
* Creates an already completed Promise with the specified result | |
*/ | |
- def successful[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Right(result)) | |
+ def successful[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new dispatcher.KeptPromise[T](Right(result)) | |
} | |
/** | |
@@ -684,120 +684,3 @@ private[dispatch] object DefaultPromise { | |
} | |
private val emptyPendingValue = Pending[Nothing](Nil) | |
} | |
- | |
-/** | |
- * The default concrete Future implementation. | |
- */ | |
-class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] { | |
- self ⇒ | |
- | |
- import DefaultPromise.{ FState, Success, Failure, Pending } | |
- | |
- protected final def tryAwait(atMost: Duration): Boolean = { | |
- Future.blocking | |
- | |
- @tailrec | |
- def awaitUnsafe(waitTimeNanos: Long): Boolean = { | |
- if (value.isEmpty && waitTimeNanos > 0) { | |
- val ms = NANOSECONDS.toMillis(waitTimeNanos) | |
- val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec | |
- val start = System.nanoTime() | |
- try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } | |
- | |
- awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) | |
- } else | |
- value.isDefined | |
- } | |
- awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue) | |
- } | |
- | |
- def ready(atMost: Duration)(implicit permit: CanAwait): this.type = | |
- if (value.isDefined || tryAwait(atMost)) this | |
- else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") | |
- | |
- def result(atMost: Duration)(implicit permit: CanAwait): T = | |
- ready(atMost).value.get match { | |
- case Left(e) ⇒ throw e | |
- case Right(r) ⇒ r | |
- } | |
- | |
- def value: Option[Either[Throwable, T]] = getState.value | |
- | |
- @inline | |
- private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] | |
- | |
- @inline | |
- protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) | |
- | |
- @inline | |
- protected final def getState: FState[T] = updater.get(this) | |
- | |
- def tryComplete(value: Either[Throwable, T]): Boolean = { | |
- val callbacks: List[Future[T] ⇒ Unit] = { | |
- try { | |
- @tailrec | |
- def tryComplete: List[Future[T] ⇒ Unit] = { | |
- val cur = getState | |
- | |
- cur match { | |
- case Pending(listeners) ⇒ | |
- if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners | |
- else tryComplete | |
- case _ ⇒ null | |
- } | |
- } | |
- tryComplete | |
- } finally { | |
- synchronized { notifyAll() } //Notify any evil blockers | |
- } | |
- } | |
- | |
- callbacks match { | |
- case null ⇒ false | |
- case cs if cs.isEmpty ⇒ true | |
- case cs ⇒ Future.dispatchTask(() ⇒ cs foreach notifyCompleted); true | |
- } | |
- } | |
- | |
- def onComplete(func: Future[T] ⇒ Unit): this.type = { | |
- @tailrec //Returns whether the future has already been completed or not | |
- def tryAddCallback(): Boolean = { | |
- val cur = getState | |
- cur match { | |
- case _: Success[_] | _: Failure[_] ⇒ true | |
- case p: Pending[_] ⇒ | |
- val pt = p.asInstanceOf[Pending[T]] | |
- if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() | |
- } | |
- } | |
- | |
- if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func)) | |
- | |
- this | |
- } | |
- | |
- private def notifyCompleted(func: Future[T] ⇒ Unit) { | |
- // TODO FIXME catching all and continue isn't good for OOME, ticket #1418 | |
- try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } | |
- } | |
-} | |
- | |
-/** | |
- * An already completed Future is seeded with it's result at creation, is useful for when you are participating in | |
- * a Future-composition but you already have a value to contribute. | |
- */ | |
-final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] { | |
- val value = Some(suppliedValue) | |
- | |
- def tryComplete(value: Either[Throwable, T]): Boolean = true | |
- def onComplete(func: Future[T] ⇒ Unit): this.type = { | |
- Future dispatchTask (() ⇒ func(this)) | |
- this | |
- } | |
- | |
- def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this | |
- def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { | |
- case Left(e) ⇒ throw e | |
- case Right(r) ⇒ r | |
- } | |
-} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment