Skip to content

Instantly share code, notes, and snippets.

@ngocdaothanh
Created December 6, 2013 03:22
Show Gist options
  • Save ngocdaothanh/5b8adf6a981258e0ba0b to your computer and use it in GitHub Desktop.
Save ngocdaothanh/5b8adf6a981258e0ba0b to your computer and use it in GitHub Desktop.
reactive.5
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package actorbintree
import akka.actor._
import scala.collection.immutable.Queue
object BinaryTreeSet {

  // ---------------------------------------------------------------- requests

  /** Common shape of every request sent to the tree: the actor to answer to,
    * a correlation identifier, and the element the request is about.
    */
  trait Operation {
    def requester: ActorRef
    def id: Int
    def elem: Int
  }

  /** Asks for `elem` to be inserted into the tree.
    * `requester` should be notified, using identifier `id`, once the
    * operation is completed.
    */
  case class Insert(requester: ActorRef, id: Int, elem: Int) extends Operation

  /** Asks whether `elem` is present in the tree.
    * `requester` should be notified, using identifier `id`, once the
    * operation is completed.
    */
  case class Contains(requester: ActorRef, id: Int, elem: Int) extends Operation

  /** Asks for `elem` to be removed from the tree.
    * `requester` should be notified, using identifier `id`, once the
    * operation is completed.
    */
  case class Remove(requester: ActorRef, id: Int, elem: Int) extends Operation

  /** Asks the tree to perform garbage collection. Not an [[Operation]]:
    * it carries no requester, identifier or element.
    */
  case object GC

  // ----------------------------------------------------------------- replies

  /** Common shape of every reply: the identifier of the request it answers. */
  trait OperationReply {
    def id: Int
  }

  /** Answer to the [[Contains]] request with identifier `id`.
    * `result` is true if and only if the element is present in the tree.
    */
  case class ContainsResult(id: Int, result: Boolean) extends OperationReply

  /** Signals successful completion of the insert or remove operation with
    * identifier `id`.
    */
  case class OperationFinished(id: Int) extends OperationReply
}
/** Front actor of the binary tree set.
  *
  * Operations are handed to the current root node. On `GC` a fresh root is
  * created, the current tree is asked to copy itself into it, and operations
  * arriving in the meantime are buffered until the copy has finished.
  */
class BinaryTreeSet extends Actor {
  import BinaryTreeSet._
  import BinaryTreeNode._

  /** Spawns a new root node holding element 0, flagged as removed from the start. */
  private def createRoot(): ActorRef =
    context.actorOf(BinaryTreeNode.props(0, initiallyRemoved = true))

  /** Root of the tree that currently receives operations. */
  private var root = createRoot()

  /** Operations received while a garbage collection is in progress, oldest first. */
  private var pendingQueue = Queue.empty[Operation]

  def receive = normal

  /** Accepts `Operation` and `GC` messages. */
  private val normal: Receive = {
    case GC =>
      val freshRoot = createRoot()
      root ! CopyTo(freshRoot)
      context.become(garbageCollecting(freshRoot))

    case op: Operation =>
      root ! op

    case other =>
      println("[BinaryTreeSet.normal] Unknown msg: " + other)
  }

  /** Behaviour while garbage collection is running.
    *
    * `newRoot` is the root of the new binary tree that all non-removed
    * elements are being copied into.
    */
  private def garbageCollecting(newRoot: ActorRef): Receive = {
    case GC =>
      // A collection is already under way; nothing to do.

    case op: Operation =>
      pendingQueue = pendingQueue.enqueue(op)

    case CopyFinished =>
      context.stop(root)
      root = newRoot

      // Replay the backlog straight to the new root. Re-sending it to `self`
      // instead would be a bug: operations arriving while the backlog is being
      // replayed could then overtake the buffered ones and be processed out of
      // order.
      val backlog = pendingQueue
      pendingQueue = Queue.empty[Operation]
      for (op <- backlog) root ! op

      context.become(normal)

    case other =>
      println("[BinaryTreeSet.garbageCollecting] Unknown msg: " + other)
  }
}
/** Messages and factory belonging to [[BinaryTreeNode]]. */
object BinaryTreeNode {

  /** Side of a node on which a subtree is attached. */
  trait Position
  case object Left extends Position
  case object Right extends Position

  /** Asks a node to copy itself and its subtrees into the tree rooted at `treeNode`. */
  case class CopyTo(treeNode: ActorRef)

  /** Sent by a node to its parent once its copy has completed. */
  case object CopyFinished

  /** Builds the `Props` for a node holding `elem`; `initiallyRemoved` is the
    * starting value of the node's removed flag.
    */
  def props(elem: Int, initiallyRemoved: Boolean): Props =
    Props(classOf[BinaryTreeNode], elem, initiallyRemoved)
}
/** One node of the binary tree.
  *
  * A node holds a single element, a flag telling whether that element has been
  * removed, and at most one child actor per [[BinaryTreeNode.Position]].
  * Removal only sets the flag; the node itself stays in the tree.
  *
  * @param elem             the element stored in this node
  * @param initiallyRemoved starting value of the removed flag
  */
class BinaryTreeNode(val elem: Int, initiallyRemoved: Boolean) extends Actor {
  import BinaryTreeNode._
  import BinaryTreeSet._

  /** Child nodes, keyed by the side they are attached to. */
  private var subtrees = Map[Position, ActorRef]()

  /** True while this node's element counts as absent from the set. */
  private var removed = initiallyRemoved

  def receive = normal

  /** Handles `Operation` messages and `CopyTo` requests. */
  private val normal: Receive = {
    case req @ Insert(requester, id, value) =>
      if (value == elem) {
        removed = false
        requester ! OperationFinished(id)
      } else {
        tryInsertSubtree(positionOf(value), req)
      }

    case req @ Contains(requester, id, value) =>
      if (value == elem) {
        requester ! ContainsResult(id, !removed)
      } else {
        tryContainsSubtree(positionOf(value), req)
      }

    case req @ Remove(requester, id, value) =>
      if (value == elem) {
        removed = true
        requester ! OperationFinished(id)
      } else {
        tryRemoveSubtree(positionOf(value), req)
      }

    case req @ CopyTo(treeNode) =>
      // A removed node has nothing to copy, so its own insert counts as
      // already confirmed; otherwise the OperationFinished reply is awaited.
      if (!removed) treeNode ! Insert(self, 0, elem)

      // Forward the request to the existing children, Left before Right.
      val children = List[Position](Left, Right).flatMap(side => subtrees.get(side))
      children.foreach(_ ! req)

      tryCopyingDone(children.toSet, insertConfirmed = removed)

    case unknown =>
      println("[BinaryTreeNode.normal] Unknown msg: " + unknown)
  }

  /** Behaviour while a copy is in progress.
    *
    * `expected` is the set of ActorRefs whose `CopyFinished` replies are still
    * awaited; `insertConfirmed` tracks whether the copy of this node's own
    * element into the new tree has been confirmed.
    */
  private def copying(expected: Set[ActorRef], insertConfirmed: Boolean): Receive = {
    case CopyFinished =>
      val child = sender
      if (expected.contains(child))
        tryCopyingDone(expected - child, insertConfirmed)
      else
        println("[BinaryTreeNode.copying] Unexpected CopyFinished")

    case _: OperationFinished =>
      tryCopyingDone(expected, insertConfirmed = true)

    case unknown =>
      println("[BinaryTreeNode.copying] Unknown msg: " + unknown)
  }

  //----------------------------------------------------------------------------

  /** Side on which `value` belongs relative to this node's element.
    * Only called for values different from `elem`.
    */
  private def positionOf(value: Int): Position =
    if (value < elem) Left else Right

  /** Passes the insert on to the child at `position`; if there is no child
    * there, creates one holding the requested element and confirms to the requester.
    */
  private def tryInsertSubtree(position: Position, req: Insert): Unit =
    subtrees.get(position) match {
      case Some(node) =>
        node ! req
      case None =>
        val node = context.actorOf(BinaryTreeNode.props(req.elem, initiallyRemoved = false))
        subtrees = subtrees.updated(position, node)
        req.requester ! OperationFinished(req.id)
    }

  /** Passes the lookup on to the child at `position`; with no child there the
    * element cannot be in the tree, so the requester is answered `false`.
    */
  private def tryContainsSubtree(position: Position, req: Contains): Unit =
    subtrees.get(position) match {
      case Some(node) => node ! req
      case None       => req.requester ! ContainsResult(req.id, false)
    }

  /** Passes the removal on to the child at `position`; with no child there
    * nothing needs removing, so the operation is confirmed right away.
    */
  private def tryRemoveSubtree(position: Position, req: Remove): Unit =
    subtrees.get(position) match {
      case Some(node) => node ! req
      case None       => req.requester ! OperationFinished(req.id)
    }

  /** Reports `CopyFinished` to the parent once no child reply is outstanding
    * and this node's own insert is confirmed; otherwise switches to (or stays
    * in) the `copying` behaviour with the updated state.
    */
  private def tryCopyingDone(expected: Set[ActorRef], insertConfirmed: Boolean): Unit =
    if (expected.isEmpty && insertConfirmed) {
      context.parent ! CopyFinished
      // The node does not stop itself here: BinaryTreeSet stops the old root
      // when it receives CopyFinished.
    } else {
      context.become(copying(expected, insertConfirmed))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment