Skip to content

Instantly share code, notes, and snippets.

@ashee
Forked from viktorklang/minscalaactors.scala
Created April 12, 2012 12:53
Show Gist options
  • Save ashee/2367023 to your computer and use it in GitHub Desktop.
Save ashee/2367023 to your computer and use it in GitHub Desktop.
Minimalist Scala Actors
©2012 Viktor Klang
/**
 * A minimalist actor implementation.
 *
 * An actor is created with [[Actor.apply]], which returns an [[Actor.Address]]
 * that messages can be posted to with `!`. Messages are queued in a mailbox and
 * handed, one per scheduled run, to the actor's current behavior on the supplied
 * `Executor`. A behavior answers each message with an [[Actor.Ops.Op]] that says
 * whether to keep the current behavior or switch to a new one.
 */
object Actor {

  /** Results a behavior may return after handling a message. */
  object Ops {
    sealed trait Op

    /** Keep the current behavior for the next message. */
    case object Stay extends Op

    /** Replace the current behavior with `b` for the following messages. */
    case class Become(b: Any => Op) extends Op

    /**
     * Terminal behavior: a `Become` whose function logs and drops every message
     * and then returns `Stay`, so the actor never leaves this state.
     */
    // NOTE(review): `this` below is evaluated in super-constructor argument
    // position, so it presumably names the enclosing object rather than `Die`
    // itself — confirm before relying on the log output.
    case object Die extends Become(
      msg => { println("Dropping msg [" + msg + "]for " + this + " due to severe case of death."); Stay } // Stay Dead plz
    )
  }

  import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
  import java.util.concurrent.atomic.{AtomicBoolean}
  import Ops._

  /** The notion of an address to which messages can be posted. */
  trait Address {
    /** Posts `msg` to the actor behind this address. */
    def !(msg: Any): Unit
  }

  /**
   * Creates an actor and returns its address.
   *
   * @param initial given the actor's own address, produces the first behavior
   * @param e       executor on which the actor's message processing is run
   * @return the address of the newly created actor
   */
  def apply(initial: Address => Any => Op)(implicit e: Executor): Address = {
    val a: Address = new Address with Runnable {
      // Pending messages, in arrival order.
      private final val mbox = new ConcurrentLinkedQueue[Any]
      // True while a run of this actor has been handed to the executor.
      private final val on = new AtomicBoolean(false)

      // The rebindable point on top of the mailbox. The bootstrap behavior waits
      // for the actor's own address and then switches to `initial(address)`.
      private var behavior: Any => Op = {
        case a: Address => Become(initial(a)) // Bootstrap
        case other => println("Unknown message: " + other); Stay
      }

      /**
       * Submits this actor to the executor if there is mail and no run is
       * already scheduled. If submission fails, the flag is rolled back and
       * the failure is rethrown to the caller.
       */
      final def trySchedule(): Unit =
        if (!mbox.isEmpty && on.compareAndSet(false, true)) {
          try e.execute(this)
          catch {
            case anything: Throwable =>
              // Reset the flag outside of `assert`: assertions are elidable, and
              // eliding the reset would leave the actor permanently unschedulable.
              val wasOn = on.getAndSet(false)
              assert(wasOn)
              throw anything
          }
        }

      /** Applies the current behavior to `msg` and rebinds it if asked to. */
      final def reactTo(msg: Any): Unit = behavior(msg) match {
        case Become(newBeh) => behavior = newBeh
        case Stay => // Just remain as you were
      }

      /**
       * Processes exactly one message, then releases the scheduling flag and
       * reschedules if more mail is waiting (also when the behavior threw).
       */
      final def run(): Unit = try reactTo(mbox.poll()) finally {
        // Same as in trySchedule: the state change must not live inside `assert`.
        val wasOn = on.getAndSet(false)
        assert(wasOn)
        trySchedule()
      }

      /** Enqueues `msg` and schedules processing, unless the actor is known to be dead. */
      final override def !(msg: Any): Unit = behavior match {
        case f if f eq Die.b => f(msg) // Efficiently bail out if we are _known_ to be dead
        case _ =>
          // Enqueue unconditionally; the assert only checks the result, so the
          // message is still delivered when assertions are compiled out.
          val accepted = mbox.offer(msg)
          assert(accepted)
          trySchedule()
      }
    }
    a ! a // Make the actor self aware by seeding its address to the initial behavior
    a
  }
}
// Usage example
import Actor.Ops._

// Executor on which the example actor processes its messages.
implicit val e: java.util.concurrent.Executor =
  java.util.concurrent.Executors.newCachedThreadPool()

// Creates an actor that will Die after its first message has been received.
val actor = Actor { self => msg =>
  println("self: " + self + " got msg " + msg)
  Die
}

// Two messages are posted; the behavior above returns Die for the first one it handles.
actor ! "foo"
actor ! "foo"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment