Created
September 27, 2013 08:44
-
-
Save remeniuk/6725792 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.viaden.crm.server | |
import java.net.InetSocketAddress | |
import akka.actor._ | |
import akka.io._ | |
import akka.io.Tcp._ | |
import akka.io.IO | |
import akka.io.Tcp.Connected | |
import akka.actor.Terminated | |
import akka.io.Tcp.Bind | |
import akka.io.Tcp.Bound | |
import akka.actor.SupervisorStrategy.Restart | |
import com.viaden.crm.config.Configuration | |
import scala.concurrent.{ExecutionContext, Future} | |
import akka.dispatch.MessageDispatcher | |
import java.util.concurrent.{Callable, ThreadPoolExecutor, ExecutorService} | |
import akka.dispatch.Futures.future; | |
/** | |
* User: Maxim Korolyov | |
*/ | |
class SslProtobufEndpoint(local: InetSocketAddress) extends Actor with ActorLogging with SslConfiguration { | |
override val supervisorStrategy = | |
OneForOneStrategy() { | |
case _ => Restart | |
} | |
implicit def system = context.system | |
IO(Tcp) ! Bind(self, local) | |
log.debug(s"BindTCP connection to ${local}") | |
def receive: Receive = { | |
case _: Bound ⇒ | |
log.debug("Successfully bound TCP connection") | |
context.become(bound(sender)) | |
} | |
def bound(listener: ActorRef): Receive = { | |
case Connected(remote, _) ⇒ | |
val dispatcher = system.dispatchers.lookup(Configuration.SSL_INIT_THREAD_POOL_DISPATCHER) | |
val f = Future { | |
val init = TcpPipelineHandler.withLogger(log, | |
new TcpReadWriteAdapter >> | |
new SslTlsSupport(sslEngine(remote, client = false)) >> | |
new BackpressureBuffer(lowBytes = 100, highBytes = 1000, maxBytes = 1000000)) | |
val connection = sender | |
val handler = context.actorOf(Props(new ProtobufRequestHandler(init, connection)). | |
withDispatcher(Configuration.PROTOBUF_HANDLER_DISPATCHER)) | |
val pipeline = context.actorOf(TcpPipelineHandler.props(init, connection, handler)) | |
connection ! Tcp.Register(pipeline) | |
}(dispatcher) | |
case _: Terminated ⇒ | |
listener ! Unbind | |
context.become { | |
case Unbound ⇒ context stop self | |
} | |
} | |
def sslEngine(address: InetSocketAddress, client: Boolean) = { | |
val engine = sslContext.createSSLEngine(address.getHostName, address.getPort) | |
engine.setUseClientMode(client) | |
engine | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment