-
-
Save chansdad/45b77d22257fb5e1c0006a5b4045374b to your computer and use it in GitHub Desktop.
Playing with threads, greenlets and ZMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gevent | |
from gevent import monkey; monkey.patch_all() | |
from gevent_zeromq import zmq | |
import simplejson as json | |
import threading | |
import time | |
def server(context): | |
poll = zmq.Poller() | |
# Socket to talk to dispatcher | |
socket = context.socket(zmq.REP) | |
poll.register(socket, zmq.POLLIN) | |
runid = 1234 | |
print "Socket bound on %s ..." % (runid) | |
rc = socket.bind("inproc://%s" % (runid)) | |
print "rc = ", rc | |
#socket.bind("tcp://*:8040") | |
while True: | |
string = socket.recv() | |
print("Received request for : [%s]" % (string)) | |
# do some 'work' | |
# See the difference in handling more clients with different sleep | |
# values | |
time.sleep(1) | |
#send reply back to client | |
socket.send("World") | |
#break | |
# We never get here but clean up the context | |
socket.setsockopt(zmq.LINGER, 0) | |
socket.close() | |
# WARNING - if context is terminated, any active clients who share the same | |
# context will get screwed. | |
# context.term() | |
def client(context, i): | |
# Create Lazy Pirate ZMQ REQ endpoint | |
# Inspired from http://zguide2.zeromq.org/py:lpclient | |
poll = zmq.Poller() | |
# Socket to talk to server | |
client = context.socket(zmq.REQ) | |
poll.register(client, zmq.POLLIN) | |
runid = 1234 | |
print "[Client %s] Connecting to Outbound Server %s ..." % (i, runid) | |
client.connect("inproc://%s" % (runid)) | |
args = {'client' : i, 'params' : 'Hello'} | |
print "Sending request with args = %s" %(args) | |
client.send(json.dumps(args)) | |
REQUEST_TIMEOUT = 5000 | |
while True: | |
socks = dict(poll.poll(REQUEST_TIMEOUT)) | |
if socks.get(client) == zmq.POLLIN: | |
# Get the reply. | |
result = client.recv() | |
else: | |
print "No response from Outbound Server" | |
result = None | |
break | |
client.setsockopt(zmq.LINGER, 0) | |
client.close() | |
poll.unregister(client) | |
return result | |
def server_thread(context): | |
print "Server thread started ..." | |
js = [gevent.spawn(server, context), ] | |
print "Server Greenlet started - " | |
def client_thread(context): | |
print "Client thread started ..." | |
CLIENTS = 10 | |
jobs = [gevent.spawn(client, context, i) for i in range(CLIENTS)] | |
print "Client Greenlets started - " | |
gevent.joinall(jobs) | |
print [job.value for job in jobs] | |
if __name__ == '__main__': | |
# Prepare our context | |
# You should create and use exactly one context in your process that will | |
# be used to communicate between threads. What this means is that for | |
# "inproc://" communications there has to be one single context. | |
# Multiple contexts may coexist within a single application which do not | |
# talk to each other over zmq "inproc://". | |
context = zmq.Context(1) | |
# First start the server thread. This is necessary as "bind" should happen | |
# before "connect". Pass the zmq context to the thread. | |
t1 = threading.Thread(target=server_thread, args=(context, )) | |
t1.start() | |
# Now, start the client thread | |
t2 = threading.Thread(target=client_thread, args=(context, )) | |
t2.start() | |
# Note - Thread Safety | |
# 1. A ZMQ context is thread safe and may be shared among as many application | |
# threads as necessary, without any additional locking required on the part | |
# of the caller. | |
# 2. Individual ZMQ sockets are not thread safe except in the case where full | |
# memory barriers are issued when migrating a socket from one thread to | |
# another. In practice this means applications can create a socket in one | |
# thread with zmq_socket() and then pass it to a newly created thread as part | |
# of thread initialization, for example via a structure passed as an argument | |
# to pthread_create(). |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment