Created
December 6, 2013 03:22
-
-
Save ngocdaothanh/5b8adf6a981258e0ba0b to your computer and use it in GitHub Desktop.
reactive.5
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> | |
*/ | |
package actorbintree | |
import akka.actor._ | |
import scala.collection.immutable.Queue | |
object BinaryTreeSet { | |
trait Operation { | |
def requester: ActorRef | |
def id: Int | |
def elem: Int | |
} | |
trait OperationReply { | |
def id: Int | |
} | |
/** Request with identifier `id` to insert an element `elem` into the tree. | |
* The actor at reference `requester` should be notified when this operation | |
* is completed. | |
*/ | |
case class Insert(requester: ActorRef, id: Int, elem: Int) extends Operation | |
/** Request with identifier `id` to check whether an element `elem` is present | |
* in the tree. The actor at reference `requester` should be notified when | |
* this operation is completed. | |
*/ | |
case class Contains(requester: ActorRef, id: Int, elem: Int) extends Operation | |
/** Request with identifier `id` to remove the element `elem` from the tree. | |
* The actor at reference `requester` should be notified when this operation | |
* is completed. | |
*/ | |
case class Remove(requester: ActorRef, id: Int, elem: Int) extends Operation | |
/** Request to perform garbage collection*/ | |
case object GC | |
/** Holds the answer to the Contains request with identifier `id`. | |
* `result` is true if and only if the element is present in the tree. | |
*/ | |
case class ContainsResult(id: Int, result: Boolean) extends OperationReply | |
/** Message to signal successful completion of an insert or remove operation. */ | |
case class OperationFinished(id: Int) extends OperationReply | |
} | |
class BinaryTreeSet extends Actor { | |
import BinaryTreeSet._ | |
import BinaryTreeNode._ | |
private def createRoot: ActorRef = context.actorOf(BinaryTreeNode.props(0, initiallyRemoved = true)) | |
private var root = createRoot | |
// optional | |
private var pendingQueue = Queue.empty[Operation] | |
// optional | |
def receive = normal | |
// optional | |
/** Accepts `Operation` and `GC` messages. */ | |
private val normal: Receive = { | |
case o: Operation => | |
root ! o | |
case GC => | |
val newRoot = createRoot | |
root ! CopyTo(newRoot) | |
context.become(garbageCollecting(newRoot)) | |
case unknown => | |
println("[BinaryTreeSet.normal] Unknown msg: " + unknown) | |
} | |
// optional | |
/** Handles messages while garbage collection is performed. | |
* `newRoot` is the root of the new binary tree where we want to copy | |
* all non-removed elements into. | |
*/ | |
private def garbageCollecting(newRoot: ActorRef): Receive = { | |
case o: Operation => | |
pendingQueue = pendingQueue.enqueue(o) | |
case CopyFinished => | |
context.stop(root) | |
root = newRoot | |
// Dump pending messages. | |
// | |
// This is a bug: | |
// self ! _ | |
// | |
// Because the operations will become out of order if there are new | |
// operations coming in during the dumping. | |
pendingQueue.foreach(root ! _) | |
pendingQueue = Queue.empty[Operation] | |
context.become(normal) | |
case GC => | |
// Ignored, we are doing GC already | |
case unknown => | |
println("[BinaryTreeSet.garbageCollecting] Unknown msg: " + unknown) | |
} | |
} | |
object BinaryTreeNode { | |
trait Position | |
case object Left extends Position | |
case object Right extends Position | |
case class CopyTo(treeNode: ActorRef) | |
case object CopyFinished | |
def props(elem: Int, initiallyRemoved: Boolean) = Props(classOf[BinaryTreeNode], elem, initiallyRemoved) | |
} | |
class BinaryTreeNode(val elem: Int, initiallyRemoved: Boolean) extends Actor { | |
import BinaryTreeNode._ | |
import BinaryTreeSet._ | |
private var subtrees = Map[Position, ActorRef]() | |
private var removed = initiallyRemoved | |
// optional | |
def receive = normal | |
// optional | |
/** Handles `Operation` messages and `CopyTo` requests. */ | |
private val normal: Receive = { | |
case req @ Insert(requester, id, elem) => | |
if (elem == this.elem) { | |
removed = false | |
requester ! OperationFinished(id) | |
} else if (elem < this.elem) { | |
tryInsertSubtree(Left, req) | |
} else { | |
tryInsertSubtree(Right, req) | |
} | |
case req @ Contains(requester, id, elem) => | |
if (elem == this.elem) { | |
requester ! ContainsResult(id, !removed) | |
} else if (elem < this.elem) { | |
tryContainsSubtree(Left, req) | |
} else { | |
tryContainsSubtree(Right, req) | |
} | |
case req @ Remove(requester, id, elem) => | |
if (elem == this.elem) { | |
removed = true | |
requester ! OperationFinished(id) | |
} else if (elem < this.elem) { | |
tryRemoveSubtree(Left, req) | |
} else { | |
tryRemoveSubtree(Right, req) | |
} | |
case req @ CopyTo(treeNode) => | |
val insertConfirmed = removed | |
if (!removed) treeNode ! Insert(self, 0, elem) | |
var expected = Set[ActorRef]() | |
if (subtrees.contains(Left)) { | |
val node = subtrees(Left) | |
node ! req | |
expected = expected + node | |
} | |
if (subtrees.contains(Right)) { | |
val node = subtrees(Right) | |
node ! req | |
expected = expected + node | |
} | |
tryCopyingDone(expected, insertConfirmed) | |
case unknown => | |
println("[BinaryTreeNode.normal] Unknown msg: " + unknown) | |
} | |
// optional | |
/** `expected` is the set of ActorRefs whose replies we are waiting for, | |
* `insertConfirmed` tracks whether the copy of this node to the new tree has been confirmed. | |
*/ | |
private def copying(expected: Set[ActorRef], insertConfirmed: Boolean): Receive = { | |
case CopyFinished => | |
if (expected.contains(sender)) | |
tryCopyingDone(expected - sender, insertConfirmed) | |
else | |
println("[BinaryTreeNode.copying] Unexpected CopyFinished") | |
case o: OperationFinished => | |
tryCopyingDone(expected, true) | |
case unknown => | |
println("[BinaryTreeNode.copying] Unknown msg: " + unknown) | |
} | |
//---------------------------------------------------------------------------- | |
private def tryInsertSubtree(position: Position, req: Insert) { | |
subtrees.get(position) match { | |
case Some(node) => | |
node ! req | |
case None => | |
val node = context.actorOf(BinaryTreeNode.props(req.elem, false)) | |
subtrees = subtrees.updated(position, node) | |
req.requester ! OperationFinished(req.id) | |
} | |
} | |
private def tryContainsSubtree(position: Position, req: Contains) { | |
subtrees.get(position) match { | |
case Some(node) => node ! req | |
case None => req.requester ! ContainsResult(req.id, false) | |
} | |
} | |
private def tryRemoveSubtree(position: Position, req: Remove) { | |
subtrees.get(position) match { | |
case Some(node) => node ! req | |
case None => req.requester ! OperationFinished(req.id) | |
} | |
} | |
private def tryCopyingDone(expected: Set[ActorRef], insertConfirmed: Boolean) { | |
if (expected.isEmpty && insertConfirmed) { | |
context.parent ! CopyFinished | |
// Stopped at root node | |
// context.stop(self) | |
} else { | |
context.become(copying(expected, insertConfirmed)) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment