Created
October 30, 2012 10:09
-
-
Save baljanak/3979427 to your computer and use it in GitHub Desktop.
Playing with threads, greenlets and ZMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gevent | |
from gevent import monkey; monkey.patch_all() | |
from gevent_zeromq import zmq | |
import simplejson as json | |
import threading | |
import time | |
def server(context, runid=1234):
    """Serve "World" replies on an in-process REP socket, forever.

    Binds a ``zmq.REP`` socket to ``inproc://<runid>`` and then loops:
    receive one request, simulate one second of work, send the reply.
    The loop never ends on its own; the function only exits when an
    exception is raised inside it (for example when the greenlet running
    it is killed), at which point the socket is closed.

    Args:
        context: The ZeroMQ context used to create the socket. It is owned
            by the caller and is deliberately NOT terminated here.
        runid: Identifier used to build the ``inproc://`` endpoint name.
    """
    # Socket to talk to the clients.
    socket = context.socket(zmq.REP)
    try:
        # Bind first so the message below is true when it is printed.
        rc = socket.bind("inproc://%s" % (runid,))
        print("Socket bound on %s ..." % (runid,))
        # Two spaces keep the output identical to the original
        # ``print "rc = ", rc`` statement.
        print("rc =  %s" % (rc,))
        # A TCP endpoint such as "tcp://*:8040" could be bound here instead.
        while True:
            string = socket.recv()
            print("Received request for : [%s]" % (string,))
            # Simulate one second of 'work'. Change the sleep value to see
            # how it affects the handling of multiple clients.
            # NOTE(review): the file calls monkey.patch_all(), which
            # presumably patches time.sleep as well, so time.sleep(1) may
            # behave just like gevent.sleep(1) here - confirm.
            gevent.sleep(1)
            # Send the reply back to the client.
            socket.send("World")
    finally:
        # Reached only via an exception: release the socket without
        # waiting for unsent messages.
        socket.setsockopt(zmq.LINGER, 0)
        socket.close()
def client(context, i, runid=1234, request_timeout=5000):
    """Send one JSON request to the server and wait for a single reply.

    Creates a ``zmq.REQ`` socket, connects it to ``inproc://<runid>``,
    sends ``{'client': i, 'params': 'Hello'}`` encoded as JSON and polls
    for the answer. Inspired by the "Lazy Pirate" client from the ZeroMQ
    guide (http://zguide2.zeromq.org/py:lpclient), but without retries.

    Args:
        context: The ZeroMQ context used to create the socket.
        i: Client identifier; printed and embedded in the request.
        runid: Identifier used to build the ``inproc://`` endpoint name.
        request_timeout: How long to wait for the reply; passed straight
            to ``Poller.poll`` (milliseconds in pyzmq).

    Returns:
        The reply as returned by ``recv()``, or ``None`` if nothing
        arrived before the timeout.
    """
    poll = zmq.Poller()
    # Socket to talk to the server.
    sock = context.socket(zmq.REQ)
    poll.register(sock, zmq.POLLIN)
    try:
        print("[Client %s] Connecting to Outbound Server %s ..." % (i, runid))
        sock.connect("inproc://%s" % (runid,))

        args = {'client': i, 'params': 'Hello'}
        print("Sending request with args = %s" % (args,))
        sock.send(json.dumps(args))

        # NOTE(review): whether this poll yields to other greenlets depends
        # on the Poller implementation exposed by gevent_zeromq - confirm.
        socks = dict(poll.poll(request_timeout))
        if socks.get(sock) == zmq.POLLIN:
            # Get the reply.
            return sock.recv()

        print("No response from Outbound Server")
        return None
    finally:
        # Always release the socket, even if connect/send/poll raised.
        # LINGER 0 discards unsent messages so close() does not block.
        poll.unregister(sock)
        sock.setsockopt(zmq.LINGER, 0)
        sock.close()
if __name__ == '__main__':
    # Demo driver: one server greenlet and ten client greenlets sharing a
    # single ZeroMQ context (required for the inproc:// transport).
    context = zmq.Context(1)
    server_job = gevent.spawn(server, context)
    print("Server Greenlet started - ")
    try:
        # This makes sure the REP endpoint is set up. The REQ endpoints
        # cannot connect to an inproc address that is not bound yet.
        gevent.sleep(5)

        jobs = [gevent.spawn(client, context, i) for i in range(10)]
        print("Client Greenlets started - ")
        gevent.joinall(jobs)
        print([job.value for job in jobs])
    finally:
        # The server loops forever, so stop it explicitly once the clients
        # are done instead of leaving the greenlet dangling.
        server_job.kill(block=True)

    # Observation: only some clients receive a reply, and they tend to be
    # the later ones, which looks unfair. The server spends about one
    # second per request while every client gives up after five seconds,
    # so presumably only the requests served within that window are
    # answered - the exact serving order is up to ZeroMQ. Thoughts?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment