Created
April 24, 2015 20:59
-
-
Save mayli/b2bac2aa6c22dec3d0b5 to your computer and use it in GitHub Desktop.
RPCQueue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Queue | |
import xmlrpclib | |
from SimpleXMLRPCServer import SimpleXMLRPCServer | |
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler | |
import threading | |
from SocketServer import ThreadingMixIn | |
class ThreadingXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):
    """XML-RPC server that handles every request in its own thread.

    Threading is required here because the queue methods exported through
    this server (``get`` on an empty queue, ``join``) may block for a long
    time; a single-threaded server would stall all other callers.
    """

    # A handler thread can stay parked inside a blocking queue call
    # indefinitely.  With the ThreadingMixIn default (non-daemon threads) such
    # a thread keeps the interpreter alive even after the server has been shut
    # down, so mark handler threads as daemon threads.
    daemon_threads = True
class RPCQueue(object):
    """A ``Queue.Queue`` that is shared over XML-RPC.

    An instance works in one of two modes:

    * master (``master`` is falsy): owns a real ``Queue.Queue``, exports the
      methods listed in ``_methods`` through a threaded XML-RPC server that
      runs in a background thread, and reaches that server through its own
      client proxy.
    * client (``master`` is a URL): only creates a proxy to an already
      running master.

    In both modes every name in ``_methods`` is bound on the instance as a
    remote call, so the object is used like an ordinary queue.  Items travel
    through XML-RPC marshalling, so only marshallable values can be queued
    and sequences such as tuples are received as lists.
    """

    # Queue.Queue methods exported by the master and proxied on every instance.
    _methods = [
        "task_done", "join", "qsize",
        "empty", "full", "put", "put_nowait",
        "get", "get_nowait"]

    def __init__(self, listen=None, master=None,
                 logRequests=False, **kwargs):
        """Create a master queue, or attach to an existing one.

        Args:
            listen: ``(host, port)`` the master's server binds to.  Defaults
                to ``("localhost", 11235)``.  A port of ``0`` lets the OS
                pick a free port.  Only used in master mode.
            master: URL of a running master, for example
                ``'http://localhost:11235'``.  When given, no local queue or
                server is created.
            logRequests: Passed to the XML-RPC server (master mode only).
            **kwargs: Passed to ``Queue.Queue`` (master mode only).
        """
        if not master:
            self._queue = Queue.Queue(**kwargs)
            # Init internal XMLServer.
            if listen is None:
                # TODO: Assign random port number.
                listen = ("localhost", 11235)
            # Normalise to a tuple: socket binding and the '%' formatting
            # below both reject lists.
            self._listen = tuple(listen)

            # Start XMLRPCServer.
            class RequestHandler(SimpleXMLRPCRequestHandler):
                rpc_paths = ('/RPC2',)

            self._server = ThreadingXMLRPCServer(
                self._listen,
                requestHandler=RequestHandler, allow_none=True,
                logRequests=logRequests)
            # Remember the port that was really bound so that a requested
            # port of 0 still yields a usable client URL.
            self._listen = (self._listen[0], self._server.server_address[1])
            self._server.register_introspection_functions()
            for fn in self._methods:
                self._server.register_function(getattr(self._queue, fn), fn)
            self._server_thread = threading.Thread(
                target=self._server.serve_forever)
            self._server_thread.start()
            # An empty host means "all interfaces" for binding but is not a
            # valid host to connect to; fall back to localhost for the URL.
            master = 'http://%s:%d' % (
                self._listen[0] or "localhost", self._listen[1])
        # allow_none mirrors the server setting, so None can be sent as an
        # item or as an optional argument such as a timeout.
        self._client = xmlrpclib.ServerProxy(master, allow_none=True)
        # Proxy Queue methods.
        for fn in self._methods:
            setattr(self, fn, getattr(self._client, fn))

    def close(self):
        """Stop the embedded server and release its socket.

        Does nothing on a client instance and is safe to call repeatedly.
        """
        server = getattr(self, "_server", None)
        if server is None:
            return
        worker = getattr(self, "_server_thread", None)
        # Clear the references first so a second call becomes a no-op.
        self._server = None
        self._server_thread = None
        # shutdown() waits for serve_forever() to finish, so only call it
        # when the serving thread is really running.
        if worker is not None and worker.is_alive():
            server.shutdown()
            worker.join()
        server.server_close()

    def __del__(self):
        # Best-effort cleanup for callers that never call close().
        self.close()
if __name__ == '__main__': | |
import threading | |
num_worker_threads = 3 | |
def worker(): | |
q = RPCQueue(master='http://localhost:11235') | |
while True: | |
item = q.get() | |
print "working on ", item | |
q.task_done() | |
q = RPCQueue(logRequests=True) | |
for i in range(num_worker_threads): | |
t = threading.Thread(target=worker) | |
t.daemon = True | |
t.start() | |
for item in range(10): | |
q.put((item, "Some Task")) | |
q.join() | |
print "Done, shutdown server" | |
del q |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment