Skip to content

Instantly share code, notes, and snippets.

@baljanak
Created October 30, 2012 10:09
Show Gist options
  • Save baljanak/3979427 to your computer and use it in GitHub Desktop.
Save baljanak/3979427 to your computer and use it in GitHub Desktop.
Playing with threads, greenlets and ZMQ
import gevent
from gevent import monkey; monkey.patch_all()
from gevent_zeromq import zmq
import simplejson as json
import threading
import time
def server(context):
""" server routine """
poll = zmq.Poller()
# Socket to talk to dispatcher
socket = context.socket(zmq.REP)
poll.register(socket, zmq.POLLIN)
runid = 1234
print "Socket bound on %s ..." % (runid)
rc = socket.bind("inproc://%s" % (runid))
print "rc = ", rc
#socket.bind("tcp://*:8040")
while True:
string = socket.recv()
print("Received request for : [%s]" % (string))
# do some 'work'
# See the difference in handling more clients with different sleep
# values
# time.sleep(1)
gevent.sleep(1)
#send reply back to client
socket.send("World")
# We never get here but clean up anyhow
context.term()
def client(context, i):
# Create Lazy Pirate ZMQ REQ endpoint
# Inspired from http://zguide2.zeromq.org/py:lpclient
poll = zmq.Poller()
# Socket to talk to server
client = context.socket(zmq.REQ)
poll.register(client, zmq.POLLIN)
runid = 1234
print "[Client %s] Connecting to Outbound Server %s ..." % (i, runid)
client.connect("inproc://%s" % (runid))
args = {'client' : i, 'params' : 'Hello'}
print "Sending request with args = %s" %(args)
client.send(json.dumps(args))
REQUEST_TIMEOUT = 5000
while True:
socks = dict(poll.poll(REQUEST_TIMEOUT))
if socks.get(client) == zmq.POLLIN:
# Get the reply.
result = client.recv()
else:
print "No response from Outbound Server"
result = None
break
client.setsockopt(zmq.LINGER, 0)
client.close()
poll.unregister(client)
return result
if __name__ == '__main__':
context = zmq.Context(1)
js = [gevent.spawn(server, context), ]
print "Server Greenlet started - "
# This makes sure the REP endpoint is setup. The REQ endpoint cannot connect if this is not done.
gevent.sleep(5)
jobs = [gevent.spawn(client, context, i) for i in range(10)]
print "Client Greenlets started - "
gevent.joinall(jobs)
print [job.value for job in jobs]
# You will see that the later clients get the responses which seems to be unfair. Thoughts?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment