Skip to content

Instantly share code, notes, and snippets.

@derekjw
Created April 21, 2011 21:20
Show Gist options
  • Save derekjw/935500 to your computer and use it in GitHub Desktop.
Save derekjw/935500 to your computer and use it in GitHub Desktop.
Delimited Continuations with Akka Future
diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
index e12294a..c90ddb9 100644
--- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala
@@ -381,4 +381,41 @@ class FutureSpec extends JUnitSuite {
assert(dataflowVar2() === 5)
assert(dataflowVar.get === 5)
}
+
+ @Test def futureComposingWithContinuations { // checks that two futures can be combined in direct style inside Future.reset
+ import Future.{reset,shift}
+
+ val x = Future("Hello ")
+ val y = Future("World")
+
+ val r = reset(shift(x) + shift(y)) // each shift yields its future's value to the block; reset returns a Future of the concatenation
+
+ assert(r.get === "Hello World")
+ }
+
+ @Test def futureCompletingWithContinuations { // checks that code after <<~ inside reset only runs once the source future has completed
+ import Future.reset
+
+ val begin, end = new StandardLatch // two independent latches: begin gates the source future, end marks that the continuation ran
+
+ val dataflowVar = Future {
+ begin.await // hold this future back until the test opens begin
+ 5
+ }
+
+ val dataflowVar2 = new DefaultCompletableFuture[Int](Long.MaxValue)
+
+ reset {
+ dataflowVar2 <<~ dataflowVar // suspends the rest of this block until dataflowVar2 has been completed from dataflowVar
+ end.open // only reached when the continuation is resumed
+ }
+
+ assert(end.isOpen === false) // dataflowVar is still waiting on begin, so the continuation must not have run yet
+
+ begin.open
+ end.await
+
+ assert(end.isOpen === true)
+ assert(dataflowVar2.get === 5)
+ }
}
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index c69ca82..7b052a7 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -10,6 +10,8 @@ import akka.actor.Actor
import akka.routing.Dispatcher
import akka.japi.{ Procedure, Function => JFunc }
+import scala.util.{continuations => cps}
+
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable}
import java.util.concurrent.TimeUnit.{NANOSECONDS => NANOS, MILLISECONDS => MILLIS}
@@ -261,6 +263,21 @@ object Future {
val fb = fn(a.asInstanceOf[A])
for (r <- fr; b <-fb) yield (r += b)
}.map(_.result)
+
+ def reset[A](f: => A @cps.suspendable, timeout: Long = Actor.TIMEOUT): Future[A] = { // evaluates the CPS block f and returns a Future for its result
+ val future = new DefaultCompletableFuture[A](timeout) // result future handed back to the caller; created with the given timeout
+ Future { // the block is evaluated inside a Future body (presumably dispatched asynchronously - TODO confirm)
+ cps.reset { // delimits the continuation captured by any shift / <<~ used inside f
+ future << f // completes the result future with the block's value (presumably a value-taking overload of <<; only the Future overload is visible in this diff)
+ () // end the reset block in Unit, matching the @cps.suspendable answer type
+ }
+ }
+ future
+ }
+
+ def shift[A](future: Future[A]): A @cps.suspendable = cps.shift { k: (A => Unit) => // k is the remainder of the enclosing reset block, awaiting the future's value
+ future foreach k // NOTE(review): per the author's comment on this gist, a failed future loses its exception here; it is never rethrown into the reset block
+ }
}
sealed trait Future[+T] {
@@ -585,6 +602,11 @@ trait CompletableFuture[T] extends Future[T] {
* Alias for completeWith(other).
*/
final def << (other : Future[T]): Future[T] = completeWith(other)
+
+ final def <<~ (other: Future[T]): Future[T] @cps.suspendable = cps.shift { k: (Future[T] => Unit) => // suspendable variant of <<: k is the rest of the enclosing reset block
+ this << other onComplete k // complete this future from other, then resume k with the future returned by << once it completes
+ }
+
}
/**
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 8073d88..db1f97a 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -10,7 +10,7 @@ import sbt._
import sbt.CompileOrder._
import spde._
-class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
+class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) with AutoCompilerPlugins {
// -------------------------------------------------------------------------------------------------------------------
// Compile settings
@@ -273,8 +273,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// akka-actor subproject
// -------------------------------------------------------------------------------------------------------------------
- class AkkaActorProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with OsgiProject {
+ class AkkaActorProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with OsgiProject with AutoCompilerPlugins {
override def bndExportPackage = super.bndExportPackage ++ Seq("com.eaio.*;version=3.2")
+ val cont = compilerPlugin("org.scala-lang.plugins" % "continuations" % "2.9.0.RC1")
+ override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable")
}
// -------------------------------------------------------------------------------------------------------------------
@@ -436,11 +438,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// akka-actor-tests subproject
// -------------------------------------------------------------------------------------------------------------------
- class AkkaActorTestsProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
+ class AkkaActorTestsProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) with AutoCompilerPlugins {
// testing
val junit = Dependencies.junit
val scalatest = Dependencies.scalatest
val multiverse_test = Dependencies.multiverse_test // StandardLatch
+ val cont = compilerPlugin("org.scala-lang.plugins" % "continuations" % "2.9.0.RC1")
+ override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable")
}
// -------------------------------------------------------------------------------------------------------------------
@derekjw
Copy link
Author

derekjw commented Apr 21, 2011

'shift(future)' causes exceptions to be lost; I need to make sure they get thrown again so that the 'Future' returned by 'reset' can catch them.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment