Skip to content

Instantly share code, notes, and snippets.

@hiway
Forked from felipecruz/asyncsrv.py
Last active October 2, 2016 05:40
Show Gist options
  • Save hiway/a70c46ef14df5aa8e56d3c19c338b941 to your computer and use it in GitHub Desktop.
Save hiway/a70c46ef14df5aa8e56d3c19c338b941 to your computer and use it in GitHub Desktop.
python zmq async server example
"""
ZMQ with Threaded Workers on Python 3
Python 3 compatible port of: https://gist.github.com/felipecruz/883983
"""
import zmq
import threading
import time
from random import choice
class ClientTask(threading.Thread):
    """Client thread that sends numbered requests over a DEALER socket.

    Each client connects to ``tcp://localhost:5570``, then loops forever:
    it polls for replies five times (one second per poll), printing any
    reply it receives, and then sends the next ``request #N`` message.

    Args:
        identity: Optional socket identity (``str``). When omitted, a
            process-wide unique ``worker-<n>`` identity is assigned. The
            identity must be unique per client: by default a ROUTER socket
            rejects a peer that connects with an identity already in use,
            and that peer would never receive replies.
    """

    # Process-wide sequence that hands every client a distinct default
    # identity; the lock keeps concurrent construction race-free.
    _next_id = 0
    _id_lock = threading.Lock()

    def __init__(self, identity=None):
        threading.Thread.__init__(self)
        if identity is None:
            with ClientTask._id_lock:
                identity = 'worker-%d' % ClientTask._next_id
                ClientTask._next_id += 1
        self.identity = identity

    def run(self):
        """Connect to the server and exchange requests/replies until an error."""
        context = zmq.Context()
        socket = context.socket(zmq.DEALER)
        try:
            identity = self.identity
            # The identity has to be set before connect() to take effect.
            socket.setsockopt(zmq.IDENTITY, identity.encode('utf-8'))
            socket.connect('tcp://localhost:5570')
            print('Client %s started' % (identity))

            poll = zmq.Poller()
            poll.register(socket, zmq.POLLIN)

            reqs = 0
            while True:
                # Drain replies for up to ~5 seconds before the next request.
                for _ in range(5):
                    sockets = dict(poll.poll(1000))
                    if sockets.get(socket) == zmq.POLLIN:
                        msg = socket.recv()
                        print('%s: %s\n' % (identity, msg))
                reqs += 1
                print('Req #%d sent..' % (reqs))
                socket.send(b'request #%d' % (reqs))
        finally:
            # The loop only exits via an exception; release resources then.
            # linger=0 so undelivered requests cannot keep term() waiting.
            socket.close(linger=0)
            context.term()
class ServerTask(threading.Thread):
    """Server thread that proxies between TCP clients and in-process workers.

    A ROUTER socket bound to ``tcp://*:5570`` faces the clients and a DEALER
    socket bound to ``inproc://backend`` faces five ``ServerWorker`` threads
    that share this thread's context. Messages are forwarded in both
    directions as complete multipart messages.
    """

    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        """Bind both sockets, start the workers and shuttle messages forever."""
        context = zmq.Context()
        frontend = context.socket(zmq.ROUTER)
        backend = context.socket(zmq.DEALER)
        try:
            frontend.bind('tcp://*:5570')
            # Bind before the workers start so their inproc connect succeeds.
            backend.bind('inproc://backend')

            workers = []
            for _ in range(5):
                worker = ServerWorker(context)
                worker.start()
                workers.append(worker)

            poll = zmq.Poller()
            poll.register(frontend, zmq.POLLIN)
            poll.register(backend, zmq.POLLIN)

            while True:
                sockets = dict(poll.poll())
                if sockets.get(frontend) == zmq.POLLIN:
                    # ROUTER prepends the sender's identity frame. Forward the
                    # whole message so the reply can be routed back; a
                    # single-frame recv()/send() would split the envelope
                    # from the payload.
                    frames = frontend.recv_multipart()
                    print('Server received %s' % (frames[-1]))
                    backend.send_multipart(frames)
                if sockets.get(backend) == zmq.POLLIN:
                    # The first frame of a worker reply is the client
                    # identity, which the ROUTER uses to pick the destination.
                    frontend.send_multipart(backend.recv_multipart())
        finally:
            # The loop only exits via an exception; release resources then.
            frontend.close(linger=0)
            backend.close(linger=0)
            # term() waits until every socket of this context (including the
            # workers') has been closed.
            context.term()
class ServerWorker(threading.Thread):
    """Worker thread that echoes each request back zero to four times.

    Args:
        context: The ``zmq.Context`` the worker creates its socket from. It
            is used to connect to ``inproc://backend``, so it has to be the
            context that bound that endpoint.
    """

    def __init__(self, context):
        threading.Thread.__init__(self)
        self.context = context

    def run(self):
        """Receive requests from the backend and echo them after a random delay."""
        worker = self.context.socket(zmq.DEALER)
        try:
            worker.connect('inproc://backend')
            print('Worker started')
            while True:
                # Keep every frame: the leading frame(s) carry the routing
                # envelope the server needs to deliver the reply, and a
                # single-frame recv()/send() would drop it.
                frames = worker.recv_multipart()
                print('Worker received %s' % (frames[-1]))
                replies = choice(range(0, 5))
                for _ in range(replies):
                    # Pause between 1/9 s and 1 s before each echo.
                    time.sleep(1 / choice(range(1, 10)))
                    worker.send_multipart(frames)
        finally:
            # The loop only exits via an exception (e.g. the context being
            # terminated); close the socket so the context can shut down.
            worker.close(linger=0)
def main():
    """Start the server thread and three client threads, then wait on the server."""
    server_thread = ServerTask()
    server_thread.start()

    client_count = 3
    for _ in range(client_count):
        ClientTask().start()

    # Block until the server thread ends.
    server_thread.join()
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment