Last active
September 15, 2015 09:26
-
-
Save TanUkkii007/0bbef0d32c4a55a80e19 to your computer and use it in GitHub Desktop.
akka.ConfigurationException: configuration problem while creating [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.InetSocketAddress | |
import java.util.concurrent.ConcurrentHashMap | |
import scala.collection.JavaConversions._ | |
import akka.actor._ | |
import akka.io.{IO, Tcp} | |
import akka.routing.{BalancingPool} | |
import akka.stream.actor.{ActorPublisher, ActorSubscriberMessage, WatermarkRequestStrategy, ActorSubscriber} | |
import akka.util.ByteString | |
import scala.concurrent.Promise | |
object TcpClientActorProtocol { | |
case object StartConnection | |
case class Send(data: ByteString, promise: Promise[Unit]) | |
case class Receive(data: ByteString) | |
case class ReSend(command: Send) | |
} | |
object TcpClientActor { | |
def props(remoteAddress: InetSocketAddress, publisher: ActorRef) = Props(new TcpClientActor(remoteAddress, publisher)) | |
} | |
class TcpClientActor(remoteAddress: InetSocketAddress, publisher: ActorRef) extends Actor with ActorLogging { | |
import TcpClientActorProtocol._ | |
import Tcp._ | |
import context.system | |
case class Ack(promise: Promise[Unit]) extends Event | |
def receive = waiting | |
def waiting: Receive = { | |
case StartConnection => { | |
IO(Tcp) ! Connect(remoteAddress) | |
} | |
case Connected(remote, local) => { | |
val connection = sender() | |
connection ! Register(self, keepOpenOnPeerClosed = true) | |
context.become(connected(connection)) | |
log.info("connected") | |
} | |
case send: Send => { | |
log.warning("connection is not established yet so retry.") | |
publisher ! ReSend(send) | |
} | |
} | |
def connected(connection: ActorRef): Receive = { | |
case Send(data, promise) => connection ! Write(data, Ack(promise)) | |
case Ack(promise) => promise.success(()) | |
case Received(data) => publisher ! Receive(data) | |
case CommandFailed(Write(data, ack: Ack)) => publisher ! ReSend(Send(data, ack.promise)) | |
case _: ConnectionClosed => destroy() | |
} | |
def destroy() = context.stop(self) | |
override def preStart() = { | |
super.preStart() | |
self ! StartConnection | |
log.info("starting TCP connection") | |
} | |
} | |
trait ConnectionPoolRouter { | |
val maxConnection: Int | |
def createTcpClientRouter(remoteAddress: InetSocketAddress, publisher: ActorRef) = | |
BalancingPool(maxConnection) | |
.withSupervisorStrategy(SupervisorStrategy.defaultStrategy).props(TcpClientActor.props(remoteAddress, publisher)) | |
} | |
object TcpStreamActor { | |
def props(remoteAddress: InetSocketAddress, highWatermark: Int, maxConnection: Int) = Props(new TcpStreamActor(remoteAddress, highWatermark, maxConnection)) | |
} | |
class TcpStreamActor(remoteAddress: InetSocketAddress, highWatermark: Int, val maxConnection: Int) extends ActorPublisher[ByteString] | |
with ActorSubscriber with ConnectionPoolRouter { | |
import TcpClientActorProtocol._ | |
import ActorSubscriberMessage._ | |
override def requestStrategy = WatermarkRequestStrategy(highWatermark) | |
val tcpClient = context.actorOf(createTcpClientRouter(remoteAddress, self), "tcp-client-router") | |
def receive: Receive = { | |
case OnNext(send: Send) => tcpClient ! send | |
case ReSend(command) => tcpClient ! command | |
case Receive(data) => onNext(data) | |
} | |
} | |
//-------------------------------------------------test--------------------------------------------// | |
import java.net.InetSocketAddress | |
import akka.actor._ | |
import akka.io.{ IO, Tcp } | |
import akka.util.ByteString | |
class TcpServerTestSetup(system: ActorSystem, target: ActorRef) { | |
val serverActor = system.actorOf(TcpServer.props(target)) | |
} | |
object TcpServer { | |
def props(probe: ActorRef) = Props(new TcpServer(probe)) | |
} | |
class TcpServer(probe: ActorRef) extends Actor with ActorLogging { | |
import Tcp._ | |
import context.system | |
IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0)) | |
def receive = waiting | |
def waiting: Receive = { | |
case b @ Bound(localAddress) => { | |
log.info("listening on port {}", localAddress.getPort) | |
probe ! b | |
} | |
case CommandFailed(_: Bind) => context stop self | |
case c@Connected(remote, local) => { | |
log.info("connection established with {}", remote) | |
probe ! c | |
val connection = sender() | |
val handler = context.actorOf(TcpServerHandler.props(connection, probe)) | |
probe ! Register(handler) | |
connection ! Register(handler) | |
} | |
} | |
} | |
object TcpServerHandler { | |
case class TestSend(data: ByteString) | |
def props(connection: ActorRef, probe: ActorRef) = Props(new TcpServerHandler(connection, probe)) | |
} | |
class TcpServerHandler(connection: ActorRef, probe: ActorRef) extends Actor with ActorLogging { | |
import Tcp._ | |
import TcpServerHandler._ | |
def receive: Receive = { | |
case r: Received => probe ! r | |
case p@PeerClosed => { | |
log.info("connection closed") | |
probe ! p | |
context stop self | |
} | |
case c@Close => { | |
connection ! c | |
log.info("closing connection") | |
} | |
case TestSend(data) => connection ! Write(data) | |
case Closed => log.info("connection closed") | |
case other => log.warning("unknown message {}", other) | |
} | |
} | |
import akka.actor.{PoisonPill, ActorSystem} | |
import akka.io.Tcp | |
import akka.stream.ActorMaterializer | |
import akka.stream.testkit.scaladsl.TestSource | |
import akka.testkit.{TestKit} | |
import akka.stream.scaladsl.{Keep, Sink} | |
import akka.util.ByteString | |
import com.typesafe.config.ConfigFactory | |
import org.scalatest.{MustMatchers, WordSpecLike} | |
import skarn.{TcpServerTestSetup, StopSystemAfterAllWithAwaitTermination} | |
import scala.concurrent.Promise | |
class TcpStreamActorITest extends TestKit(ActorSystem("TcpStreamActorITest", ConfigFactory.empty())) | |
with WordSpecLike with MustMatchers with StopSystemAfterAllWithAwaitTermination { testSelf => | |
"TcpStreamActor" must { | |
import Tcp._ | |
import TcpClientActorProtocol._ | |
"send a request with stream" in { | |
new TcpServerTestSetup(system, testActor) { | |
implicit val m = ActorMaterializer() | |
val connection = expectMsgClass(classOf[Bound]) | |
val clientSink = Sink.actorSubscriber(TcpStreamActor.props(connection.localAddress, 10, 1)) | |
val probe = TestSource.probe[Send].toMat(clientSink)(Keep.left).run() | |
expectMsgClass(classOf[Connected]) | |
val handler = expectMsgClass(classOf[Register]).handler | |
1 to 100 foreach { _ => | |
val p = Promise[Unit] | |
probe.sendNext(Send(ByteString("abcde"), p)) | |
expectMsg(Received(ByteString("abcde"))) | |
} | |
serverActor ! PoisonPill | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
[INFO] [09/15/2015 17:57:24.552] [TcpStreamActorITest-akka.actor.default-dispatcher-2] [akka://TcpStreamActorITest/user/$a] listening on port 64665
[ERROR] [09/15/2015 17:57:24.624] [TcpStreamActorITest-akka.actor.default-dispatcher-3] [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink] received Supervise from unregistered child Actor[akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router#3824265], this will not end well
[ERROR] [09/15/2015 17:57:24.630] [TcpStreamActorITest-akka.actor.default-dispatcher-2] [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink] configuration problem while creating [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:753)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:206)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:37)
at akka.actor.ActorCell.actorOf(ActorCell.scala:369)
at skarn.apns.TcpStreamActor.(TcpStreamActor.scala:138)
at skarn.apns.TcpStreamActor$$anonfun$props$4.apply(TcpStreamActor.scala:128)
at skarn.apns.TcpStreamActor$$anonfun$props$4.apply(TcpStreamActor.scala:128)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:346)
at akka.actor.Props.newActor(Props.scala:255)
at akka.actor.ActorCell.newActor(ActorCell.scala:552)
at akka.actor.ActorCell.create(ActorCell.scala:578)
... 9 more
Caused by: com.typesafe.config.ConfigException$BadPath: path parameter: Invalid path 'BalancingPool-/$b/flow-1-1-actorSubscriberSink/tcp-client-router': Token not allowed in path expression: 'b' ('$' not followed by {, 'b' not allowed after '$') (you can double-quote this token if you really want it here)
at com.typesafe.config.impl.Parser.parsePathExpression(Parser.java:1095)
at com.typesafe.config.impl.Parser.parsePath(Parser.java:1135)
at com.typesafe.config.impl.Path.newPath(Path.java:224)
at com.typesafe.config.impl.SimpleConfig.hasPath(SimpleConfig.java:80)
at akka.dispatch.CachingConfig.hasPath(CachingConfig.scala:97)
at akka.dispatch.Dispatchers.hasDispatcher(Dispatchers.scala:89)
at akka.routing.BalancingPool.newRoutee(Balancing.scala:106)
at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116)
at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116)
at scala.collection.generic.GenTraversableFactory.fill(GenTraversableFactory.scala:90)
at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:116)
at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:41)
at akka.actor.RepointableActorRef.point(RepointableActorRef.scala:105)
at akka.actor.RepointableActorRef.initialize(RepointableActorRef.scala:82)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:751)
... 19 more