-
-
Save rizplate/6a8eb561b49cc2151dff865f64a70e7b to your computer and use it in GitHub Desktop.
Nameko celery integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, request | |
from nameko.standalone.rpc import ServiceRpcProxy | |
# Flask application object; the @app.route decorators below register their
# view functions on it.
app = Flask(__name__)
@app.route('/')
def task_list():
    """Serve the landing page that lists the available tasks.

    The page is static HTML with a single form that POSTs the text field
    ``n`` to ``/fibonacci``.
    """
    page = """
<html>
<body>
<h1>Available Tasks</h1>
<h2>Fibonacci</h2>
<form action="/fibonacci" method="post">
Number:
<input type="text" name="n">
<input type="submit" value="Submit">
</form>
</body>
</html>
"""
    return page
@app.route('/fibonacci', methods=['POST'])
def start_task():
    """Start a "fibonacci" task for the submitted number.

    Reads the form field ``n``, converts it to an integer and asks the task
    service (through ``rpc_proxy``) to start the "fibonacci" task with it.

    Returns:
        An HTML page linking to ``/task/<task_id>`` on success, or an HTML
        error page with status 400 when ``n`` is not a valid integer.

    A missing ``n`` field is not handled here; the error raised by the form
    lookup propagates to Flask.
    """
    try:
        n = int(request.form['n'])
    except ValueError:
        # Non-numeric input is a client mistake, not a server failure, so
        # answer with 400 instead of letting the ValueError become a 500.
        return """
<html>
<body>
<p>Number must be an integer.</p>
</body>
</html>
""", 400

    with rpc_proxy() as task_proxy:
        task_id = task_proxy.start_task("fibonacci", n)

    return """
<html>
<body>
<p>
Your task is running.
<a href="/task/{task_id}">Result</a>
</p>
</body>
</html>
""".format(task_id=task_id)
@app.route('/task/<string:task_id>')
def task_result(task_id):
    """Show the current result for the task identified by ``task_id``.

    Args:
        task_id: task identifier taken from the URL path.

    Returns:
        An HTML page containing the task id and whatever the task service's
        ``get_result`` returned for it.
    """
    # Function-scope import keeps this block self-contained; fall back to
    # ``cgi.escape`` on interpreters that have no ``html`` module.
    try:
        from html import escape
    except ImportError:
        from cgi import escape

    with rpc_proxy() as task_proxy:
        result = task_proxy.get_result(task_id)

    # ``task_id`` is attacker-controlled (it comes straight from the URL) and
    # ``result`` is arbitrary task output, so escape both before embedding
    # them in the HTML response.
    return """
<html>
<body>
<p>The result of task {task_id} is {result}.</p>
</body>
</html>
""".format(task_id=escape(str(task_id)), result=escape(str(result)))
def rpc_proxy():
    """Create a fresh ``ServiceRpcProxy`` for the "tasks" service.

    The views use the returned object as a context manager
    (``with rpc_proxy() as task_proxy:``).
    """
    # the ServiceRpcProxy instance isn't thread safe so we construct one for
    # each request; a more intelligent solution would be a thread-local or
    # pool of shared proxies
    return ServiceRpcProxy(
        'tasks',
        {'AMQP_URI': 'amqp://guest:guest@localhost/'},
    )
# Run the Flask app only when this file is executed as a script.
# NOTE(review): debug=True is presumably meant for local development only;
# confirm before running this anywhere exposed.
if __name__ == '__main__':
    app.run(debug=True)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
import eventlet | |
from eventlet.event import Event | |
from nameko.rpc import rpc | |
from nameko.extensions import DependencyProvider | |
# a simple task | |
def fibonacci(n):
    """Return the n-th Fibonacci number, counting from fibonacci(1) == 1.

    The sequence is 1, 1, 2, 3, 5, ...; for ``n <= 1`` the loop does not run
    and 1 is returned.

    Args:
        n: 1-based position in the sequence.

    Returns:
        The Fibonacci number at position ``n``.
    """
    a, b = 1, 1
    # ``range`` exists on both Python 2 and 3; ``xrange`` is Python 2 only.
    for step in range(1, n):
        a, b = b, a + b
        # Pure computation never yields voluntarily since there's no i/o, so
        # hand control back every 50 iterations. The check must use the loop
        # counter: testing ``n`` would yield on every iteration or never.
        if step % 50 == 0:
            eventlet.sleep()
    return a
class TaskProcessor(DependencyProvider):
    """Dependency provider that runs named tasks in container threads.

    Tasks are looked up by name in ``self.tasks``. Each started task gets a
    unique id, and its outcome is delivered to an ``Event`` stored in
    ``self.results`` under that id.

    NOTE(review): entries in ``self.results`` are never removed, so the
    mapping grows with every task started.
    """

    def __init__(self):
        # registry of runnable tasks, keyed by the name callers pass in
        self.tasks = {
            'fibonacci': fibonacci,
            # add other tasks here
        }
        # task_id -> Event receiving the task's return value or exception
        self.results = {}

    def start_task(self, name, args, kwargs):
        """Start the task registered as ``name`` and return its id.

        Args:
            name: key of the task in ``self.tasks``.
            args: positional arguments passed to the task.
            kwargs: keyword arguments passed to the task.

        Returns:
            A unique hex string identifying the started task.

        Raises:
            ValueError: if no task is registered under ``name``.
        """
        # Reject unknown names up front; otherwise ``None`` would be called
        # inside the background thread and the failure would surface there.
        task = self.tasks.get(name)
        if task is None:
            raise ValueError("unknown task: {!r}".format(name))

        # generate unique id
        task_id = uuid.uuid4().hex

        event = Event()

        def run():
            # Deliver either the return value or the exception to the Event,
            # so a failing task is reported to whoever asks for its result
            # instead of escaping from the managed thread.
            try:
                outcome = task(*args, **kwargs)
            except Exception as exc:
                event.send_exception(exc)
            else:
                event.send(outcome)

        # execute it in a container thread
        self.container.spawn_managed_thread(run)

        # store the Event and return the task's unique id to the caller
        self.results[task_id] = event
        return task_id

    def get_result(self, task_id):
        """Return the outcome of the task identified by ``task_id``.

        Returns:
            ``"missing"`` if the id is unknown, ``"pending"`` if the task has
            not finished yet, otherwise the task's return value.

        Raises:
            Whatever exception the task itself raised, once it has finished.
        """
        # get the result Event for `task_id`
        result = self.results.get(task_id)
        if result is None:
            return "missing"
        # if the Event is ready, return its value (or re-raise its exception)
        if result.ready():
            return result.wait()
        return "pending"

    def get_dependency(self, worker_ctx):
        """Return the object exposing ``start_task`` and ``get_result``."""

        class TaskApi(object):
            # bound methods of this provider, exposed under the same names
            start_task = self.start_task
            get_result = self.get_result

        return TaskApi()
class TaskService(object):
    """Service named "tasks" exposing task start and result lookup over RPC.

    Both entrypoints delegate to the ``processor`` dependency.
    """

    name = "tasks"

    processor = TaskProcessor()

    @rpc
    def start_task(self, name, *args, **kwargs):
        """Start the task called ``name`` and return the id it was given.

        Extra positional and keyword arguments are handed to the processor
        as a tuple and a dict respectively.
        """
        task_id = self.processor.start_task(name, args, kwargs)
        return task_id

    @rpc
    def get_result(self, task_id):
        """Return what the processor reports for ``task_id``."""
        outcome = self.processor.get_result(task_id)
        return outcome
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment