Created
March 31, 2012 20:48
-
-
Save dvarrazzo/2268338 to your computer and use it in GitHub Desktop.
zmq eventlet tests
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import zmq | |
try: | |
n = int(sys.argv[1]) | |
except: | |
n = 10 | |
ctx = zmq.Context() | |
req = ctx.socket(zmq.REQ) | |
req.connect("tcp://localhost:5555") | |
for i in range(n): | |
print "Sending Hello %d..." % i | |
req.send("Hello %s" % i) | |
reply = req.recv() | |
print "Received %s %d" % (reply, i) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
def is_eventlet(): | |
return bool(len(sys.argv) > 1 and sys.argv[1] == 'eventlet') | |
if is_eventlet(): | |
print "going eventlet" | |
import eventlet | |
from eventlet.green import zmq | |
from eventlet.green import time | |
from eventlet.green import threading | |
else: | |
print "going thread" | |
import zmq | |
import time | |
import threading | |
def worker(ctx, i): | |
print "worker", i, "started" | |
sock = ctx.socket(zmq.REP) | |
sock.connect('tcp://localhost:5560') | |
while 1: | |
msg = sock.recv() | |
print "worker", i, "got", msg | |
time.sleep(1) | |
sock.send('World') | |
def queue(frontend, backend): | |
poller = zmq.Poller() | |
poller.register(frontend, zmq.POLLIN) | |
poller.register(backend, zmq.POLLIN) | |
while 1: | |
socks = dict(poller.poll()) | |
if socks.get(frontend) == zmq.POLLIN: | |
while 1: | |
s = frontend.recv() | |
print "got msg from frontend: [%s]" % ( | |
",".join(map(str, map(ord, s))) if s and s[0] == '\x00' else s) | |
more = frontend.getsockopt(zmq.RCVMORE) | |
backend.send(s, more and zmq.SNDMORE or 0) | |
if not more: | |
break | |
if socks.get(backend) == zmq.POLLIN: | |
while 1: | |
s = backend.recv() | |
print "got msg from backend: [%s]" % ( | |
",".join(map(str, map(ord, s))) if s and s[0] == '\x00' else s) | |
more = backend.getsockopt(zmq.RCVMORE) | |
frontend.send(s, more and zmq.SNDMORE or 0) | |
if not more: | |
break | |
def queue_eventlet(frontend, backend): | |
def f1(): | |
print "queue thread 1" | |
while 1: | |
while 1: | |
s = frontend.recv() | |
print "got msg from frontend: [%s]" % ( | |
",".join(map(str, map(ord, s))) if s and s[0] == '\x00' else s) | |
more = frontend.getsockopt(zmq.RCVMORE) | |
backend.send(s, more and zmq.SNDMORE or 0) | |
if not more: | |
break | |
def f2(): | |
print "queue thread 2" | |
while 1: | |
while 1: | |
s = backend.recv() | |
print "got msg from backend: [%s]" % ( | |
",".join(map(str, map(ord, s))) if s and s[0] == '\x00' else s) | |
more = backend.getsockopt(zmq.RCVMORE) | |
frontend.send(s, more and zmq.SNDMORE or 0) | |
if not more: | |
break | |
eventlet.spawn(f1) | |
eventlet.spawn(f2) | |
while 1: | |
print "sleep" | |
time.sleep(5) | |
def main(): | |
ctx = zmq.Context() | |
frontend = ctx.socket(zmq.ROUTER) | |
frontend.bind("tcp://*:5555") | |
backend = ctx.socket(zmq.DEALER) | |
backend.bind("tcp://*:5560") | |
for i in range(5): | |
t = threading.Thread(target=worker, args=(ctx, i)) | |
t.setDaemon(True) | |
t.start() | |
if is_eventlet(): | |
queue_eventlet(frontend, backend) | |
else: | |
queue(frontend, backend) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment