Created
April 21, 2011 21:20
-
-
Save derekjw/935500 to your computer and use it in GitHub Desktop.
Delimited Continuations with Akka Future
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala | |
index e12294a..c90ddb9 100644 | |
--- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala | |
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala | |
@@ -381,4 +381,41 @@ class FutureSpec extends JUnitSuite { | |
assert(dataflowVar2() === 5) | |
assert(dataflowVar.get === 5) | |
} | |
+ | |
+ @Test def futureComposingWithContinuations { | |
+ import Future.{reset,shift} | |
+ | |
+ val x = Future("Hello ") | |
+ val y = Future("World") | |
+ | |
+ val r = reset(shift(x) + shift(y)) | |
+ | |
+ assert(r.get === "Hello World") | |
+ } | |
+ | |
+ @Test def futureCompletingWithContinuations { | |
+ import Future.reset | |
+ | |
+ val begin, end = new StandardLatch | |
+ | |
+ val dataflowVar = Future { | |
+ begin.await | |
+ 5 | |
+ } | |
+ | |
+ val dataflowVar2 = new DefaultCompletableFuture[Int](Long.MaxValue) | |
+ | |
+ reset { | |
+ dataflowVar2 <<~ dataflowVar | |
+ end.open | |
+ } | |
+ | |
+ assert(end.isOpen === false) | |
+ | |
+ begin.open | |
+ end.await | |
+ | |
+ assert(end.isOpen === true) | |
+ assert(dataflowVar2.get === 5) | |
+ } | |
} | |
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala | |
index c69ca82..7b052a7 100644 | |
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala | |
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala | |
@@ -10,6 +10,8 @@ import akka.actor.Actor | |
import akka.routing.Dispatcher | |
import akka.japi.{ Procedure, Function => JFunc } | |
+import scala.util.{continuations => cps} | |
+ | |
import java.util.concurrent.locks.ReentrantLock | |
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable} | |
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS} | |
@@ -261,6 +263,21 @@ object Future { | |
val fb = fn(a.asInstanceOf[A]) | |
for (r <- fr; b <-fb) yield (r += b) | |
}.map(_.result) | |
+ | |
+ def reset[A](f: => A @cps.suspendable, timeout: Long = Actor.TIMEOUT): Future[A] = { | |
+ val future = new DefaultCompletableFuture[A](timeout) | |
+ Future { | |
+ cps.reset { | |
+ future << f | |
+ () | |
+ } | |
+ } | |
+ future | |
+ } | |
+ | |
+ def shift[A](future: Future[A]): A @cps.suspendable = cps.shift { k: (A => Unit) => | |
+ future foreach k | |
+ } | |
} | |
sealed trait Future[+T] { | |
@@ -585,6 +602,11 @@ trait CompletableFuture[T] extends Future[T] { | |
* Alias for completeWith(other). | |
*/ | |
final def << (other : Future[T]): Future[T] = completeWith(other) | |
+ | |
+ final def <<~ (other: Future[T]): Future[T] @cps.suspendable = cps.shift { k: (Future[T] => Unit) => | |
+ this << other onComplete k | |
+ } | |
+ | |
} | |
/** | |
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala | |
index 8073d88..db1f97a 100644 | |
--- a/project/build/AkkaProject.scala | |
+++ b/project/build/AkkaProject.scala | |
@@ -10,7 +10,7 @@ import sbt._ | |
import sbt.CompileOrder._ | |
import spde._ | |
-class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { | |
+class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) with AutoCompilerPlugins { | |
// ------------------------------------------------------------------------------------------------------------------- | |
// Compile settings | |
@@ -273,8 +273,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { | |
// akka-actor subproject | |
// ------------------------------------------------------------------------------------------------------------------- | |
- class AkkaActorProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with OsgiProject { | |
+ class AkkaActorProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with OsgiProject with AutoCompilerPlugins { | |
override def bndExportPackage = super.bndExportPackage ++ Seq("com.eaio.*;version=3.2") | |
+ val cont = compilerPlugin("org.scala-lang.plugins" % "continuations" % "2.9.0.RC1") | |
+ override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable") | |
} | |
// ------------------------------------------------------------------------------------------------------------------- | |
@@ -436,11 +438,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { | |
// akka-actor-tests subproject | |
// ------------------------------------------------------------------------------------------------------------------- | |
- class AkkaActorTestsProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { | |
+ class AkkaActorTestsProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with AutoCompilerPlugins { | |
// testing | |
val junit = Dependencies.junit | |
val scalatest = Dependencies.scalatest | |
val multiverse_test = Dependencies.multiverse_test // StandardLatch | |
+ val cont = compilerPlugin("org.scala-lang.plugins" % "continuations" % "2.9.0.RC1") | |
+ override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable") | |
} | |
// ------------------------------------------------------------------------------------------------------------------- |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
'shift(future)' causes exceptions to be lost. I need to make sure they get rethrown inside the continuation so that the 'Future' returned by 'reset' can catch them.