Last active
August 29, 2015 14:09
-
-
Save strobe/aa7f52eaf5b64d9cab73 to your computer and use it in GitHub Desktop.
Akka Clustering issue with per node communication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
akka { | |
loglevel = INFO | |
log-dead-letters = off | |
log-dead-letters-during-shutdown = off | |
# /// Clusterring. /// # | |
# For clustering the following configuration needs to be enabled | |
actor { | |
provider = "akka.cluster.ClusterActorRefProvider" | |
# //#config-router-lookup | |
deployment { | |
/spray-service/routerActor/workerRouter { | |
router = round-robin-group | |
nr-of-instances = 10 | |
routees.paths = ["/user/backend"] | |
cluster { | |
enabled = on | |
allow-local-routees = on | |
} | |
} | |
} | |
# //#config-router-lookup | |
} | |
remote { | |
log-remote-lifecycle-events = off | |
netty.tcp { | |
hostname = "127.0.0.1" | |
port = 0 | |
} | |
} | |
cluster { | |
# list of seed nodes which may be started on same JVM (just few Akka Systems) | |
# and on different JVMs (as well on different nodes) | |
seed-nodes = [ | |
"akka.tcp://[email protected]:2551", | |
"akka.tcp://[email protected]:2552"] | |
# this setting is used for case when nodes is fail, | |
# after that timeout node will be removed from cluster | |
auto-down-unreachable-after = 10s | |
} | |
# remote.netty.tcp.maximum-frame-size = 384000b | |
# /// end /// # | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object BackendApp { | |
def main(args: Array[String]): Unit = { | |
// Override the configuration of the port when specified as program argument | |
val port = if (args.isEmpty) "0" else args(0) | |
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port"). | |
withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")). | |
withFallback(ConfigFactory.load()) | |
val system = ActorSystem("ClusterSystem", config) | |
system.actorOf(Props[BackendWorkerActor], name = "backend") | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BackendsRouterActor extends Actor with ActorLogging { | |
// router which defined in config | |
val workersRouter = context.actorOf(FromConfig.props(Props.empty), name = "workerRouter") | |
implicit def executionContext: ExecutionContextExecutor = context.dispatcher | |
def receive = { | |
case ToRouter(message: BackendMessage) => { | |
workersRouter ! ToBackEnd(message, sender()) | |
} | |
case BackToRouter(message: FrontendMessage, origin: ActorRef) => { | |
log.info("BackToRouter recevied to BackendsRouterActor ") | |
origin ! BackToOrigin(message) | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BackendWorkerActor extends Actor with ActorLogging { | |
val cluster = Cluster(context.system) | |
implicit def executionContext: ExecutionContextExecutor = context.dispatcher | |
def receive = { | |
case ToBackEnd(message: BackendMessage, origin: ActorRef) => { | |
message match { | |
case TestAsk(ctx: RequestContext) => { | |
log.info("ToBackEndTestAskResponse recevied to BackendWorkerActor ") | |
sender ! BackToRouter(TestAskResponse("job result"), origin) | |
} | |
case SimpleStringStream(ctx: RequestContext) => { | |
val stream: Stream[String] = new Streamer(context).simpleStringStream | |
sender ! BackToRouter(SimpleStringStreamResponse(stream), origin) | |
} | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object ClusterApp { | |
def main(args: Array[String]): Unit = { | |
FrontendSprayApp.main(Seq("2551").toArray) | |
BackendApp.main(Seq("2552").toArray) | |
BackendApp.main(Array.empty) | |
BackendApp.main(Array.empty) | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object FrontendSprayApp { | |
def main(args: Array[String]): Unit = { | |
// Override the configuration of the port when specified as program argument | |
val port = if (args.isEmpty) "0" else args(0) | |
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port"). | |
withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")). | |
withFallback(ConfigFactory.load()) | |
implicit val system = ActorSystem("ClusterSystem", config) | |
// server actor initialization | |
val service = system.actorOf(Props[FrontendServerActor], "spray-service") | |
// http initialization | |
IO(Http) ! Http.Bind(service, interface = "localhost", port = 8050) | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class FrontendServerActor extends Actor with ChunksTestService with ActorLogging { | |
def actorRefFactory = context | |
def receive = { | |
runRoute(apiRoute) | |
} | |
} | |
trait ChunksTestService extends HttpService { | |
// These implicit values allow us to use futures | |
// in this trait. | |
implicit def executionContext = actorRefFactory.dispatcher | |
implicit val timeout = Timeout(5.seconds) | |
// Our worker Actor handles the work of the request. | |
val router = actorRefFactory.actorOf(Props[BackendsRouterActor], "routerActor") | |
val apiRoute = | |
path("ping") { | |
get { | |
complete("PONG") | |
} | |
} ~ | |
path("html_stream") { | |
get { | |
respondWithMediaType(`text/html`) { | |
ctx => doSimpleStringStream(ctx) | |
} | |
} | |
} | |
def doTestAskResponse(ctx: RequestContext) = { | |
actorRefFactory.actorOf { | |
Props { | |
new Actor with ActorLogging { | |
// To set an initial possible delay | |
context.setReceiveTimeout(600 millis) | |
override def preStart = { | |
router ! ToRouter(TestAsk(ctx)) | |
} | |
def receive = { | |
case BackToOrigin(message: FrontendMessage) => | |
message match { | |
case TestAskResponse(text: String) => | |
ctx.responder ! HttpResponse(StatusCodes.OK, text) | |
} | |
case ReceiveTimeout => | |
// To turn it off | |
ctx.responder ! HttpResponse(StatusCodes.NetworkConnectTimeout, | |
"backend worker doesn't produced result in expected time!") | |
} | |
} | |
} | |
} | |
} | |
def doSimpleStringStream(ctx: RequestContext) = { | |
actorRefFactory.actorOf { | |
Props { | |
new Actor with ActorLogging { | |
override def preStart = { | |
router ! ToRouter(SimpleStringStream(ctx)) | |
} | |
def receive = { | |
case BackToOrigin(message: FrontendMessage) => | |
message match { | |
case SimpleStringStreamResponse(stream: Stream[String]) => | |
ctx.responder ! complete(stream) | |
} | |
case ReceiveTimeout => | |
ctx.responder ! HttpResponse(StatusCodes.NetworkConnectTimeout, | |
"backend worker doesn't produced result in expected time!") | |
} | |
} | |
} | |
} | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// possible messages types | |
object BackendMessages { | |
// | |
case class ToRouter(message: BackendMessage) | |
case class ToBackEnd(message: BackendMessage, originRef: ActorRef) | |
// | |
case class BackToRouter(message: FrontendMessage, originRef: ActorRef) | |
case class BackToOrigin(message: FrontendMessage) | |
// | |
} | |
// messages which may send to backend | |
abstract class BackendMessage | |
case class TestAsk(ctx: RequestContext) extends BackendMessage | |
case class SimpleStringStream(ctx: RequestContext) extends BackendMessage | |
// messages which may send to frontend | |
abstract class FrontendMessage | |
case class TestAskResponse(text: String) extends FrontendMessage | |
case class SimpleStringStreamResponse(stream: Stream[String]) extends FrontendMessage |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Streamer(actorRefFactory: ActorContext) (implicit ec: ExecutionContext) { | |
lazy val streamStart = " " * 2048 + "<html><body><h2>A streaming response</h2><p>(for 15 seconds)<ul>" | |
lazy val streamEnd = "</ul><p>Finished.</p></body></html>" | |
def simpleStringStream: Stream[String] = { | |
val secondStream = Stream.continually { | |
// CAUTION: we block here to delay the stream generation for you to | |
// be able to follow it in your browser, | |
// this is only done for the purpose of this demo, blocking in actor | |
// code should otherwise be avoided | |
// Thread.sleep(500) | |
"<li>" + DateTime.now.toIsoDateTimeString + "</li>" | |
} | |
streamStart #:: secondStream.take(15) #::: streamEnd #:: Stream.empty | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
GET to "/ask_test" returns "job result"
&
GET to "/html_stream" produced:
java.io.NotSerializableException: Streamer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747)
at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:722)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)