@mattbennett
Last active June 3, 2024 14:14
Nameko celery integration
from flask import Flask, request

from nameko.standalone.rpc import ServiceRpcProxy

app = Flask(__name__)


@app.route('/')
def task_list():
    return """
<html>
<body>
<h1>Available Tasks</h1>

<h2>Fibonacci</h2>
<form action="/fibonacci" method="post">
    Number: <input type="text" name="n">
    <input type="submit" value="Submit">
</form>
</body>
</html>
"""


@app.route('/fibonacci', methods=['POST'])
def start_task():
    n = int(request.form['n'])

    with rpc_proxy() as task_proxy:
        task_id = task_proxy.start_task("fibonacci", n)

    return """
<html>
<body>
<p>
    Your task is running.
    <a href="/task/{task_id}">Result</a>
</p>
</body>
</html>
""".format(task_id=task_id)


@app.route('/task/<string:task_id>')
def task_result(task_id):
    with rpc_proxy() as task_proxy:
        result = task_proxy.get_result(task_id)

    return """
<html>
<body>
<p>The result of task {task_id} is {result}.</p>
</body>
</html>
""".format(task_id=task_id, result=result)


def rpc_proxy():
    # the ServiceRpcProxy instance isn't thread safe so we construct one for
    # each request; a more intelligent solution would be a thread-local or
    # pool of shared proxies (a thread-local sketch follows this file)
    config = {'AMQP_URI': 'amqp://guest:guest@localhost/'}
    return ServiceRpcProxy('tasks', config)


if __name__ == '__main__':
    app.run(debug=True)
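
The comment in rpc_proxy() suggests a thread-local or pooled proxy as a sharper alternative. A minimal thread-local sketch follows; it's an assumption layered on top of the gist, not part of it, and relies on ServiceRpcProxy.start() returning a usable proxy whose connection stays open for the lifetime of the thread:

import threading

_local = threading.local()


def thread_local_rpc_proxy():
    # hypothetical helper, not from the gist: lazily build one long-lived
    # proxy per thread instead of one per request
    if not hasattr(_local, 'task_proxy'):
        config = {'AMQP_URI': 'amqp://guest:guest@localhost/'}
        _local.task_proxy = ServiceRpcProxy('tasks', config).start()
    return _local.task_proxy

Handlers would then call thread_local_rpc_proxy().start_task(...) directly rather than using a with block, since the proxy is never stopped at the end of a request.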
import uuid

import eventlet
from eventlet.event import Event

from nameko.rpc import rpc
from nameko.extensions import DependencyProvider


# a simple task
def fibonacci(n):
    a, b = 1, 1
    for i in range(n - 1):
        a, b = b, a + b
        if i % 50 == 0:
            eventlet.sleep()  # won't yield voluntarily since there's no i/o
    return a


class TaskProcessor(DependencyProvider):

    def __init__(self):
        self.tasks = {
            'fibonacci': fibonacci
            # add other tasks here
        }
        self.results = {}

    def start_task(self, name, args, kwargs):
        # generate unique id
        task_id = uuid.uuid4().hex

        # get the named task
        task = self.tasks.get(name)

        # execute it in a container thread and send the result to an Event
        event = Event()
        gt = self.container.spawn_managed_thread(lambda: task(*args, **kwargs))
        gt.link(lambda res: event.send(res.wait()))

        # store the Event and return the task's unique id to the caller
        self.results[task_id] = event
        return task_id

    def get_result(self, task_id):
        # get the result Event for `task_id`
        result = self.results.get(task_id)

        if result is None:
            return "missing"

        # if the Event is ready, return its value
        if result.ready():
            return result.wait()

        return "pending"

    def get_dependency(self, worker_ctx):
        class TaskApi(object):
            start_task = self.start_task
            get_result = self.get_result
        return TaskApi()


class TaskService(object):
    name = "tasks"

    processor = TaskProcessor()

    @rpc
    def start_task(self, name, *args, **kwargs):
        return self.processor.start_task(name, args, kwargs)

    @rpc
    def get_result(self, task_id):
        return self.processor.get_result(task_id)
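
To exercise the service without the Flask front-end: assuming the module above is saved as e.g. tasks.py (a filename of my choosing, not the gist's) and RabbitMQ is running locally, start it with `nameko run tasks` and drive it from a standalone proxy:

import time

from nameko.standalone.rpc import ServiceRpcProxy

config = {'AMQP_URI': 'amqp://guest:guest@localhost/'}

with ServiceRpcProxy('tasks', config) as tasks:
    task_id = tasks.start_task('fibonacci', 500)
    print(tasks.get_result(task_id))  # most likely "pending"
    time.sleep(1)
    print(tasks.get_result(task_id))  # the 500th Fibonacci number once the task finishes

Note that results live in the TaskProcessor's in-memory dict, so get_result only works when it reaches the same service instance that started the task.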

SerRichard commented Nov 19, 2021

@mattbennett Hi, is there any update concerning the question from @qileroro?

I'm having the problem that when I define my RpcProxy and that service has died, the execution hangs indefinitely. I am unable to effect a timeout that works in conjunction with nameko here. Do you have any ideas/input to help?

Example of what I'm trying:

try:
    with Timeout(1):
        # imagine this service's container has recently died
        data_service = RpcProxy("data")
except Timeout.Timeout:
    data_service = None
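
For reference, a hedged sketch of one possible mitigation, not confirmed in this thread: nameko 2.x's standalone proxies accept a timeout keyword and raise RpcTimeout instead of blocking forever, which sidesteps wrapping the call in eventlet's Timeout.

from nameko.exceptions import RpcTimeout
from nameko.standalone.rpc import ServiceRpcProxy

config = {'AMQP_URI': 'amqp://guest:guest@localhost/'}

try:
    # `timeout` bounds how long the proxy waits for each RPC reply
    with ServiceRpcProxy('data', config, timeout=1) as data_service:
        data = data_service.get_data()  # hypothetical method name
except RpcTimeout:
    data = None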
