Last active
June 3, 2024 14:14
-
-
Save mattbennett/4250ce5d56b36a99bc39 to your computer and use it in GitHub Desktop.
Nameko celery integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, request | |
from nameko.standalone.rpc import ServiceRpcProxy | |
app = Flask(__name__) | |
@app.route('/') | |
def task_list(): | |
return """ | |
<html> | |
<body> | |
<h1>Available Tasks</h1> | |
<h2>Fibonacci</h2> | |
<form action="/fibonacci" method="post"> | |
Number: | |
<input type="text" name="n"> | |
<input type="submit" value="Submit"> | |
</form> | |
</body> | |
</html> | |
""" | |
@app.route('/fibonacci', methods=['POST']) | |
def start_task(): | |
n = int(request.form['n']) | |
with rpc_proxy() as task_proxy: | |
task_id = task_proxy.start_task("fibonacci", n) | |
return """ | |
<html> | |
<body> | |
<p> | |
Your task is running. | |
<a href="/task/{task_id}">Result</a> | |
</p> | |
</body> | |
</html> | |
""".format(task_id=task_id) | |
@app.route('/task/<string:task_id>') | |
def task_result(task_id): | |
with rpc_proxy() as task_proxy: | |
result = task_proxy.get_result(task_id) | |
return """ | |
<html> | |
<body> | |
<p>The result of task {task_id} is {result}.</p> | |
</body> | |
</html> | |
""".format(task_id=task_id, result=result) | |
def rpc_proxy(): | |
# the ServiceRpcProxy instance isn't thread safe so we constuct one for | |
# each request; a more intelligent solution would be a thread-local or | |
# pool of shared proxies | |
config = {'AMQP_URI': 'amqp://guest:guest@localhost/'} | |
return ServiceRpcProxy('tasks', config) | |
if __name__ == '__main__': | |
app.run(debug=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
import eventlet | |
from eventlet.event import Event | |
from nameko.rpc import rpc | |
from nameko.extensions import DependencyProvider | |
# a simple task | |
def fibonacci(n): | |
a, b = 1, 1 | |
for i in xrange(n-1): | |
a, b = b, a+b | |
if n % 50 == 0: | |
eventlet.sleep() # won't yield voluntarily since there's no i/o | |
return a | |
class TaskProcessor(DependencyProvider): | |
def __init__(self): | |
self.tasks = { | |
'fibonacci': fibonacci | |
# add other tasks here | |
} | |
self.results = {} | |
def start_task(self, name, args, kwargs): | |
# generate unique id | |
task_id = uuid.uuid4().hex | |
# get the named task | |
task = self.tasks.get(name) | |
# execute it in a container thread and send the result to an Event | |
event = Event() | |
gt = self.container.spawn_managed_thread(lambda: task(*args, **kwargs)) | |
gt.link(lambda res: event.send(res.wait())) | |
# store the Event and return the task's unique id to the caller | |
self.results[task_id] = event | |
return task_id | |
def get_result(self, task_id): | |
# get the result Event for `task_id` | |
result = self.results.get(task_id) | |
if result is None: | |
return "missing" | |
# if the Event is ready, return its value | |
if result.ready(): | |
return result.wait() | |
return "pending" | |
def get_dependency(self, worker_ctx): | |
class TaskApi(object): | |
start_task = self.start_task | |
get_result = self.get_result | |
return TaskApi() | |
class TaskService(object): | |
name = "tasks" | |
processor = TaskProcessor() | |
@rpc | |
def start_task(self, name, *args, **kwargs): | |
return self.processor.start_task(name, args, kwargs) | |
@rpc | |
def get_result(self, task_id): | |
return self.processor.get_result(task_id) |
@mattbennett If the container is dead during the execution of the task, is the task lost?
@mattbennett Hi, is there any update concerning the question from @qileroro
Having the problem that when I define my RpcProxy, if that service has died, then the execution will hang indefinitely. I am unable to effect a timeout that works in conjunction with nameko here. Do you have any ideas/ input to help here?
Example of what I'm trying:
try:
with Timeout(1):
# Imagine this services container has recently died
data_service = RpcProxy("data")
except Timeout.Timeout:
data_service = None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note that in a real-world scenario
TaskProcessor.results
should be an external shared resource (e.g. a database) rather than an in-process object, because services should be stateless.