Last active
August 10, 2016 12:21
-
-
Save nfaggian/b204f48fe1bc7868b791ba0b2ba092fb to your computer and use it in GitHub Desktop.
Simple remote simulator executor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Copyright Bureau Of Meteorology. | |
This software is provided under license "as is", without warranty of any | |
kind including, but not limited to, fitness for a particular purpose. The | |
user assumes the entire risk as to the use and performance of the software. | |
In no event shall the copyright holder be held liable for any claim, damages | |
or other liability arising from the use of the software. | |
""" | |
import json | |
import tornado.ioloop | |
import tornado.web | |
import tornado.escape | |
import tornado.queues | |
import tornado.gen | |
import concurrent | |
import time | |
import uuid | |
# A simple mechanism to run and manage jobs, default is the number of cores. | |
# https://docs.python.org/3.2/library/concurrent.futures.html#module-concurrent.futures | |
EXECUTOR = concurrent.futures.ProcessPoolExecutor(max_workers=2) | |
# Dictionary that tracks futures | |
SIMULATIONS = {} | |
def future_state(future): | |
""" | |
String represetation of state | |
""" | |
if future.cancelled(): | |
return 'cancel' | |
elif future.running(): | |
return 'running' | |
elif future.done(): | |
return 'finished' | |
else: | |
return 'waiting' | |
def simulator_runner(specification): | |
""" | |
Cache simulator output in RAM - bad idea generally. | |
""" | |
# Call an external process that generates output | |
time.sleep(30) | |
# Capture the output | |
return json.loads(open('data.json', 'r').read()) | |
class JSONHandler(tornado.web.RequestHandler): | |
""" | |
Request handler where requests and responses speak JSON. | |
derived from: https://gist.github.com/mminer/5464753 | |
""" | |
def prepare(self): | |
if self.request.headers["Content-Type"].startswith("application/json"): | |
self.json_args = tornado.escape.json_decode(self.request.body) | |
else: | |
self.json_args = None | |
class SubmitHandler(JSONHandler): | |
""" | |
Recieves a simulation specification and queues a job. | |
""" | |
@tornado.gen.coroutine | |
def post(self): | |
specification = self.json_args | |
identifier = uuid.uuid1().__str__() | |
SIMULATIONS[ identifier ] = EXECUTOR.submit(simulator_runner, specification) | |
self.write(json.dumps({"job-id":identifier})) | |
class StatusHandler(tornado.web.RequestHandler): | |
""" | |
Returns the status of a job. | |
""" | |
@tornado.gen.coroutine | |
def get(self): | |
self.write(dict([(k, future_state(v)) | |
for k, v in SIMULATIONS.items()])) | |
class CancelHandler(JSONHandler): | |
""" | |
Cancels a job, removes it from the job queue. | |
""" | |
@tornado.gen.coroutine | |
def post(self): | |
data = self.json_args | |
try: | |
if data["job-id"] in SIMULATIONS: | |
future = SIMULATIONS[data["job-id"]] | |
future.cancel() | |
self.write({"job-id":data["job-id"], | |
"status":future_state(future)}) | |
else: | |
self.write({"error":"invalid-job"}) | |
except Exception as error: | |
self.write({"error": error.__str__()}) | |
class RetrieveHandler(JSONHandler): | |
""" | |
Retrieves a completed job. | |
""" | |
@tornado.gen.coroutine | |
def post(self): | |
data = self.json_args | |
try: | |
if data["job-id"] in SIMULATIONS: | |
future = SIMULATIONS[data["job-id"]] | |
# pull data from job | |
self.write(future.result()) | |
else: | |
self.write({"error":"invalid-job"}) | |
except Exception as error: | |
self.write({"error": error.__str__()}) | |
if __name__ == "__main__": | |
app = tornado.web.Application([ | |
(r"/submit", SubmitHandler), | |
(r"/status", StatusHandler), | |
(r"/cancel", CancelHandler), | |
(r"/retrieve", RetrieveHandler), | |
], debug=True) | |
app.listen(9999) | |
tornado.ioloop.IOLoop.current().start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Basic tornado based job scheduler using concurrent futures.