Skip to content

Instantly share code, notes, and snippets.

View remeniuk's full-sized avatar

Vasil Remeniuk remeniuk

View GitHub Profile
/**
* Publishes all registered local actor as a remote ref on all
* linked remote registries
*/
trait InlifeActorRefsDistribution extends RegistryActor{
override def specificMessageHandler = {
case ActorRegistered(actor) =>
log.debug("Actor [%s] is registered" format(actor))
registerOnLinks(actor)
class SimpleTypedBalancer[T](implicit manifest: Manifest[T]) extends Actor{
def receive = {
case message :AnyRef =>
forwardMessage(message,
self.getSenderFuture orElse self.getSender,
idleWorkerId)
}
def idleWorkerId = Futures.awaitOne{
RemoteNode.start
log.info("Starting registry actor at %s:%s" format(RemoteServer.HOSTNAME, RemoteServer.PORT))
val registryActor = actorOf(new RegistryActor
with StartupActorRefsDistribution
with InlifeActorRefsDistribution).start
RegistryActorUtil.initialize
(1 to 3).foreach(_ => actorOf[SimpleActor].start)
doBeforeSpec{
(1 to 3).foreach(_ => actorOf[SimpleActor].start)
actorOf(new SimpleTypedBalancer[SimpleActor]).start
}
"Messages sent to the balancer should be distributed across local and remote workers" in {
val balancer = ActorRegistry.filter(actor =>
Class.forName(actor.actorClassName)
.isAssignableFrom(classOf[SimpleTypedBalancer[SimpleActor]])
[INFO] [2010-11-23 23:23:18,687] [Thread-18] s.s.a.a.Actor$: Adding RegistryActor as listener to local actor registry
[INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Making RegistryActor remote...
[INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Adding link to a neighbouring host...
[INFO] [2010-11-23 23:23:18,859] [Thread-18] s.s.a.a.Actor$: ========SENDING MESSAGES TO BALANCER=========
[INFO] [2010-11-23 23:23:18,890] [Thread-18] s.s.a.a.Actor$: All messages are disaptched...
[INFO] [2010-11-23 23:23:19,656] [akka:event-driven:dispatcher:global-1] s.s.a.r.RemoteClient: Starting remote client co
nnection to [localhost:9999]
[INFO] [2010-11-23 23:23:24,906] [Thread-18] s.s.a.a.Actor$: Process time by 6 workers: 6031
[INFO] [2010-11-23 23:23:24,968] [Thread-18] s.s.a.a.Actor$: ====SHUTTING DOWN ACTOR REGISTRY====
[info] + Messages sent to the balancer should be distributed across local and remote workers
class RegistryActor extends Actor{
...
def defaultMessageHandler: PartialFunction[Any, Unit] = {
case RegisterActor(actor) =>
log.debug("Registering remote actor [%s]" format(actor))
if(!isActorInRegistry(actor.uuid) && !isLinkToLocal(actor))
ActorRegistry.register( // Hack for 0.10, 1.0-M1
RemoteClient.actorFor(actor.uuid.toString, actor.className, actor.hostname, actor.port)
/**
* RegistryActors located on the other hosts
*/
protected[easyscale] val linkedRegistries = new ConcurrentHashMap[RegistryLink, ActorRef]()
def defaultMessageHandler: PartialFunction[Any, Unit] = {
...
case AddRegistryLink(link) =>
scala> object Outer {
|
| object Inner {
| val B = "B"
| }
| import Inner._
| class Inner {
| def b = B
| }
| }
//////////////// Scalaz Validation
sealed trait Name extends NewType[String]
object Name {
def apply(s: String): Validation[String, Name] = if (s.headOption.exists(_.isUpper))
(new Name {val value = s}).success
else
"Name must start with a capital letter".fail
}
@remeniuk
remeniuk / gist:723428
Created December 1, 2010 12:46
Container manager dispatchers
val dispatcher = new ExecutorBasedEventDrivenDispatcher("name")
with ContainerManagedThreadPoolBuilder
// If you want to use work-stealing dispatcher
val dispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("name")
with ContainerManagedThreadPoolBuilder
// Use named GlassFish thread-pool
dispatcher
.withGlassFishManagedThreadPool("actor-thread-pool")