Skip to content

Instantly share code, notes, and snippets.

@derekjw
Created December 13, 2011 00:55
Show Gist options
  • Save derekjw/1469887 to your computer and use it in GitHub Desktop.
Save derekjw/1469887 to your computer and use it in GitHub Desktop.
Dispatcher Future
diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
index b7f2afc..1663db6 100644
--- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
@@ -7,8 +7,9 @@ package akka.dispatch
import java.util.concurrent._
import akka.event.Logging.Error
import akka.util.{ Duration, Switch, ReentrantGuard }
-import atomic.{ AtomicInteger, AtomicLong }
+import atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicLong }
import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy }
+import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
import akka.actor._
import akka.actor.ActorSystem
import locks.ReentrantLock
@@ -16,6 +17,7 @@ import scala.annotation.tailrec
import akka.event.EventStream
import akka.actor.ActorSystem.Settings
import com.typesafe.config.Config
+import akka.dispatch.Await.CanAwait
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@@ -90,6 +92,7 @@ object MessageDispatcher {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable {
+ self ⇒
import MessageDispatcher._
import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater }
@@ -259,6 +262,127 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
* Returns the "current" emptiness status of the mailbox for the specified actor
*/
def mailboxIsEmpty(actor: ActorCell): Boolean = !actor.mailbox.hasMessages
+
+ /**
+ * The default concrete Future implementation.
+ */
+ final class DefaultPromise[T]() extends AbstractPromise with Promise[T] {
+
+ import DefaultPromise.{ FState, Success, Failure, Pending }
+
+ implicit def dispatcher: MessageDispatcher = self
+
+ protected final def tryAwait(atMost: Duration): Boolean = {
+ Future.blocking
+
+ @tailrec
+ def awaitUnsafe(waitTimeNanos: Long): Boolean = {
+ if (value.isEmpty && waitTimeNanos > 0) {
+ val ms = NANOSECONDS.toMillis(waitTimeNanos)
+ val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
+ val start = System.nanoTime()
+ try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
+
+ awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
+ } else
+ value.isDefined
+ }
+ awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
+ }
+
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
+ if (value.isDefined || tryAwait(atMost)) this
+ else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
+
+ def result(atMost: Duration)(implicit permit: CanAwait): T =
+ ready(atMost).value.get match {
+ case Left(e) ⇒ throw e
+ case Right(r) ⇒ r
+ }
+
+ def value: Option[Either[Throwable, T]] = getState.value
+
+ @inline
+ private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]]
+
+ @inline
+ protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState)
+
+ @inline
+ protected final def getState: FState[T] = updater.get(this)
+
+ def tryComplete(value: Either[Throwable, T]): Boolean = {
+ val callbacks: List[Future[T] ⇒ Unit] = {
+ try {
+ @tailrec
+ def tryComplete: List[Future[T] ⇒ Unit] = {
+ val cur = getState
+
+ cur match {
+ case Pending(listeners) ⇒
+ if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners
+ else tryComplete
+ case _ ⇒ null
+ }
+ }
+ tryComplete
+ } finally {
+ synchronized { notifyAll() } //Notify any evil blockers
+ }
+ }
+
+ callbacks match {
+ case null ⇒ false
+ case cs if cs.isEmpty ⇒ true
+ case cs ⇒ Future.dispatchTask(() ⇒ cs foreach notifyCompleted); true
+ }
+ }
+
+ def onComplete(func: Future[T] ⇒ Unit): this.type = {
+ @tailrec //Returns whether the future has already been completed or not
+ def tryAddCallback(): Boolean = {
+ val cur = getState
+ cur match {
+ case _: Success[_] | _: Failure[_] ⇒ true
+ case p: Pending[_] ⇒
+ val pt = p.asInstanceOf[Pending[T]]
+ if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
+ }
+ }
+
+ if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func))
+
+ this
+ }
+
+ private def notifyCompleted(func: Future[T] ⇒ Unit) {
+ // TODO FIXME catching all and continue isn't good for OOME, ticket #1418
+ try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) }
+ }
+ }
+
+ /**
+ * An already completed Future is seeded with its result at creation; it is useful when you are participating in
+ * a Future-composition but you already have a value to contribute.
+ */
+ final class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] {
+ val value = Some(suppliedValue)
+
+ implicit def dispatcher: MessageDispatcher = self
+
+ def tryComplete(value: Either[Throwable, T]): Boolean = true
+ def onComplete(func: Future[T] ⇒ Unit): this.type = {
+ Future dispatchTask (() ⇒ func(this))
+ this
+ }
+
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
+ def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
+ case Left(e) ⇒ throw e
+ case Right(r) ⇒ r
+ }
+ }
+
}
/**
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index 6e0691b..01e6f55 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -581,17 +581,17 @@ object Promise {
*
* Scala API
*/
- def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new DefaultPromise[A]()
+ def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = new dispatcher.DefaultPromise[A]
/**
* Creates an already completed Promise with the specified exception
*/
- def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Left(exception))
+ def failed[T](exception: Throwable)(implicit dispatcher: MessageDispatcher): Promise[T] = new dispatcher.KeptPromise[T](Left(exception))
/**
* Creates an already completed Promise with the specified result
*/
- def successful[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new KeptPromise[T](Right(result))
+ def successful[T](result: T)(implicit dispatcher: MessageDispatcher): Promise[T] = new dispatcher.KeptPromise[T](Right(result))
}
/**
@@ -684,120 +684,3 @@ private[dispatch] object DefaultPromise {
}
private val emptyPendingValue = Pending[Nothing](Nil)
}
-
-/**
- * The default concrete Future implementation.
- */
-class DefaultPromise[T](implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
- self ⇒
-
- import DefaultPromise.{ FState, Success, Failure, Pending }
-
- protected final def tryAwait(atMost: Duration): Boolean = {
- Future.blocking
-
- @tailrec
- def awaitUnsafe(waitTimeNanos: Long): Boolean = {
- if (value.isEmpty && waitTimeNanos > 0) {
- val ms = NANOSECONDS.toMillis(waitTimeNanos)
- val ns = (waitTimeNanos % 1000000l).toInt //As per object.wait spec
- val start = System.nanoTime()
- try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ }
-
- awaitUnsafe(waitTimeNanos - (System.nanoTime() - start))
- } else
- value.isDefined
- }
- awaitUnsafe(if (atMost.isFinite) atMost.toNanos else Long.MaxValue)
- }
-
- def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
- if (value.isDefined || tryAwait(atMost)) this
- else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
-
- def result(atMost: Duration)(implicit permit: CanAwait): T =
- ready(atMost).value.get match {
- case Left(e) ⇒ throw e
- case Right(r) ⇒ r
- }
-
- def value: Option[Either[Throwable, T]] = getState.value
-
- @inline
- private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]]
-
- @inline
- protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState)
-
- @inline
- protected final def getState: FState[T] = updater.get(this)
-
- def tryComplete(value: Either[Throwable, T]): Boolean = {
- val callbacks: List[Future[T] ⇒ Unit] = {
- try {
- @tailrec
- def tryComplete: List[Future[T] ⇒ Unit] = {
- val cur = getState
-
- cur match {
- case Pending(listeners) ⇒
- if (updateState(cur, if (value.isLeft) Failure(Some(value)) else Success(Some(value)))) listeners
- else tryComplete
- case _ ⇒ null
- }
- }
- tryComplete
- } finally {
- synchronized { notifyAll() } //Notify any evil blockers
- }
- }
-
- callbacks match {
- case null ⇒ false
- case cs if cs.isEmpty ⇒ true
- case cs ⇒ Future.dispatchTask(() ⇒ cs foreach notifyCompleted); true
- }
- }
-
- def onComplete(func: Future[T] ⇒ Unit): this.type = {
- @tailrec //Returns whether the future has already been completed or not
- def tryAddCallback(): Boolean = {
- val cur = getState
- cur match {
- case _: Success[_] | _: Failure[_] ⇒ true
- case p: Pending[_] ⇒
- val pt = p.asInstanceOf[Pending[T]]
- if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback()
- }
- }
-
- if (tryAddCallback()) Future.dispatchTask(() ⇒ notifyCompleted(func))
-
- this
- }
-
- private def notifyCompleted(func: Future[T] ⇒ Unit) {
- // TODO FIXME catching all and continue isn't good for OOME, ticket #1418
- try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) }
- }
-}
-
-/**
- * An already completed Future is seeded with it's result at creation, is useful for when you are participating in
- * a Future-composition but you already have a value to contribute.
- */
-final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val dispatcher: MessageDispatcher) extends Promise[T] {
- val value = Some(suppliedValue)
-
- def tryComplete(value: Either[Throwable, T]): Boolean = true
- def onComplete(func: Future[T] ⇒ Unit): this.type = {
- Future dispatchTask (() ⇒ func(this))
- this
- }
-
- def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
- def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
- case Left(e) ⇒ throw e
- case Right(r) ⇒ r
- }
-}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment