Skip to content

Instantly share code, notes, and snippets.

@crmaxx
Created January 28, 2015 07:48
Show Gist options
  • Save crmaxx/24926409b7ee67f60f8a to your computer and use it in GitHub Desktop.
Save crmaxx/24926409b7ee67f60f8a to your computer and use it in GitHub Desktop.
Как готовить Akka.IO от Шниперсона Александра
import akka.actor.{Props, ActorLogging, Actor, ActorRef}
import akka.io.Tcp
import ProtoMessages.MessageRequestBase.MessageRequest
import ProtoMessages.MessageResponseBase.MessageResponse
import net.orionlab.brr.utils.CoreSystem
import net.orionlab.brr.CommunicationMessage.BinarySerializer
import akka.util.ByteString
case class IncomingMessage(message: MessageRequest)
case class OutgoingMessage(message: MessageResponse)
class SocketClientActor(connection: ActorRef) extends Actor with ActorLogging {
import Tcp._
val actorId = System.nanoTime()
val frameBuilder = context.actorOf(Props(classOf[FrameBuilderActor], 256))
var userActor: Option[ActorRef] = None
CoreSystem.supervisor.foreach(_ ! ClientConnected(self, actorId))
def receive = {
case msg: ClientConnected =>
if (msg.actor == self && msg.actorId == actorId) {
log.info(s"ClientConnected '${sender()}'")
userActor = Some(sender())
}
case Received(data) => frameBuilder ! BuildFrame(data)
case CompleteMessage(data) =>
userActor match {
case None => log.error(s"Cant send CompleteMessage to empty UserActor.")
case Some(actor) =>
BinarySerializer.Deserialize(data.toArray) match {
case None => log.info(s"Cant send empty message to UserActor.")
case Some(message) =>
// log.info("IncomingMessage sent to UserActor.")
actor ! IncomingMessage(message)
}
}
case OutgoingMessage(message) =>
// log.info(s"Received OutgoingMessage, send it to $connection")
connection ! Write(ByteString(BinarySerializer.Serialize(message)))
case PeerClosed =>
CoreSystem.supervisor.foreach(_ ! ClientDisconnected(self, actorId))
context.stop(self)
case any => log.info(s"Unhandled event $any")
}
}
case class BuildFrame(data: ByteString)
case class CompleteMessage(data: ByteString)
class FrameBuilderActor(bufferSize: Int) extends Actor with ActorLogging {
private val headerSize = 4
private var messageLength = 0
private var messageBuffer = ByteString.empty
def receive = {
case BuildFrame(data) => parse(data)
}
private def parse(data: ByteString) {
if (data.isEmpty) {
messageLength = 0
messageBuffer = ByteString.empty
} else {
if (messageLength == 0) {
messageLength = data.iterator.getLongPart(headerSize)(java.nio.ByteOrder.LITTLE_ENDIAN).toInt
val newData = data.drop(headerSize)
// log.info(s"Message(${data.map("%02X" format _).mkString(" ")}) MessageLength(${messageLength}) dataLenBefore(${data.length}) dataLenAfter(${newData.length})")
messageBuffer = ByteString.empty
parse(newData)
} else {
val canTakeLen = if (data.length <= bufferSize) data.length else bufferSize
val needTakeLen = if (messageBuffer.length + canTakeLen > messageLength) messageBuffer.length + canTakeLen - messageLength else canTakeLen
messageBuffer ++= data.take(needTakeLen)
if (messageBuffer.length == messageLength) {
// log.info(s"messageBuffer(${messageBuffer.map("%02X" format _).mkString(" ")})")
context.parent ! CompleteMessage(messageBuffer)
messageLength = 0
}
// MessageLength(22) dataLenBefore(46) dataLenAfter(42)]]
// cantTakeLen(42) needTakeLen(20) messageBufferLen(20) dataLenBefore(42) dataLenAfter(22)
val newData = data.drop(needTakeLen)
// log.info(s"cantTakeLen(${canTakeLen}) needTakeLen(${needTakeLen}) messageBufferLen(${messageBuffer.length}) dataLenBefore(${data.length}) dataLenAfter(${newData.length}))")
parse(newData)
}
}
}
}
import akka.actor.{ActorLogging, Actor, Props}
import akka.io.{IO, Tcp}
import java.net.InetSocketAddress
class SocketServerActor(serverPort: Int) extends Actor with ActorLogging {
import Tcp._
import context.system
log.info("SocketServerActor created\nTrying to bind socket.")
IO(Tcp) ! Bind(self, new InetSocketAddress(serverPort))
def receive = {
case Bound(localAddress) => log.info(s"Server listening on $localAddress")
case CommandFailed(_: Bind) => context stop self
case Connected(remote, local) =>
val sb = remote.getAddress.getAddress.addString(new StringBuilder(), "tcp-", "_", "x").append(remote.getPort)
val actorName = local.getAddress.getAddress.addString(sb, "-", "_", "x").append(local.getPort).toString()
val connection = sender()
val handler = context.actorOf(Props(classOf[SocketClientActor], connection), actorName)
connection ! Register(handler)
case any => log.info(s">>> Unhandled message '$any'")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment