Skip to content

Instantly share code, notes, and snippets.

@coderfi
Last active November 30, 2016 00:12
Show Gist options
  • Save coderfi/c354013cda523151c2aa to your computer and use it in GitHub Desktop.
Save coderfi/c354013cda523151c2aa to your computer and use it in GitHub Desktop.
from __future__ import print_function
from nanomsg import Socket, REQ, REP, PUB, SUB, DONTWAIT, \
NanoMsgAPIError, EAGAIN, SUB_SUBSCRIBE
import time
from gevent import monkey
import gevent
monkey.patch_all()
print("REQ/REP Sample")
s1 = Socket(REP)
s2 = Socket(REQ)
try:
s1.bind('inproc://hello')
s2.connect('inproc://hello')
s2.send(b'hello nanomsg', flags=DONTWAIT)
msg = s1.recv()
print("server got: " + msg)
s1.send(b'hello back', flags=DONTWAIT)
print("client got: " + s2.recv())
finally:
s1.close()
s2.close()
def send(socket, msg):
print("Sending message: %s" % msg)
socket.send(msg, flags=DONTWAIT)
print("\nPUB/SUB Sample")
s1 = Socket(PUB)
s2 = Socket(SUB)
try:
s1.bind('inproc://hellopubsub')
s2.connect('inproc://hellopubsub')
s2.set_string_option(SUB, SUB_SUBSCRIBE, "") #subscribe to all topics
print("Sending messages later")
gevent.spawn_later(1, send, s1, 'hello nano 1')
gevent.spawn_later(2, send, s1, 'hello nano 2')
gevent.spawn_later(3, send, s1, 'hello nano 3')
gevent.spawn_later(4, send, s1, 'hello nano 4')
gevent.spawn_later(5, send, s1, 'hello nano 5')
print("receiving message")
count = 0
while True:
try:
msg = s2.recv(flags=DONTWAIT)
print("client got: " + msg)
count += 1
if count >= 5:
break
except NanoMsgAPIError, ne:
if ne.errno != EAGAIN:
raise
print('timeout')
#else no message available
gevent.sleep(0.5)
finally:
s1.close()
s2.close()
print("\nPUB/SUB Sample with Subscription")
s1 = Socket(PUB)
s2 = Socket(SUB)
try:
s1.bind('inproc://hellopubsub')
s2.connect('inproc://hellopubsub')
#subscribe to hello messages (starting with the 'hello' prefix)
s2.set_string_option(SUB, SUB_SUBSCRIBE, "hello")
print("Sending messages later")
gevent.spawn_later(1, send, s1, 'hello nano 1')
gevent.spawn_later(2, send, s1, 'something else')
gevent.spawn_later(3, send, s1, 'hello nano 2')
gevent.spawn_later(4, send, s1, 'hello nano 3')
gevent.spawn_later(5, send, s1, 'hello nano 4 ')
print("receiving message")
count = 0
while True:
try:
msg = s2.recv(flags=DONTWAIT)
print("client got: " + msg)
count += 1
if count >= 4:
break
except NanoMsgAPIError, ne:
if ne.errno != EAGAIN:
raise
print('timeout')
#else no message available
gevent.sleep(0.5)
finally:
s1.close()
s2.close()
print("done")
@coderfi
Copy link
Author

coderfi commented Jul 13, 2014

Added subscription example

The number of pub/sub examples are lacking.
All seem to have the comment
// TODO learn more about publishing/subscribe keys
and are apparently copied from http://tim.dysinger.net/posts/2013-09-16-getting-started-with-nanomsg.html

After looking at nanocat, I realized, the subscriptions are simply prefixes of the messages.

I was expecting a topic parameter on the publish side of some sort, or even a complicated message envelope/header.

However, prefixes are quite simple, really. So this works just fine from a protocol standpoint.

Therefore, examples that subscribe to the empty string "", are really subscribing to all messages prefixed by the empty string, i.e. all messages.

Note: I was testing pub/sub with JSON messages.

Obviously, this would not work so well with subscriptions... which means the developer is in charge of basically generating their own message envelope.

i.e. in order to get subscriptions to work, the simplest strategy is to prefix all pub/sub messages with the subscription prefix.

i.e.
s1.send('my_topic1 {"foo": "bar"}')
s1.send('my_topic2 {"foo": "bar"}')
s1.send('my_topic3 {"foo": "bar"}')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment