// Broker: relays messages between publishers (XSUB side) and subscribers (XPUB side)
var zmq = require('zmq');

var pubListener = 'tcp://127.0.0.1:5555';
var subListener = 'tcp://127.0.0.1:5556';
var hwm = 1000;
var verbose = 0;

// The xsub listener is where pubs connect to
var subSock = zmq.socket('xsub');
subSock.identity = 'subscriber' + process.pid;
subSock.bindSync(subListener);

// The xpub listener is where subs connect to
var pubSock = zmq.socket('xpub');
pubSock.identity = 'publisher' + process.pid;
pubSock.setsockopt(zmq.ZMQ_SNDHWM, hwm);
// By default xpub only signals new subscriptions.
// Setting it to verbose = 1 will signal on every new subscribe
pubSock.setsockopt(zmq.ZMQ_XPUB_VERBOSE, verbose);
pubSock.bindSync(pubListener);

// When we receive data on subSock, it means someone is publishing
subSock.on('message', function(data) {
  // We just relay it to the pubSock, so subscribers can receive it
  // (only the first frame is relayed here; multipart messages need all frames)
  pubSock.send(data);
});

// When pubSock receives a message, it is a subscribe or unsubscribe request
pubSock.on('message', function(data) {
  // data is a Buffer
  // The first byte is the flag: subscribe (1) or unsubscribe (0)
  var type = data[0] === 0 ? 'unsubscribe' : 'subscribe';
  // The channel name is the rest of the buffer
  var channel = data.slice(1).toString();
  console.log(type + ':' + channel);
  // We send it to subSock, so it knows which channels to listen to
  subSock.send(data);
});
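
The subscription frame layout described in the broker comments (one flag byte followed by the channel name) can be checked without any ZeroMQ sockets. The following is a minimal sketch; `parseSubscription` is a hypothetical helper name, not part of the `zmq` module, and the frame is built by hand here purely for illustration.

```javascript
'use strict';

// Hypothetical helper (not part of the zmq module): decode one subscription
// frame as received on the XPUB socket's 'message' event.
function parseSubscription(frame) {
  if (!Buffer.isBuffer(frame) || frame.length === 0) {
    throw new TypeError('expected a non-empty Buffer');
  }
  return {
    // First byte: 1 = subscribe, 0 = unsubscribe
    type: frame[0] === 0 ? 'unsubscribe' : 'subscribe',
    // Remaining bytes: the channel name (subscription prefix)
    channel: frame.slice(1).toString()
  };
}

// Hand-built frame: flag byte 1 followed by the channel name.
var frame = Buffer.concat([Buffer.from([1]), Buffer.from('MYCHANNEL')]);
console.log(parseSubscription(frame));
// prints { type: 'subscribe', channel: 'MYCHANNEL' }
```

An empty channel name is valid: a frame holding only the flag byte describes a subscription to everything.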

// Publisher
// Beware: if the pub is started before any subscriber is listening, its messages are simply dropped (they go to /dev/null)
var zmq = require('zmq');
var sock = zmq.socket('pub');
// Instead of binding our pub socket, we connect it to the broker's XSUB listener (PUB -> XSUB)
sock.connect('tcp://127.0.0.1:5556');
sock.send('MYCHANNEL ' + 'here is the info');

// Subscriber
var zmq = require('zmq');
var sock = zmq.socket('sub');
// We subscribe to MYCHANNEL only
sock.subscribe('MYCHANNEL');
// We connect our sub socket to the broker's XPUB listener (SUB -> XPUB)
sock.connect('tcp://127.0.0.1:5555');
// When a message comes in,
sock.on('message', function(data) {
  // data is a Buffer
  // data contains: 'MYCHANNEL' + a space + 'here is the info'
  console.log(data.toString());
});
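
ZeroMQ subscriptions are prefix matches on the message bytes, so `sock.subscribe('MYCHANNEL')` would also match a message starting with `MYCHANNEL2`. The sketch below illustrates that matching rule and the "channel, space, payload" convention used by the publisher above. Both `matchesSubscription` and `splitMessage` are hypothetical helpers written for illustration; they are not functions of the `zmq` module.

```javascript
'use strict';

// Hypothetical helper: true when `message` starts with the bytes of `prefix`,
// which is how a SUB socket decides whether to deliver a message.
function matchesSubscription(message, prefix) {
  var m = Buffer.from(message);
  var p = Buffer.from(prefix);
  return m.length >= p.length && m.slice(0, p.length).equals(p);
}

// Hypothetical helper: split 'CHANNEL payload' at the first space,
// following the convention used by the publisher in this gist.
function splitMessage(data) {
  var text = data.toString();
  var i = text.indexOf(' ');
  if (i === -1) {
    return { channel: text, payload: '' };
  }
  return { channel: text.slice(0, i), payload: text.slice(i + 1) };
}

console.log(matchesSubscription('MYCHANNEL here is the info', 'MYCHANNEL')); // true
console.log(matchesSubscription('MYCHANNEL2 something else', 'MYCHANNEL'));  // true (prefix match)
console.log(matchesSubscription('OTHER data', 'MYCHANNEL'));                 // false
console.log(splitMessage(Buffer.from('MYCHANNEL here is the info')));
```

If exact channel names matter, one option is to include the separator in the subscription itself, for example subscribing to `'MYCHANNEL '` with the trailing space.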

Thanks! I have a few questions about your design:
You set the identity on the xsub socket and forward messages received on the xpub socket directly to the xsub socket. Does that mean that ZMQ also exposes subscriptions on the xsub socket? How does it behave if you have multiple clients subscribing and unsubscribing? Does ZMQ use the identity to keep track of whether topic messages should be sent or not? If that were not the case, a client could unsubscribe other clients connected to the broker.
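
One way to reason about the multi-client question is a toy model of per-connection subscription bookkeeping. The sketch below assumes that subscriptions are tracked per connection rather than by socket identity, and that (as with `verbose = 0` in the broker above) only the first subscribe and the last unsubscribe for a topic are passed along. `SubscriptionTable` is a hypothetical class written for illustration; it is not libzmq's implementation.

```javascript
'use strict';

// Toy model, NOT libzmq code: track which connections want which topic.
class SubscriptionTable {
  constructor() {
    this.byTopic = new Map(); // topic -> Set of connection ids
  }

  // Returns true when this is the first subscriber for the topic,
  // i.e. when the subscribe would need to be forwarded upstream.
  subscribe(conn, topic) {
    let conns = this.byTopic.get(topic);
    if (!conns) {
      conns = new Set();
      this.byTopic.set(topic, conns);
    }
    const first = conns.size === 0;
    conns.add(conn);
    return first;
  }

  // Returns true only when the last subscriber for the topic left,
  // i.e. when the unsubscribe would need to be forwarded upstream.
  unsubscribe(conn, topic) {
    const conns = this.byTopic.get(topic);
    if (!conns || !conns.delete(conn)) {
      return false;
    }
    if (conns.size === 0) {
      this.byTopic.delete(topic);
      return true;
    }
    return false;
  }

  // Connections that should currently receive messages for the topic.
  subscribers(topic) {
    return Array.from(this.byTopic.get(topic) || []);
  }
}

const table = new SubscriptionTable();
console.log(table.subscribe('client-a', 'MYCHANNEL'));   // true  (first subscriber)
console.log(table.subscribe('client-b', 'MYCHANNEL'));   // false (topic already active)
console.log(table.unsubscribe('client-a', 'MYCHANNEL')); // false (client-b still subscribed)
console.log(table.subscribers('MYCHANNEL'));             // [ 'client-b' ]
console.log(table.unsubscribe('client-b', 'MYCHANNEL')); // true  (last subscriber left)
```

Under these assumptions, one client unsubscribing does not stop delivery to the others, because the upstream unsubscribe only happens once nobody is left on the topic.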

For what it's worth, this will not work if multipart messages are exchanged (typically done with sock.send([array, of, stuff]) in Node). See JustinTulloss/zeromq.node#412 for more details. The problem is that, for multipart messages, send() expects an array but on('message') receives the frames as separate arguments. That is to say, instead of:
subSock.on('message', function(data) {
// We just relay it to the pubSock, so subscribers can receive it
pubSock.send(data);
});
you need
subSock.on('message', (...args) => pubSock.send(args));
and conversely on the other socket.
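
The multipart fix can be tried out without a running broker. The sketch below wires the relay in both directions using stand-in sockets; `relayAllFrames` and `FakeSocket` are hypothetical names for illustration, and `FakeSocket` only mimics the two parts of the socket interface used here (`on('message', ...)` and `send()`).

```javascript
'use strict';
const { EventEmitter } = require('events');

// Hypothetical helper: forward every frame of each message from one
// socket-like object to another, as an array (the shape send() expects
// for multipart messages).
function relayAllFrames(from, to) {
  from.on('message', (...frames) => to.send(frames));
}

// Stand-in for a zmq socket, for illustration only: it records what was sent.
class FakeSocket extends EventEmitter {
  constructor() {
    super();
    this.sent = [];
  }
  send(msg) {
    this.sent.push(msg);
  }
}

const subSock = new FakeSocket(); // plays the role of the xsub socket
const pubSock = new FakeSocket(); // plays the role of the xpub socket

relayAllFrames(subSock, pubSock); // published data -> subscribers
relayAllFrames(pubSock, subSock); // subscription requests -> publishers

// Simulate a two-frame message arriving from a publisher.
subSock.emit('message', Buffer.from('MYCHANNEL'), Buffer.from('here is the info'));
console.log(pubSock.sent[0].map(String));
// prints [ 'MYCHANNEL', 'here is the info' ]
```

With real sockets, the same two `relayAllFrames` calls would replace the two `on('message', ...)` handlers in the broker.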

Thank you very much, you saved my day. It is a shame that an XPUB/XSUB example is missing from https://github.com/imatix/zguide/tree/master/examples/
When I have some time, I will probably send them a PR.