Skip to content

Instantly share code, notes, and snippets.

@mayli
Created April 24, 2015 20:59
Show Gist options
  • Save mayli/b2bac2aa6c22dec3d0b5 to your computer and use it in GitHub Desktop.
Save mayli/b2bac2aa6c22dec3d0b5 to your computer and use it in GitHub Desktop.
RPCQueue
import Queue
import xmlrpclib
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
import threading
from SocketServer import ThreadingMixIn
class ThreadingXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):
pass
class RPCQueue(object):
_methods = [
"task_done", "join", "qsize",
"empty", "full", "put", "put_nowait",
"get", "get_nowait"]
def __init__(self, listen=None, master=None,
logRequests=False, **kwargs):
if not master:
self._queue = Queue.Queue(**kwargs)
# Init internal XMLServer.
if listen == None:
# TODO: Assign random port number.
self._listen = ("localhost", 11235)
else:
self._listen = listen
# Start XMLRPCServer.
class RequestHandler(SimpleXMLRPCRequestHandler):
rpc_paths = ('/RPC2',)
self._server = ThreadingXMLRPCServer(
self._listen,
requestHandler=RequestHandler, allow_none=True, logRequests=logRequests)
self._server.register_introspection_functions()
for fn in self._methods:
self._server.register_function(getattr(self._queue, fn), fn)
self._server_thread = threading.Thread(target=self._server.serve_forever)
self._server_thread.start()
master = 'http://%s:%d' % self._listen
self._client = xmlrpclib.ServerProxy(master)
# Proxy Queue methods.
for fn in self._methods:
setattr(self, fn, getattr(self._client, fn))
def __del__(self):
if getattr(self, "_server_thread", None):
self._server.shutdown()
if __name__ == '__main__':
import threading
num_worker_threads = 3
def worker():
q = RPCQueue(master='http://localhost:11235')
while True:
item = q.get()
print "working on ", item
q.task_done()
q = RPCQueue(logRequests=True)
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
for item in range(10):
q.put((item, "Some Task"))
q.join()
print "Done, shutdown server"
del q
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment