Created
October 18, 2011 11:52
-
-
Save viktorklang/1295252 to your computer and use it in GitHub Desktop.
0mq
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com> | |
*/ | |
package akka.zeromq | |
import akka.actor.{Actor, ActorRef} | |
import akka.dispatch.MessageDispatcher | |
import akka.util.Duration | |
import akka.zeromq.SocketType._ | |
import java.util.concurrent.atomic.AtomicReference | |
import org.zeromq.ZMQ.{Socket, Poller} | |
import org.zeromq.{ZMQ => JZMQ} | |
import scala.annotation.tailrec | |
private[zeromq] class ConcurrentSocketActor(params: SocketParameters, dispatcher: MessageDispatcher, pollIntervalMs: Long) extends Actor { | |
private val noBytes = Array[Byte]() | |
private val socket: Socket = params.context.socket(params.socketType) | |
private val poller: Poller = params.context.poller | |
self.dispatcher = dispatcher | |
self.receiveTimeout = pollIntervalMs | |
poller.register(socket, Poller.POLLIN) | |
override def postStop { poller.unregister(socket); socket.close; notifyListener(Closed) } | |
def notifyListener(msg: Any) { | |
params.listener match { | |
case Some(x) if x.isShutdown => self.stop //No point in continuing if the listener is down? | |
case Some(x) => x ! msg | |
case None => //??? | |
} | |
} | |
override def receive: Receive = { | |
case Connect(endpoint) => socket.connect(endpoint); notifyListener(Connecting) | |
case Bind(endpoint) => socket.bind(endpoint) | |
case Close => self.stop | |
case Send(frames) => | |
for (i <- 0 until frames.length) | |
socket.send(frames(i).payload.toArray, if (i < frames.length - 1) JZMQ.SNDMORE else 0) | |
receiveFrames() | |
case Subscribe(topic) => socket.subscribe(topic.toArray); receiveFrames() | |
case Unsubscribe(topic) => socket.unsubscribe(topic.toArray); receiveFrames() | |
case ZMQMessage(frames) => sendFrames(frames); receiveFrames() | |
case ReceiveTimeout => receiveFrames() | |
} | |
def receiveFrames(): Seq[Frame] = { | |
val i = new Iterator[Frame] { | |
@inline def receiveBytes(): Array[Byte] = socket.recv(0) match { | |
case null => noBytes | |
case bytes: Array[Byte] if bytes.length > 0 => bytes | |
case _ => noBytes | |
} | |
var nextBytes = receiveBytes() | |
def hasNext = nextBytes ne noBytes | |
def next = nextBytes match { | |
case `noBytes` => throw new EOFException("EOF") | |
case bytes => | |
nextBytes = if (socket.hasReceiveMore) receiveBytes() else noBytes | |
Frame(bytes) | |
} | |
} | |
if (poller.poll(params.pollTimeoutDuration.toMillis) > 0 && poller.pollin(0)) { | |
val frames = (Vector.empty[Frame] /: i)(_ :+ _) | |
if (frames.nonEmpty) notifyListener(params.deserializer(frames)) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment