Skip to content

Instantly share code, notes, and snippets.

@TanUkkii007
Last active September 15, 2015 09:26
Show Gist options
  • Save TanUkkii007/0bbef0d32c4a55a80e19 to your computer and use it in GitHub Desktop.
Save TanUkkii007/0bbef0d32c4a55a80e19 to your computer and use it in GitHub Desktop.
akka.ConfigurationException: configuration problem while creating [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
import akka.actor._
import akka.io.{IO, Tcp}
import akka.routing.{BalancingPool}
import akka.stream.actor.{ActorPublisher, ActorSubscriberMessage, WatermarkRequestStrategy, ActorSubscriber}
import akka.util.ByteString
import scala.concurrent.Promise
object TcpClientActorProtocol {
case object StartConnection
case class Send(data: ByteString, promise: Promise[Unit])
case class Receive(data: ByteString)
case class ReSend(command: Send)
}
object TcpClientActor {
def props(remoteAddress: InetSocketAddress, publisher: ActorRef) = Props(new TcpClientActor(remoteAddress, publisher))
}
class TcpClientActor(remoteAddress: InetSocketAddress, publisher: ActorRef) extends Actor with ActorLogging {
import TcpClientActorProtocol._
import Tcp._
import context.system
case class Ack(promise: Promise[Unit]) extends Event
def receive = waiting
def waiting: Receive = {
case StartConnection => {
IO(Tcp) ! Connect(remoteAddress)
}
case Connected(remote, local) => {
val connection = sender()
connection ! Register(self, keepOpenOnPeerClosed = true)
context.become(connected(connection))
log.info("connected")
}
case send: Send => {
log.warning("connection is not established yet so retry.")
publisher ! ReSend(send)
}
}
def connected(connection: ActorRef): Receive = {
case Send(data, promise) => connection ! Write(data, Ack(promise))
case Ack(promise) => promise.success(())
case Received(data) => publisher ! Receive(data)
case CommandFailed(Write(data, ack: Ack)) => publisher ! ReSend(Send(data, ack.promise))
case _: ConnectionClosed => destroy()
}
def destroy() = context.stop(self)
override def preStart() = {
super.preStart()
self ! StartConnection
log.info("starting TCP connection")
}
}
trait ConnectionPoolRouter {
val maxConnection: Int
def createTcpClientRouter(remoteAddress: InetSocketAddress, publisher: ActorRef) =
BalancingPool(maxConnection)
.withSupervisorStrategy(SupervisorStrategy.defaultStrategy).props(TcpClientActor.props(remoteAddress, publisher))
}
object TcpStreamActor {
def props(remoteAddress: InetSocketAddress, highWatermark: Int, maxConnection: Int) = Props(new TcpStreamActor(remoteAddress, highWatermark, maxConnection))
}
class TcpStreamActor(remoteAddress: InetSocketAddress, highWatermark: Int, val maxConnection: Int) extends ActorPublisher[ByteString]
with ActorSubscriber with ConnectionPoolRouter {
import TcpClientActorProtocol._
import ActorSubscriberMessage._
override def requestStrategy = WatermarkRequestStrategy(highWatermark)
val tcpClient = context.actorOf(createTcpClientRouter(remoteAddress, self), "tcp-client-router")
def receive: Receive = {
case OnNext(send: Send) => tcpClient ! send
case ReSend(command) => tcpClient ! command
case Receive(data) => onNext(data)
}
}
//-------------------------------------------------test--------------------------------------------//
import java.net.InetSocketAddress
import akka.actor._
import akka.io.{ IO, Tcp }
import akka.util.ByteString
class TcpServerTestSetup(system: ActorSystem, target: ActorRef) {
val serverActor = system.actorOf(TcpServer.props(target))
}
object TcpServer {
def props(probe: ActorRef) = Props(new TcpServer(probe))
}
class TcpServer(probe: ActorRef) extends Actor with ActorLogging {
import Tcp._
import context.system
IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
def receive = waiting
def waiting: Receive = {
case b @ Bound(localAddress) => {
log.info("listening on port {}", localAddress.getPort)
probe ! b
}
case CommandFailed(_: Bind) => context stop self
case c@Connected(remote, local) => {
log.info("connection established with {}", remote)
probe ! c
val connection = sender()
val handler = context.actorOf(TcpServerHandler.props(connection, probe))
probe ! Register(handler)
connection ! Register(handler)
}
}
}
object TcpServerHandler {
case class TestSend(data: ByteString)
def props(connection: ActorRef, probe: ActorRef) = Props(new TcpServerHandler(connection, probe))
}
class TcpServerHandler(connection: ActorRef, probe: ActorRef) extends Actor with ActorLogging {
import Tcp._
import TcpServerHandler._
def receive: Receive = {
case r: Received => probe ! r
case p@PeerClosed => {
log.info("connection closed")
probe ! p
context stop self
}
case c@Close => {
connection ! c
log.info("closing connection")
}
case TestSend(data) => connection ! Write(data)
case Closed => log.info("connection closed")
case other => log.warning("unknown message {}", other)
}
}
import akka.actor.{PoisonPill, ActorSystem}
import akka.io.Tcp
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.{TestKit}
import akka.stream.scaladsl.{Keep, Sink}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import org.scalatest.{MustMatchers, WordSpecLike}
import skarn.{TcpServerTestSetup, StopSystemAfterAllWithAwaitTermination}
import scala.concurrent.Promise
class TcpStreamActorITest extends TestKit(ActorSystem("TcpStreamActorITest", ConfigFactory.empty()))
with WordSpecLike with MustMatchers with StopSystemAfterAllWithAwaitTermination { testSelf =>
"TcpStreamActor" must {
import Tcp._
import TcpClientActorProtocol._
"send a request with stream" in {
new TcpServerTestSetup(system, testActor) {
implicit val m = ActorMaterializer()
val connection = expectMsgClass(classOf[Bound])
val clientSink = Sink.actorSubscriber(TcpStreamActor.props(connection.localAddress, 10, 1))
val probe = TestSource.probe[Send].toMat(clientSink)(Keep.left).run()
expectMsgClass(classOf[Connected])
val handler = expectMsgClass(classOf[Register]).handler
1 to 100 foreach { _ =>
val p = Promise[Unit]
probe.sendNext(Send(ByteString("abcde"), p))
expectMsg(Received(ByteString("abcde")))
}
serverActor ! PoisonPill
}
}
}
}
@TanUkkii007
Copy link
Author

[INFO] [09/15/2015 17:57:24.552] [TcpStreamActorITest-akka.actor.default-dispatcher-2] [akka://TcpStreamActorITest/user/$a] listening on port 64665
[ERROR] [09/15/2015 17:57:24.624] [TcpStreamActorITest-akka.actor.default-dispatcher-3] [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink] received Supervise from unregistered child Actor[akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router#3824265], this will not end well
[ERROR] [09/15/2015 17:57:24.630] [TcpStreamActorITest-akka.actor.default-dispatcher-2] [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink] configuration problem while creating [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:166)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.ConfigurationException: configuration problem while creating [akka://TcpStreamActorITest/user/$b/flow-1-1-actorSubscriberSink/tcp-client-router] with router dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox] and routee dispatcher [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:753)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:206)
at akka.actor.dungeon.Children$class.actorOf(Children.scala:37)
at akka.actor.ActorCell.actorOf(ActorCell.scala:369)
at skarn.apns.TcpStreamActor.(TcpStreamActor.scala:138)
at skarn.apns.TcpStreamActor$$anonfun$props$4.apply(TcpStreamActor.scala:128)
at skarn.apns.TcpStreamActor$$anonfun$props$4.apply(TcpStreamActor.scala:128)
at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:346)
at akka.actor.Props.newActor(Props.scala:255)
at akka.actor.ActorCell.newActor(ActorCell.scala:552)
at akka.actor.ActorCell.create(ActorCell.scala:578)
... 9 more
Caused by: com.typesafe.config.ConfigException$BadPath: path parameter: Invalid path 'BalancingPool-/$b/flow-1-1-actorSubscriberSink/tcp-client-router': Token not allowed in path expression: 'b' ('$' not followed by {, 'b' not allowed after '$') (you can double-quote this token if you really want it here)
at com.typesafe.config.impl.Parser.parsePathExpression(Parser.java:1095)
at com.typesafe.config.impl.Parser.parsePath(Parser.java:1135)
at com.typesafe.config.impl.Path.newPath(Path.java:224)
at com.typesafe.config.impl.SimpleConfig.hasPath(SimpleConfig.java:80)
at akka.dispatch.CachingConfig.hasPath(CachingConfig.scala:97)
at akka.dispatch.Dispatchers.hasDispatcher(Dispatchers.scala:89)
at akka.routing.BalancingPool.newRoutee(Balancing.scala:106)
at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116)
at akka.routing.RoutedActorCell$$anonfun$start$3.apply(RoutedActorCell.scala:116)
at scala.collection.generic.GenTraversableFactory.fill(GenTraversableFactory.scala:90)
at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:116)
at akka.routing.RoutedActorCell.start(RoutedActorCell.scala:41)
at akka.actor.RepointableActorRef.point(RepointableActorRef.scala:105)
at akka.actor.RepointableActorRef.initialize(RepointableActorRef.scala:82)
at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:751)
... 19 more

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment