Skip to content

Instantly share code, notes, and snippets.

@izmailoff
Created April 3, 2016 13:34
Show Gist options
  • Save izmailoff/e922d1e44dfc60462e58d04ea94c6f62 to your computer and use it in GitHub Desktop.
Save izmailoff/e922d1e44dfc60462e58d04ea94c6f62 to your computer and use it in GitHub Desktop.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try
case class Node[T](value: T, children: List[Node[T]])
object DagFuture extends App {
def run[A, B](node: Node[A], result: B)(nodeEval: (Node[A], B) => B)(aggregator: Traversable[B] => B): Future[B] = {
val nodeResult = Future(nodeEval(node, result))
val allResults = nodeResult flatMap { r => Future.sequence(nodeResult :: node.children.map { x => run(x, r)(nodeEval)(aggregator) }) }
allResults map aggregator
}
val debugSum = (l: Traversable[Int]) => {
println(s"aggregating: $l")
l sum
}
def debugNodeEval(f: (Node[Int], Int) => Int)(n: Node[Int], r: Int) = {
val eval = Try {
f(n, r)
}
println(s"node: $n, result: $r, eval: $eval")
eval get
}
val debugNodeEvalDefault = debugNodeEval((n, r) => n.value + r) _
val singleNodeDag = Node(1, Nil)
val multiNodeDag = Node(1, List(Node(20, Nil), Node(300, Nil)))
println("\nSINGLE NODE DAG EXAMPLE:")
val singleNodeFuture = run(singleNodeDag, 0)(debugNodeEvalDefault)(debugSum)
val singleNodeResult = Await.result(singleNodeFuture, 5 seconds)
println(s"Single node result: $singleNodeResult")
println("\nDAG PATH LENGTH EXAMPLE:")
val pathLengthFuture = run(multiNodeDag, 0)(debugNodeEvalDefault)(debugSum)
val pathLengthResult = Await.result(pathLengthFuture, 5 seconds)
println(s"Path length: $pathLengthResult")
println("\nFAILED DAG ROOT NODE EXAMPLE:")
val failedRootNodeFuture = run(multiNodeDag, 0)(debugNodeEval((n, r) => throw new Exception))(debugSum)
val failedRootNodePromise = Await.ready(failedRootNodeFuture, 5 seconds)
println(s"Failed root node: ${failedRootNodePromise.value}")
println("\nFAILED DAG CHILD NODE EXAMPLE:")
val failedChildNodeFuture = run(multiNodeDag, 0)(debugNodeEval((n, r) => if (n.value == 300) throw new Exception else n.value + r))(debugSum)
val failedChildNodePromise = Await.ready(failedChildNodeFuture, 5 seconds)
println(s"Failed child node: ${failedChildNodePromise.value}")
}
@izmailoff
Copy link
Author

Prints:


SINGLE NODE DAG EXAMPLE:
node: Node(1,List()), result: 0, eval: Success(1)
aggregating: List(1)
Single node result: 1

DAG PATH LENGTH EXAMPLE:
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Success(1)
node: Node(20,List()), result: 1, eval: Success(21)
aggregating: List(21)
node: Node(300,List()), result: 1, eval: Success(301)
aggregating: List(301)
aggregating: List(1, 21, 301)
Path length: 323

FAILED DAG ROOT NODE EXAMPLE:
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Failure(java.lang.Exception)
Failed root node: Some(Failure(java.lang.Exception))

FAILED DAG CHILD NODE EXAMPLE:
node: Node(1,List(Node(20,List()), Node(300,List()))), result: 0, eval: Success(1)
node: Node(20,List()), result: 1, eval: Success(21)
node: Node(300,List()), result: 1, eval: Failure(java.lang.Exception)
aggregating: List(21)
Failed child node: Some(Failure(java.lang.Exception))

@izmailoff
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment