Created
April 3, 2011 18:45
-
-
Save derekjw/900664 to your computer and use it in GitHub Desktop.
Some possible ideas for making dataflow easier with Future
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala | |
index cfe64a8..f1fb976 100644 | |
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala | |
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala | |
@@ -217,6 +217,15 @@ object Future { | |
dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body)) | |
f | |
} | |
+ | |
+ def task(body: => Unit, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[Unit] = | |
+ apply(body, timeout)(dispatcher) | |
+ | |
+ def promise[T](timeout: Long = Actor.TIMEOUT): CompletableFuture[T] = | |
+ new DefaultCompletableFuture[T](timeout) | |
+ | |
+ def value[T](in: Either[Throwable,T]): Future[T] = | |
+ new AlreadyCompletedFuture(in) | |
} | |
sealed trait Future[+T] { | |
@@ -335,6 +344,14 @@ sealed trait Future[+T] { | |
} | |
} | |
+ final def onException(pf: PartialFunction[Throwable, Unit]): Future[T] = onComplete { f => | |
+ val opte = f.exception | |
+ if (opte.isDefined) { | |
+ val e = opte.get | |
+ if (pf.isDefinedAt(e)) pf(e) | |
+ } | |
+ } | |
+ | |
/** | |
* Creates a new Future by applying a PartialFunction to the successful | |
* result of this Future if a match is found, or else return a MatchError. | |
diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala | |
index a946713..afa7f26 100644 | |
--- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala | |
+++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala | |
@@ -364,16 +364,16 @@ class FutureSpec extends JUnitSuite { | |
} | |
@Test def lesslessIsMore { | |
- import akka.actor.Actor.spawn | |
- val dataflowVar, dataflowVar2 = new DefaultCompletableFuture[Int](Long.MaxValue) | |
+ import Future.task | |
+ val dataflowVar, dataflowVar2 = Future.promise[Int]() | |
val begin, end = new StandardLatch | |
- spawn { | |
+ task { | |
begin.await | |
dataflowVar2 << dataflowVar | |
end.open | |
} | |
- spawn { | |
+ task { | |
dataflowVar << 5 | |
} | |
begin.open | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment