Created
January 28, 2015 07:48
-
-
Save crmaxx/24926409b7ee67f60f8a to your computer and use it in GitHub Desktop.
Как готовить Akka.IO от Шниперсона Александра
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Props, ActorLogging, Actor, ActorRef} | |
import akka.io.Tcp | |
import ProtoMessages.MessageRequestBase.MessageRequest | |
import ProtoMessages.MessageResponseBase.MessageResponse | |
import net.orionlab.brr.utils.CoreSystem | |
import net.orionlab.brr.CommunicationMessage.BinarySerializer | |
import akka.util.ByteString | |
/** Envelope for a decoded request that is forwarded from the socket to the user actor. */
case class IncomingMessage(message: MessageRequest)

/** Envelope for a response that must be serialized and written to the socket. */
case class OutgoingMessage(message: MessageResponse)

/**
 * Per-connection handler registered with an Akka IO TCP connection.
 *
 * Responsibilities visible in this class:
 *  - announces itself to `CoreSystem.supervisor` with `ClientConnected(self, actorId)`;
 *    the sender of a matching `ClientConnected` reply becomes the user actor;
 *  - forwards raw `Received` bytes to a child [[FrameBuilderActor]] and turns every
 *    `CompleteMessage` it gets back into an [[IncomingMessage]] for the user actor;
 *  - serializes every [[OutgoingMessage]] and writes it to the connection;
 *  - notifies the supervisor with `ClientDisconnected` and stops itself when the
 *    connection is closed for any reason.
 *
 * @param connection the Akka IO connection actor that `Write` commands are sent to
 */
class SocketClientActor(connection: ActorRef) extends Actor with ActorLogging {
  import Tcp._

  // Identifier sent along with `self` so that the handshake reply can be matched.
  val actorId = System.nanoTime()

  // Child that reassembles length-prefixed frames; 256 is its chunk size argument.
  val frameBuilder = context.actorOf(Props(classOf[FrameBuilderActor], 256))

  // Set once the handshake reply arrives; frames completed before that are dropped with an error log.
  var userActor: Option[ActorRef] = None

  CoreSystem.supervisor.foreach(_ ! ClientConnected(self, actorId))

  def receive = {
    case msg: ClientConnected =>
      // Only accept a reply that echoes this actor and its id.
      if (msg.actor == self && msg.actorId == actorId) {
        log.info(s"ClientConnected '${sender()}'")
        userActor = Some(sender())
      }

    case Received(data) =>
      frameBuilder ! BuildFrame(data)

    case CompleteMessage(data) =>
      userActor match {
        case None =>
          log.error(s"Cant send CompleteMessage to empty UserActor.")
        case Some(actor) =>
          BinarySerializer.Deserialize(data.toArray) match {
            case None => log.info(s"Cant send empty message to UserActor.")
            case Some(message) => actor ! IncomingMessage(message)
          }
      }

    case OutgoingMessage(message) =>
      connection ! Write(ByteString(BinarySerializer.Serialize(message)))

    // Every close event (PeerClosed, ErrorClosed, Closed, Aborted, ConfirmedClosed)
    // extends ConnectionClosed. Matching only PeerClosed left this actor and its
    // child alive after e.g. a connection reset and never told the supervisor.
    case _: ConnectionClosed =>
      CoreSystem.supervisor.foreach(_ ! ClientDisconnected(self, actorId))
      context.stop(self)

    case any =>
      log.info(s"Unhandled event $any")
  }
}
/** Raw bytes read from the socket that should be fed into the frame parser. */
case class BuildFrame(data: ByteString)

/** Payload of one fully reassembled frame (the length header is not included). */
case class CompleteMessage(data: ByteString)

/**
 * Reassembles length-prefixed frames from an arbitrary sequence of byte chunks.
 *
 * Wire format handled here: a 4-byte little-endian length header followed by that
 * many payload bytes. For every completed payload a [[CompleteMessage]] is sent to
 * `context.parent`. Frames whose header declares a length of zero produce no message.
 *
 * Parser state is kept between `BuildFrame` messages, so both the header and the
 * payload may be split across any number of chunks, and a single chunk may contain
 * several frames.
 *
 * @param bufferSize maximum number of payload bytes appended per parsing step;
 *                   values below 1 are treated as 1 so that parsing always progresses
 */
class FrameBuilderActor(bufferSize: Int) extends Actor with ActorLogging {
  private val headerSize = 4
  private val chunkSize = math.max(bufferSize, 1)

  // Header bytes collected so far while the header is still incomplete.
  private var headerBuffer = ByteString.empty
  // Declared payload length of the frame being read; 0 means "waiting for a header".
  private var messageLength = 0
  // Payload bytes collected so far for the current frame.
  private var messageBuffer = ByteString.empty

  def receive = {
    case BuildFrame(data) => parse(data)
  }

  /**
   * Consumes `data` completely, updating the parser state and emitting a
   * [[CompleteMessage]] to the parent for each frame that becomes complete.
   *
   * Running out of input leaves the state untouched, so a partially received
   * header or payload is continued by the next call.
   */
  @scala.annotation.tailrec
  private def parse(data: ByteString): Unit = {
    if (data.isEmpty) {
      // Nothing left in this chunk; keep the partial frame for the next one.
      ()
    } else if (messageLength == 0) {
      // Collect header bytes; the header itself may be split across chunks.
      val missing = headerSize - headerBuffer.length
      headerBuffer ++= data.take(missing)
      val rest = data.drop(missing)
      if (headerBuffer.length == headerSize) {
        val declared =
          headerBuffer.iterator.getLongPart(headerSize)(java.nio.ByteOrder.LITTLE_ENDIAN).toInt
        headerBuffer = ByteString.empty
        if (declared < 0) {
          // The unsigned 32-bit length does not fit into an Int. The stream cannot
          // be interpreted any further, so the remainder of this chunk is dropped.
          log.error(s"Invalid frame length $declared, dropping ${rest.length} buffered bytes.")
        } else {
          messageLength = declared
          messageBuffer = ByteString.empty
          parse(rest)
        }
      }
    } else {
      // Take only what the current frame still needs, capped by the chunk size.
      val remaining = messageLength - messageBuffer.length
      val takeLen = math.min(math.min(data.length, chunkSize), remaining)
      messageBuffer ++= data.take(takeLen)
      if (messageBuffer.length == messageLength) {
        context.parent ! CompleteMessage(messageBuffer)
        messageLength = 0
        messageBuffer = ByteString.empty
      }
      parse(data.drop(takeLen))
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{ActorLogging, Actor, Props} | |
import akka.io.{IO, Tcp} | |
import java.net.InetSocketAddress | |
/**
 * TCP acceptor: binds to `serverPort` on construction and creates one
 * [[SocketClientActor]] child per accepted connection, registering that child
 * as the connection's handler. Stops itself if the bind command fails.
 *
 * @param serverPort local port to listen on
 */
class SocketServerActor(serverPort: Int) extends Actor with ActorLogging {
  import Tcp._
  import context.system

  log.info("SocketServerActor created\nTrying to bind socket.")
  IO(Tcp) ! Bind(self, new InetSocketAddress(serverPort))

  def receive = {
    case Bound(localAddress) =>
      log.info(s"Server listening on $localAddress")

    case CommandFailed(_: Bind) =>
      context.stop(self)

    case Connected(remote, local) =>
      // Child name is derived from both endpoints: tcp-<remote bytes>x<port>-<local bytes>x<port>.
      val childName = endpointTag("tcp-", remote) + endpointTag("-", local)
      val tcpConnection = sender()
      val clientHandler =
        context.actorOf(Props(classOf[SocketClientActor], tcpConnection), childName)
      tcpConnection ! Register(clientHandler)

    case other =>
      log.info(s">>> Unhandled message '$other'")
  }

  /** Renders `prefix`, the raw address bytes joined by "_", an "x" and the port. */
  private def endpointTag(prefix: String, endpoint: InetSocketAddress): String =
    endpoint.getAddress.getAddress.mkString(prefix, "_", "x") + endpoint.getPort
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment