Created
April 25, 2011 22:48
-
-
Save casualjim/941427 to your computer and use it in GitHub Desktop.
A poller that provides a more event-driven interface to jzmq.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Companion object holding the types shared with the `ZeroMQPoller` class. */
object ZeroMQPoller {

  /** Callback invoked with a message built from a socket that reported readable input. */
  type ZMessageHandler = ZMessage => Unit
}
/**
 * Dispatches readable sockets to per-socket message handlers.
 *
 * Sockets and their handlers are stored in two parallel buffers: the entry at
 * position `i` of `sockets` belongs to the handler at position `i` of
 * `pollinHandlers`. The underlying `Poller` is created on demand by `init()`
 * (directly, or through the first call to `poll`).
 *
 * @param context context used to create the underlying poller
 */
class ZeroMQPoller(context: Context) {
  import ZeroMQPoller._

  // Empty until init() has run; every access goes through the Option so that
  // calling dispose() or -= before the first poll is harmless.
  private var poller: Option[Poller] = None
  private val pollinHandlers = ListBuffer[ZMessageHandler]()
  private val sockets = ListBuffer[Socket]()

  /**
   * Records a socket together with its handler and, when the poller already
   * exists, registers the socket for POLLIN right away.
   *
   * @param socket         socket to watch for incoming data
   * @param messageHandler callback to run when the socket has input
   */
  protected def register(socket: Socket, messageHandler: ZMessageHandler): Unit = {
    sockets += socket
    pollinHandlers += messageHandler
    poller.foreach(_.register(socket, Poller.POLLIN))
  }

  /** Creates a poller sized for the known sockets and registers each of them for POLLIN. */
  def init(): Unit = {
    val created = context.poller(sockets.size)
    sockets.foreach(created.register(_, Poller.POLLIN))
    poller = Some(created)
  }

  /**
   * Unregisters every known socket from the poller (if one was created) and
   * forgets all sockets and handlers.
   */
  def dispose(): Unit = {
    poller.foreach(active => sockets.foreach(active.unregister(_)))
    sockets.clear()
    pollinHandlers.clear()
  }

  /**
   * Removes a socket and its handler. A socket that was never added is ignored.
   *
   * @param socket socket to stop polling
   */
  def -=(socket: Socket): Unit = {
    val idx = sockets.indexOf(socket)
    // indexOf yields -1 for an unknown socket; removing at -1 would throw.
    if (idx >= 0) {
      poller.foreach(_.unregister(socket))
      sockets.remove(idx)
      pollinHandlers.remove(idx)
    }
  }

  /**
   * Adds a socket/handler pair; shorthand for `register`.
   *
   * @param handler tuple of the socket to watch and the callback for its messages
   */
  def +=(handler: (Socket, ZMessageHandler)): Unit = {
    val (socket, messageHandler) = handler
    register(socket, messageHandler)
  }

  /**
   * Polls the registered sockets once and invokes the handler of every socket
   * that reports pending input.
   *
   * @param timeout positive values are multiplied by 1000 before being passed
   *                to the poller; zero and negative values are passed through
   *                unchanged (presumably milliseconds converted to
   *                microseconds, with -1 meaning "block" - confirm against the
   *                jzmq version in use)
   */
  def poll(timeout: Long = -1): Unit = {
    if (poller.isEmpty) init()
    val timo = if (timeout > 0) timeout * 1000 else timeout
    poller.foreach { active =>
      active.poll(timo)
      (0 until active.getSize).foreach { idx =>
        if (active.pollin(idx)) {
          val socket = active.getSocket(idx)
          // The buffers compact when a socket is removed, so the poller's slot
          // index is only trusted while it still points at the same socket;
          // otherwise the handler is looked up through the socket itself.
          val handlerIdx =
            if (idx < sockets.size && sockets(idx) == socket) idx
            else sockets.indexOf(socket)
          if (handlerIdx >= 0) pollinHandlers(handlerIdx)(ZMessage(socket))
        }
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment