Skip to content

Instantly share code, notes, and snippets.

@arturaz
Created October 18, 2013 07:08
Show Gist options
  • Save arturaz/7037599 to your computer and use it in GitHub Desktop.
Save arturaz/7037599 to your computer and use it in GitHub Desktop.
Akka IO TCP connection handler using pipelines
package com.tinylabproductions.quazibuild.server.actor.connection
import akka.actor.{Props, ActorRef, Actor}
import akka.io.Tcp._
import java.net.InetSocketAddress
import akka.util.ByteString
import com.tinylabproductions.quazibuild.server.messaging._
import com.tinylabproductions.quazibuild.server.actor.Logging
import com.tinylabproductions.quazibuild.messaging.Messages.{S2C, C2S}
import akka.io._
import akka.event.LoggingReceive
object Handler {
/** Frame size header length in bytes **/
private[this] val frameSizeHeaderLength = 4
/** Maximum size of frame in bytes **/
private[this] val maxFrame = 1024 * 256 /* 256kb */
private val Stages = new LengthFieldFrame(
maxFrame, headerSize = frameSizeHeaderLength, lengthIncludesHeader = false
)
}
class Handler(
connection: ActorRef, remote: InetSocketAddress, gameManager: ActorRef
) extends Actor with Logging {
import Handler._
private[this] val pipelineCtx = new HasActorContext {
def getContext = context
}
private[this] val PipelinePorts(cmd, evt, _) =
PipelineFactory.buildFunctionTriple(pipelineCtx, Stages)
private[this] val messageHandler = context.actorOf(
Props(classOf[C2SMessageHandler], gameManager),
s"message-handler-${remote.getHostString}-${remote.getPort}"
)
def receive = LoggingReceive {
case Received(chunk) =>
val (events, _) = evt(chunk)
events.foreach { event =>
log.debug(s"Handling ${event.size} bytes as event from $remote.")
handleMessage(C2S.parseFrom(event.toArray))
}
case s2c: S2C =>
writeS2C(s2c)
case sm: S2CMessage =>
writeS2C(sm.forSending)
case msg: ConnectionClosed =>
log.debug(s"Connection with $remote was closed: $msg")
messageHandler ! msg
}
private[this] def writeS2C(s2c: S2C) {
val (_, commands) = cmd(ByteString(s2c.toByteArray))
commands.foreach { command =>
log.debug(s"Sending ${command.size} bytes to $remote.")
connection ! Write(command)
}
}
private[this] def handleMessage(c2s: C2S) {
log.debug(s"Handling message: $c2s")
messageHandler ! c2s.asScala
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment