|
from multiprocessing import Pool, Manager |
|
|
|
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer |
|
import SocketServer |
|
import json |
|
import cgi |
|
import random, time |
|
|
|
# Resources to read |
|
# |
|
# http://stackoverflow.com/a/1239252/603280 |
|
# http://stackoverflow.com/questions/13689927/how-to-get-the-amount-of-work-left-to-be-done-by-a-python-multiprocessing-pool |
|
# |
|
|
|
|
|
# This is your task, which will run in its own process. You can modify it to take your desired args and kwargs and
# report back to the manager dictionary 'd' or any other manager resource as necessary.
# Read the multiprocessing documentation to find the facilities provided by multiprocessing.Manager
|
def task(d, sessionid, number, repeatcount):
    """Simulate a long-running job and publish its progress under ``d[sessionid]``.

    This is a stub: each iteration sleeps for a random 0 to 2 seconds and is
    then counted as a success or, with roughly a 2% chance, as a failure.
    Replace the loop body with an actual task and keep publishing whatever
    information about it should be visible through the mapping.

    Args:
        d: Mapping the progress record is written to (the manager dictionary
            in this file); only item assignment is used.
        sessionid: Key under which this job's progress record is stored.
        number: Job argument; copied into the progress record unchanged.
        repeatcount: Total number of iterations to run.

    Returns:
        None. Progress is reported only through ``d``.
    """
    success = 0
    fail = 0

    def publish():
        # Store a brand-new dict on every update: per the multiprocessing
        # docs, in-place changes to a value held in a manager dict are not
        # propagated, only item assignment is.
        d[sessionid] = {
            'success': success,
            'fail': fail,
            'number': number,
            'repeatcount': repeatcount
        }

    # Publish the initial state right away so the session can be looked up
    # before the first iteration finishes, and also when repeatcount <= 0
    # (in which case the loop below never runs).
    publish()

    while success + fail < repeatcount:
        time.sleep(random.random() * 2.0)
        if (random.random() * 100) > 98.0:
            fail += 1
        else:
            success += 1
        publish()
    return
|
|
|
# Initializing our global resources. |
|
# Pool of worker processes that execute submitted tasks (default size:
# one worker per CPU, per the multiprocessing documentation).
p = Pool()
# Manager whose server process owns the state shared with the workers.
m = Manager()
# Shared mapping of session id -> progress record; written by task() and
# read by the HTTP request handlers.
d = m.dict()
|
|
|
# This is the HTTP Server which provides a simple JSON REST API |
|
class Server(BaseHTTPRequestHandler):
    """Request handler exposing the task broker as a small JSON-over-HTTP API.

    GET returns the progress records of all sessions. POST either starts a
    new task or reports the progress of a single session. The handler uses
    the module-level worker pool ``p`` and manager dictionary ``d``.
    """

    def _set_headers(self, status=200):
        """Send the status line and the JSON content-type header.

        Args:
            status: HTTP status code to send; defaults to 200.
        """
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        self.end_headers()

    def _send_json(self, payload, status=200):
        """Send ``payload`` as a JSON response body with the given status."""
        # Serialize first, so a payload that cannot be encoded raises before
        # any part of the response has been written.
        body = json.dumps(payload)
        self._set_headers(status)
        self.wfile.write(body)

    def do_HEAD(self):
        """Answer HEAD with the same headers GET would send, without a body."""
        self._set_headers()

    # GET sends back the complete contents of the manager dictionary 'd' as JSON.
    # This can be modified to any desired response (should be JSON)
    def do_GET(self):
        """Return every session's progress record as one JSON object."""
        # 'd' is a manager proxy, not a real dict, so json.dumps() rejects
        # it; copy() hands back a plain dict snapshot that can be encoded.
        self._send_json(d.copy())

    # POST echoes the message adding a JSON field
    def do_POST(self):
        """Start a task or report a session's progress, echoing the request.

        To run a new task, POST the following JSON:
            {"runtask": true, "sessionid": "ANY-UNIQUE-NAME-FOR-YOUR-TASK",
             "number": ..., "repeatcount": ...}

            curl --data "{\\"runtask\\":\\"true\\", \\"sessionid\\":\\"session-5\\", \\"number\\":\\"+\\", \\"repeatcount\\": 100 }" \\
                 --header "Content-Type: application/json" http://localhost:8111

        To see the status of a running (or completed) task, POST:
            {"sessionid": "THE-UNIQUE-NAME-FOR-YOUR-TASK"}

            curl --data "{\\"sessionid\\":\\"session-5\\"}" --header "Content-Type: application/json" http://localhost:8111

        Responses:
            200 with the request message echoed back (plus a "status" field
                for a status query).
            400 for a non-JSON content type (empty body), or with a JSON
                error for a bad Content-Length, malformed JSON, a body that
                is not a JSON object, or a "runtask" request with missing
                fields.
            404 with a JSON error when the session has no progress record.
        """
        # A missing Content-Type header comes back as None, which
        # parse_header() cannot handle; treat it as an empty type instead.
        ctype, _ = cgi.parse_header(self.headers.getheader('content-type') or '')

        # refuse to receive non-json content
        if ctype != 'application/json':
            self.send_response(400)
            self.end_headers()
            return

        # read the message and convert it into a python dictionary
        try:
            length = int(self.headers.getheader('content-length'))
        except (TypeError, ValueError):
            # Header absent (None) or not a number.
            length = -1
        if length < 0:
            self._send_json({'error': 'a valid Content-Length header is required'}, 400)
            return

        try:
            message = json.loads(self.rfile.read(length))
        except ValueError:
            self._send_json({'error': 'request body is not valid JSON'}, 400)
            return
        if not isinstance(message, dict):
            self._send_json({'error': 'request body must be a JSON object'}, 400)
            return

        if 'runtask' in message:
            # The mere presence of "runtask" requests a new task; make sure
            # every argument task() needs was supplied before queueing it.
            missing = [key for key in ('sessionid', 'number', 'repeatcount')
                       if key not in message]
            if missing:
                self._send_json({'error': 'missing field(s): %s' % ', '.join(missing)}, 400)
                return
            print("Starting task with %s, %s, %s" % (message['sessionid'], message['number'], message['repeatcount']))
            # Fire and forget: progress is reported through 'd', so the
            # AsyncResult is not kept.
            p.apply_async(task, (d, message['sessionid'], message['number'], message['repeatcount']))
        elif 'sessionid' in message:
            status = d.get(message['sessionid'])
            if status is None:
                # No record has been published under this session id.
                self._send_json({'error': 'unknown sessionid',
                                 'sessionid': message['sessionid']}, 404)
                return
            message['status'] = status

        # send the message back
        self._send_json(message)
|
|
|
|
|
def run(server_class=HTTPServer, handler_class=Server, port=8111):
    """Serve the task broker API until the server loop stops or fails.

    Args:
        server_class: Server type; called as
            ``server_class(server_address, handler_class)``.
        handler_class: Request handler class handed to the server.
        port: TCP port to listen on. The host part of the address is the
            empty string, i.e. all interfaces.
    """
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)

    print('Starting httpd on port %d...' % port)
    try:
        httpd.serve_forever()
    finally:
        # serve_forever() ends through shutdown() or an exception such as
        # KeyboardInterrupt; release the listening socket either way. The
        # exception, if any, still propagates to the caller.
        httpd.server_close()
|
|
|
if __name__ == "__main__":
    # Executed as a script: start the task broker's HTTP server with the
    # default server class, handler and port.
    run()