Last active
May 21, 2018 15:21
-
-
Save ionelmc/4791c8dba3002f4f415321fa76cc9745 to your computer and use it in GitHub Desktop.
A more sophisticated celery fixture (captures worker logs and terminates the whole process tree)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import subprocess | |
import sys | |
import time | |
import psutil | |
import pytest | |
def _process_cleanup(pid, *args, **kwargs):
    """
    Stop the process ``pid`` and kill whatever is left of its process tree.

    The process is first asked to exit with ``terminate()`` and is then polled
    every 10 ms for up to ``timeout`` seconds. Afterwards ``kill()`` is sent to
    the process and to every descendant that was found when cleanup started.

    :param pid: PID of the root process to stop.
    :param args: Command line of the process; only used in the log messages.
    :param timeout: Keyword-only. Seconds to wait after ``terminate()`` before
        the tree is killed. Defaults to 0.25.
    :raises TypeError: If any keyword argument other than ``timeout`` is given.
    """
    timeout = kwargs.pop('timeout', 0.25)
    if kwargs:
        # The previous `assert kwargs.keys() == []` could never pass on
        # Python 3 (a keys view is never equal to a list) and disappeared
        # entirely under `python -O`.
        raise TypeError(
            "_process_cleanup() got unexpected keyword arguments: %s"
            % ", ".join(sorted(kwargs))
        )

    print("<<<<<", file=sys.stderr)
    print("<<<<< Stopping process %s %s" % (pid, args), file=sys.stderr)
    print("<<<<<", file=sys.stderr)

    # Use a flag rather than re-raising NoSuchProcess by hand: the exception
    # class cannot be instantiated without a pid, so a bare
    # `raise psutil.NoSuchProcess` fails with TypeError instead.
    try:
        parent = psutil.Process(pid)
        alive = parent.is_running()
    except psutil.NoSuchProcess:
        alive = False
    if not alive:
        print("NOTICE: Process is already dead! Command line: ", args, file=sys.stderr)
        return

    # Snapshot the tree *before* terminating the parent so the descendants
    # can still be reached after the parent is gone.
    process_tree = [parent]
    try:
        process_tree.extend(parent.children(recursive=True))
    except psutil.NoSuchProcess:
        # The parent exited between the liveness check and the snapshot;
        # carry on with just the parent (the calls below tolerate that).
        pass

    try:
        parent.terminate()
    except psutil.NoSuchProcess:
        pass

    # Grace period: poll in 10 ms steps until the parent is gone or the
    # timeout has elapsed.
    for _ in range(int(timeout * 100)):
        if not parent.is_running():
            break
        time.sleep(0.01)

    # Kill everything from the snapshot; processes that already exited are
    # skipped.
    for process in process_tree:
        try:
            process.kill()
        except psutil.NoSuchProcess:
            pass
def _subprocess_fixture_helper(request, logfile, args):
    """
    Launch ``args`` as a subprocess whose stdout and stderr go to ``logfile``.

    Two finalizers are registered on ``request``: the first closes the log
    file handle, the second stops the subprocess via ``_process_cleanup``.

    :param request: Object providing ``addfinalizer`` (a pytest ``request``).
    :param logfile: Path of the log file; it is opened for binary writing.
    :param args: Command line to execute, as a list.
    :return: The ``subprocess.Popen`` object of the started process.
    """
    log_sink = open(str(logfile), 'wb')
    # Registered first. pytest runs finalizers last-registered-first, so the
    # handle is closed only after the process has been stopped.
    request.addfinalizer(log_sink.close)

    child = subprocess.Popen(args, bufsize=0, stdout=log_sink, stderr=log_sink)

    def stop_child():
        # The command line is forwarded only so the cleanup log can show it.
        _process_cleanup(child.pid, *args)

    request.addfinalizer(stop_child)
    return child
@pytest.fixture(scope="session")
def celery(request, session_tmpdir):
    """
    Start a Celery worker subprocess for the test session and block until it
    serves the 'celery' queue.

    The worker's stdout and stderr are written to ``celery.log`` inside
    ``session_tmpdir``; the contents of that file are included in the error
    raised when the worker does not become ready.

    Note that this fixture doesn't use the 'db' fixture (and forces the test
    to specify it explicitly, BEFORE 'celery') because the 'db' fixture is
    function-scoped.

    You must explicitly request the 'db' fixture before this!

    Raises RuntimeError if the worker exits during startup or if the required
    queues are not being consumed after 60 inspection attempts.
    """
    logfile = session_tmpdir.join('celery.log')
    worker = _subprocess_fixture_helper(request, logfile, [
        sys.executable,
        '-u',
        '-mcelery',
        'worker',
        '--loglevel=DEBUG',
        '--app=something.celeryapp',
        '--without-mingle',
        '--without-gossip',
        '--without-heartbeat',
    ])

    def wait_ready(*required_queues):
        # Imported lazily, so the app module is only loaded once the fixture
        # is actually used.
        from something.celeryapp import app

        required = set(required_queues)
        for _ in range(60):
            # Fail fast when the worker has already died (e.g. it crashed
            # while starting) instead of sitting through every remaining
            # inspect timeout.
            exit_code = worker.poll()
            if exit_code is not None:
                raise RuntimeError(
                    "Celery worker exited with code %s before becoming ready!\n\n*** LOGS:\n\n%s" % (
                        exit_code,
                        logfile.read(),
                    ))

            data = app.control.inspect(timeout=1).active_queues()
            if data is None:
                # No reply from any worker within the timeout; try again.
                continue
            all_queues = {queue['name'] for queues in data.values() for queue in queues}
            required.difference_update(all_queues)
            if not required:
                break
        else:
            raise RuntimeError("Workers for queues %s didn't start in time!\n\n*** LOGS:\n\n%s" % (
                required,
                logfile.read(),
            ))

    wait_ready('celery')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment