@dcolish
Created November 11, 2011 05:45
Zmq Tests
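# Receiver: pulls messages from the sender and writes each one to test.txt.
from zmq import Context, PULL

ctx = Context()
sock = ctx.socket(PULL)
sock.connect("tcp://localhost:5555")
try:
    with open('test.txt', 'w+') as f:
        while True:
            msg = sock.recv()  # blocks until a message arrives; returns bytes
            f.write(msg.decode() + '\n')
except KeyboardInterrupt:
    # Ctrl-C stops the loop; release the socket and context.
    sock.close()
    ctx.term()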
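
# Sender: binds port 5555 and pushes numbered "hello" messages in a loop.
from zmq import Context, PUSH

ctx = Context()
sock = ctx.socket(PUSH)
sock.bind("tcp://*:5555")
counter = 1
try:
    while True:
        sock.send_string("hello %s" % counter)  # encodes the text as UTF-8
        counter += 1
except KeyboardInterrupt:
    # Ctrl-C stops the loop; release the socket and context.
    sock.close()
    ctx.term()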