Last active
November 3, 2021 10:29
-
-
Save jxskiss/aadf53659837b2052145cfc1c37cc8ea to your computer and use it in GitHub Desktop.
Get celery task results within tornado; see celery [issue #3577](https://github.com/celery/celery/issues/3577)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Tornado app.py | |
from tornado import web, options | |
from tornado.platform.asyncio import AsyncIOMainLoop | |
import asyncio | |
AsyncIOMainLoop().install() | |
import tasks | |
from celery_task_utils import CeleryTaskProxy | |
class AddHandler(web.RequestHandler):
    """
    Handle ``GET /add?x=..&y=..&sleep=..`` by running ``tasks.add`` through
    the celery proxy and replying with the task result as text.
    """

    @property
    def celery_proxy(self):
        """Return the proxy stored in the application settings."""
        return self.application.settings['celery_proxy']

    async def get(self):
        """
        Read the integer query arguments ``x``, ``y`` and ``sleep``, submit
        the task and write ``str(result)`` as the response body.

        Raises ``web.HTTPError(400)`` when an argument is not an integer.
        """
        try:
            x, y, sleep = (int(self.get_argument(name))
                           for name in ('x', 'y', 'sleep'))
        except ValueError:
            # A malformed number is a client error; without this it would
            # escape as an unhandled exception and be reported as a 500.
            raise web.HTTPError(400, 'x, y and sleep must be integers')
        result = await self.celery_proxy.commit(tasks.add, x, y, sleep)
        self.finish(str(result))
# Let tornado consume its own command line flags before anything starts.
options.parse_command_line()

# Single-route application; the proxy lives in the settings so handlers can
# reach it through ``self.application.settings``.
app = web.Application(
    [(r'/add', AddHandler)],
    debug=True,
    celery_proxy=CeleryTaskProxy(),
)
app.listen(8000)

loop = asyncio.get_event_loop()
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C just falls through to the shutdown steps below.
    pass
finally:
    # Close the proxy's redis pool before the loop itself is closed.
    loop.run_until_complete(app.settings['celery_proxy'].cleanup())
    loop.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Celery Task Proxy | |
from aioredis.errors import ChannelClosedError, ConnectionClosedError | |
from celery.exceptions import ImproperlyConfigured | |
import aioredis | |
import asyncio | |
import json | |
import logging | |
import re | |
_logger = logging.getLogger(__name__) | |
class CeleryTaskProxy(object):
    """
    Commit celery task using this when you need to retrieve the result within
    Tornado/asyncio loop, ONLY redis backend is supported.

    The proxy subscribes to the redis ``set`` key-event channel of the result
    backend database and resolves a pending future when the key of a
    committed task is written.

    NOTE: this is not a common case when using celery and is not recommended.
    """

    def __init__(self, celery_app=None, loop=None):
        """
        :param celery_app: celery application whose ``result_backend``
            setting is used; defaults to celery's current app.
        :param loop: asyncio event loop to use; defaults to
            ``asyncio.get_event_loop()``.
        """
        if celery_app is None:
            # Imported locally: the bare name ``celery`` is not otherwise
            # bound, so ``celery.current_app`` would raise NameError.
            from celery import current_app
            celery_app = current_app
        self.backend = celery_app.conf['result_backend'] or ''
        self.backend_db = None
        self.pool = None
        # Maps result backend key -> future awaited by ``commit``.
        self.pending_tasks = {}
        self.loop = loop or asyncio.get_event_loop()
        self.initialize()

    @staticmethod
    def _parse_backend_url(url):
        """
        Split a ``redis://[:password@]host:port/db`` URL.

        :returns: ``(password, host, port, db)`` with ``port`` and ``db`` as
            integers and ``password`` as ``None`` when absent.
        :raises ImproperlyConfigured: if the URL does not match.
        """
        match = re.match(
            # ``-`` is accepted in the host so names such as
            # ``redis-master.internal`` are not rejected.
            r'redis://(?::(\w+)?@)?([\w.-]+):(\d+)/(\d{1,2})',
            url
        )
        if not match:
            raise ImproperlyConfigured(
                'redis backend improperly configured: %s' % (url, ))
        password, host, port, db_idx = match.groups()
        return password, host, int(port), int(db_idx)

    def initialize(self):
        """
        Create the redis pool, verify the server's keyspace notification
        setting and schedule the event listener on the loop.

        Runs the loop synchronously, so it must be called before the loop is
        started.

        :raises ImproperlyConfigured: if the backend URL or the redis
            ``notify-keyspace-events`` option is unsuitable.
        """
        password, host, port, self.backend_db = self._parse_backend_url(
            self.backend)
        self.pool = self.loop.run_until_complete(
            aioredis.create_pool(
                (host, port),
                db=self.backend_db, password=password,
                minsize=2))
        try:
            self.loop.run_until_complete(self._check_redis_option())
        except Exception:
            # Whatever went wrong, do not leave the freshly opened pool
            # behind when construction fails.
            self.pool.close()
            self.loop.run_until_complete(self.pool.wait_closed())
            raise
        self.loop.call_soon(asyncio.ensure_future, self.handle_events())

    async def _check_redis_option(self):
        """
        Raise ``ImproperlyConfigured`` unless the server's
        ``notify-keyspace-events`` value contains ``E`` together with ``A``
        or ``$``.
        """
        with await self.pool as red:
            redis_config = await red.config_get('notify-keyspace-events')
            notify_conf = redis_config.get('notify-keyspace-events')
            _logger.info('redis server notify-keyspace-events config: "%s"',
                         notify_conf)
            enabled = bool(notify_conf) and 'E' in notify_conf and (
                'A' in notify_conf or '$' in notify_conf)
            if not enabled:
                raise ImproperlyConfigured(
                    'redis server notify-keyspace-events improperly '
                    'configured: "%s"' % (notify_conf, ))

    async def cleanup(self):
        """Close the redis pool and wait until it is fully closed."""
        self.pool.close()
        await self.pool.wait_closed()

    async def handle_events(self):
        """
        Keep the key-event subscription alive until the loop or the pool is
        closed, re-subscribing after errors.
        """
        while not (self.loop.is_closed() or self.pool.closed):
            try:
                await self._handle_events()
            except (ChannelClosedError, ConnectionClosedError) as err:
                _logger.warning(
                    'unexpected %s error, retrying to connect to redis '
                    'in 2 seconds', type(err))
                await asyncio.sleep(2)
            except ConnectionRefusedError:
                _logger.warning(
                    'connecting to redis refused, retrying to connect '
                    'in 10 seconds')
                await asyncio.sleep(10)
            except asyncio.CancelledError:
                # On Python 3.7 CancelledError is an Exception subclass;
                # never swallow it in the generic handler below.
                raise
            except Exception as err:
                if self.loop.is_closed():
                    return
                _logger.exception('unhandled error: %s', err)
                # Back off briefly so a persistent failure cannot turn this
                # loop into a busy spin that floods the log.
                await asyncio.sleep(1)

    async def _handle_events(self):
        """
        Subscribe to the ``set`` key-event channel of the backend database
        and dispatch every message whose key belongs to a pending task.
        """
        with await self.pool as sub:
            pattern = '__keyevent@{}__:set'.format(self.backend_db)
            _logger.debug('subscribing channel: %s', pattern)
            channel = (await sub.psubscribe(pattern))[0]
            async for msg in channel.iter():
                _logger.debug('message: %s', msg)
                # The second element of the message is used as the key.
                key = msg[1]
                if key not in self.pending_tasks:
                    continue
                await self.on_task_result(key)

    async def on_task_result(self, backend_key):
        """
        Fetch the stored value for ``backend_key``, decode it as JSON and
        resolve the matching pending future with its ``'result'`` entry.

        A key with no pending future is ignored. If fetching or decoding
        fails, the error is set on the future (so the waiter is released
        immediately) and re-raised for ``handle_events`` to deal with.
        """
        fut = self.pending_tasks.pop(backend_key, None)
        if fut is None:
            return
        try:
            with await self.pool as res_client:
                raw = await res_client.get(backend_key)
            result = json.loads(raw.decode('utf-8'))['result']
        except Exception as err:
            if not fut.done():
                fut.set_exception(err)
            raise
        # The waiter may have timed out (and cancelled the future) while the
        # value was being fetched; setting a result then would raise.
        if not fut.done():
            fut.set_result(result)

    async def commit(self, task, *args,
                     callback=None, timeout=30, **kwargs):
        """
        Send ``task`` with ``task.delay(*args, **kwargs)`` and wait for its
        result.

        :param task: celery task to run.
        :param callback: optional callable invoked with the result once the
            task completes successfully.
        :param timeout: seconds to wait; ``None`` or a value ``<= 0`` waits
            without a limit.
        :returns: the task result.
        :raises asyncio.TimeoutError: if no result arrives within
            ``timeout`` seconds.
        """
        task_result = task.delay(*args, **kwargs)
        backend_key = task.backend.get_key_for_task(task_result.task_id)
        fut = self.loop.create_future()
        self.pending_tasks[backend_key] = fut
        _logger.debug(self.pending_tasks)

        if callback:
            def _run_callback(done):
                # Only successful completions carry a result to pass on.
                if done.cancelled() or done.exception() is not None:
                    return
                callback(done.result())
            fut.add_done_callback(_run_callback)

        try:
            if timeout is None or timeout <= 0:
                return await fut
            return await asyncio.wait_for(fut, timeout=timeout)
        finally:
            # Always drop the bookkeeping entry: on timeout, on cancellation
            # of the caller, and (harmlessly) after a normal completion.
            self.pending_tasks.pop(backend_key, None)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Celery tasks.py | |
from celery import Celery | |
import time | |
# Celery application for the demo: the broker uses redis database 0 and the
# result backend uses redis database 1, both on 127.0.0.1:6379.
app = Celery('tasks',
             broker='redis://:@127.0.0.1:6379/0',
             backend='redis://:@127.0.0.1:6379/1')
@app.task
def add(x, y, sleep=0):
    """
    Return ``x + y``, optionally sleeping for ``sleep`` seconds first.

    A zero, negative or ``None`` ``sleep`` skips the delay; a negative value
    passed straight to ``time.sleep`` would raise ``ValueError``.
    """
    if sleep and sleep > 0:
        time.sleep(sleep)
    return x + y
Great work on this! I just ran into a problem where subscription connections become unresponsive after some time without throwing any exceptions.
If you run into this issue with the latest version of Redis, change the defaults in redis.conf to:
timeout 600
tcp-keepalive 60
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Forgot to mention: the Redis server must be configured so that "notify-keyspace-events" contains at least "E$", and Redis version 2.8+ is required.