This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Publishes all registered local actor as a remote ref on all | |
* linked remote registries | |
*/ | |
trait InlifeActorRefsDistribution extends RegistryActor{ | |
override def specificMessageHandler = { | |
case ActorRegistered(actor) => | |
log.debug("Actor [%s] is registered" format(actor)) | |
registerOnLinks(actor) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SimpleTypedBalancer[T](implicit manifest: Manifest[T]) extends Actor{ | |
def receive = { | |
case message :AnyRef => | |
forwardMessage(message, | |
self.getSenderFuture orElse self.getSender, | |
idleWorkerId) | |
} | |
def idleWorkerId = Futures.awaitOne{ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
RemoteNode.start | |
log.info("Starting registry actor at %s:%s" format(RemoteServer.HOSTNAME, RemoteServer.PORT)) | |
val registryActor = actorOf(new RegistryActor | |
with StartupActorRefsDistribution | |
with InlifeActorRefsDistribution).start | |
RegistryActorUtil.initialize | |
(1 to 3).foreach(_ => actorOf[SimpleActor].start) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
doBeforeSpec{ | |
(1 to 3).foreach(_ => actorOf[SimpleActor].start) | |
actorOf(new SimpleTypedBalancer[SimpleActor]).start | |
} | |
"Messages sent to the balancer should be distributed across local and remote workers" in { | |
val balancer = ActorRegistry.filter(actor => | |
Class.forName(actor.actorClassName) | |
.isAssignableFrom(classOf[SimpleTypedBalancer[SimpleActor]]) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[INFO] [2010-11-23 23:23:18,687] [Thread-18] s.s.a.a.Actor$: Adding RegistryActor as listener to local actor registry | |
[INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Making RegistryActor remote... | |
[INFO] [2010-11-23 23:23:18,703] [Thread-18] s.s.a.a.Actor$: Adding link to a neighbouring host... | |
[INFO] [2010-11-23 23:23:18,859] [Thread-18] s.s.a.a.Actor$: ========SENDING MESSAGES TO BALANCER========= | |
[INFO] [2010-11-23 23:23:18,890] [Thread-18] s.s.a.a.Actor$: All messages are disaptched... | |
[INFO] [2010-11-23 23:23:19,656] [akka:event-driven:dispatcher:global-1] s.s.a.r.RemoteClient: Starting remote client co | |
nnection to [localhost:9999] | |
[INFO] [2010-11-23 23:23:24,906] [Thread-18] s.s.a.a.Actor$: Process time by 6 workers: 6031 | |
[INFO] [2010-11-23 23:23:24,968] [Thread-18] s.s.a.a.Actor$: ====SHUTTING DOWN ACTOR REGISTRY==== | |
[info] + Messages sent to the balancer should be distributed across local and remote workers |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class RegistryActor extends Actor{ | |
... | |
def defaultMessageHandler: PartialFunction[Any, Unit] = { | |
case RegisterActor(actor) => | |
log.debug("Registering remote actor [%s]" format(actor)) | |
if(!isActorInRegistry(actor.uuid) && !isLinkToLocal(actor)) | |
ActorRegistry.register( // Hack for 0.10, 1.0-M1 | |
RemoteClient.actorFor(actor.uuid.toString, actor.className, actor.hostname, actor.port) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* RegistryActors located on the other hosts | |
*/ | |
protected[easyscale] val linkedRegistries = new ConcurrentHashMap[RegistryLink, ActorRef]() | |
def defaultMessageHandler: PartialFunction[Any, Unit] = { | |
... | |
case AddRegistryLink(link) => |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
scala> object Outer { | |
| | |
| object Inner { | |
| val B = "B" | |
| } | |
| import Inner._ | |
| class Inner { | |
| def b = B | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////// Scalaz Validation | |
sealed trait Name extends NewType[String] | |
object Name { | |
def apply(s: String): Validation[String, Name] = if (s.headOption.exists(_.isUpper)) | |
(new Name {val value = s}).success | |
else | |
"Name must start with a capital letter".fail | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
val dispatcher = new ExecutorBasedEventDrivenDispatcher("name") | |
with ContainerManagedThreadPoolBuilder | |
// If you want to use work-stealing dispatcher | |
val dispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("name") | |
with ContainerManagedThreadPoolBuilder | |
// Use named GlassFish thread-pool | |
dispatcher | |
.withGlassFishManagedThreadPool("actor-thread-pool") |