Created
April 12, 2011 18:00
-
-
Save j2labs/916035 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| """ | |
| Least-recently used (LRU) queue device | |
| Clients and workers are shown here in-process | |
| Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com> | |
| """ | |
| """Extended by J2 Labs to help make it clear what's happening. | |
| Output looks like below: | |
| $ ./lruqueue.py | |
| Initializing LRU Loop ]-------------------------- | |
| LRU Loop :: received READY from Worker-0 | |
| Worker-0 :: received <- HELLO -- sending -> OK | |
| LRU Loop :: received READY from Worker-1 | |
| Worker-1 :: received <- HELLO -- sending -> OK | |
| LRU Loop :: assigning Client-0 -> Worker-0 | |
| LRU Loop :: sending OK -> Client-0 | |
| Client-0 :: sending -> HELLO -- received <- OK | |
| LRU Loop :: assigning Client-1 -> Worker-1 | |
| Worker-0 :: received <- HELLO -- sending -> OK | |
| LRU Loop :: sending OK -> Client-1 | |
| Client-1 :: sending -> HELLO -- received <- OK | |
| Worker-1 :: received <- HELLO -- sending -> OK | |
| LRU Loop :: assigning Client-2 -> Worker-0 | |
| LRU Loop :: sending OK -> Client-2 | |
| Client-2 :: sending -> HELLO -- received <- OK | |
| Worker-0 :: received <- HELLO -- sending -> OK | |
| LRU Loop :: assigning Client-3 -> Worker-1 | |
| LRU Loop :: sending OK -> Client-3 | |
| Client-3 :: sending -> HELLO -- received <- OK | |
| LRU Loop :: assigning Client-4 -> Worker-0 | |
| LRU Loop :: sending OK -> Client-4 | |
| Client-4 :: sending -> HELLO -- received <- OK | |
| """ | |
| import threading | |
| import time | |
| import zmq | |
| NBR_CLIENTS = 5 | |
| NBR_WORKERS = 2 | |
| def worker_thread(worker_url, context, i): | |
| """ Worker using REQ socket to do LRU routing | |
| """ | |
| socket = context.socket(zmq.REQ) | |
| identity = "Worker-%d" % (i) | |
| socket.setsockopt(zmq.IDENTITY, identity) #set worker identity | |
| socket.connect(worker_url) | |
| # Tell the borker we are ready for work | |
| socket.send("READY") | |
| try: | |
| while True: | |
| # python binding seems to eat empty frames | |
| address = socket.recv() | |
| request = socket.recv() | |
| print("%s :: received <- %s -- sending -> %s" | |
| % (identity, request, "OK")) | |
| socket.send(address, zmq.SNDMORE) | |
| socket.send("", zmq.SNDMORE) | |
| socket.send("OK") | |
| except zmq.ZMQError, zerr: | |
| # context terminated so quit silently | |
| if zerr.strerror == 'Context was terminated': | |
| return | |
| else: | |
| raise zerr | |
| def client_thread(client_url, context, i): | |
| """ Basic request-reply client using REQ socket | |
| """ | |
| socket = context.socket(zmq.REQ) | |
| identity = "Client-%d" % (i) | |
| socket.setsockopt(zmq.IDENTITY, identity) #Set client identity. Makes tracing easier | |
| socket.connect(client_url) | |
| # Send request, get reply | |
| socket.send("HELLO") | |
| reply = socket.recv() | |
| print("%s :: sending -> %s -- received <- %s" | |
| % (identity, "HELLO", reply)) | |
| return | |
| def main(): | |
| """ main method """ | |
| url_worker = "inproc://workers" | |
| url_client = "inproc://clients" | |
| client_nbr = NBR_CLIENTS | |
| # Prepare our context and sockets | |
| context = zmq.Context(1) | |
| frontend = context.socket(zmq.XREP) | |
| frontend.bind(url_client) | |
| backend = context.socket(zmq.XREP) | |
| backend.bind(url_worker) | |
| # create workers and clients threads | |
| for i in range(NBR_WORKERS): | |
| thread = threading.Thread(target=worker_thread, args=(url_worker, context, i, )) | |
| thread.start() | |
| for i in range(NBR_CLIENTS): | |
| thread_c = threading.Thread(target=client_thread, args=(url_client, context, i, )) | |
| thread_c.start() | |
| # Logic of LRU loop | |
| # - Poll backend always, frontend only if 1+ worker ready | |
| # - If worker replies, queue worker as ready and forward reply | |
| # to client if necessary | |
| # - If client requests, pop next worker and send request to it | |
| # Queue of available workers | |
| available_workers = 0 | |
| workers_list = [] | |
| # init poller | |
| poller = zmq.Poller() | |
| # Always poll for worker activity on backend | |
| poller.register(backend, zmq.POLLIN) | |
| # Poll front-end only if we have available workers | |
| poller.register(frontend, zmq.POLLIN) | |
| while True: | |
| socks = dict(poller.poll()) | |
| # Handle worker activity on backend | |
| if (backend in socks and socks[backend] == zmq.POLLIN): | |
| # Queue worker address for LRU routing | |
| worker_addr = backend.recv() | |
| assert available_workers < NBR_WORKERS | |
| # add worker back to the list of workers | |
| available_workers += 1 | |
| workers_list.append(worker_addr) | |
| # Second frame is empty | |
| empty = backend.recv() | |
| assert empty == "" | |
| # Third frame is READY or else a client reply address | |
| client_addr = backend.recv() | |
| # If client reply, send rest back to frontend | |
| if client_addr != "READY": | |
| print("%s :: assigning %s -> %s" | |
| % ("LRU Loop", client_addr, worker_addr)) | |
| # Following frame is empty | |
| empty = backend.recv() | |
| assert empty == "" | |
| reply = backend.recv() | |
| frontend.send(client_addr, zmq.SNDMORE) | |
| frontend.send("", zmq.SNDMORE) | |
| frontend.send(reply) | |
| print("%s :: sending %s -> %s" % ("LRU Loop", reply, client_addr)) | |
| client_nbr -= 1 | |
| if client_nbr == 0: | |
| break # Exit after N messages | |
| else: | |
| print("%s :: received %s from %s" | |
| % ("LRU Loop", client_addr, worker_addr)) | |
| # poll on frontend only if workers are available | |
| if available_workers > 0: | |
| if (frontend in socks and socks[frontend] == zmq.POLLIN): | |
| # Now get next client request, route to LRU worker | |
| # Client request is [address][empty][request] | |
| client_addr = frontend.recv() | |
| empty = frontend.recv() | |
| assert empty == "" | |
| request = frontend.recv() | |
| # Dequeue and drop the next worker address | |
| available_workers -= 1 | |
| worker_id = workers_list.pop() | |
| backend.send(worker_id, zmq.SNDMORE) | |
| backend.send("", zmq.SNDMORE) | |
| backend.send(client_addr, zmq.SNDMORE) | |
| backend.send(request) | |
| #out of infinite loop: do some housekeeping | |
| time.sleep (1) | |
| frontend.close() | |
| backend.close() | |
| context.term() | |
| if __name__ == "__main__": | |
| print 'Initializing LRU Loop ]--------------------------' | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment