@ionelmc
Last active May 21, 2018 15:21
A more sophisticated celery fixture (logs + tree term)
from __future__ import print_function

import subprocess
import sys
import time

import psutil
import pytest


def _process_cleanup(pid, *args, **kwargs):
    timeout = kwargs.pop('timeout', 0.25)
    assert not kwargs, "Unexpected keyword arguments: %r" % kwargs

    print("<<<<<", file=sys.stderr)
    print("<<<<< Stopping process %s %s" % (pid, args), file=sys.stderr)
    print("<<<<<", file=sys.stderr)

    try:
        parent = psutil.Process(pid)
        if not parent.is_running():
            raise psutil.NoSuchProcess(pid)
    except psutil.NoSuchProcess:
        print("NOTICE: Process is already dead! Command line: ", args, file=sys.stderr)
        return

    # Snapshot the whole process tree before terminating the parent, so that
    # any children orphaned by the shutdown can still be killed below.
    process_tree = [parent]
    process_tree.extend(parent.children(recursive=True))

    try:
        parent.terminate()
    except psutil.NoSuchProcess:
        pass

    # Give the parent `timeout` seconds to exit gracefully ...
    for _ in range(int(timeout * 100)):
        if not parent.is_running():
            break
        time.sleep(0.01)

    # ... then kill whatever is left of the tree.
    for process in process_tree:
        try:
            process.kill()
        except psutil.NoSuchProcess:
            pass


def _subprocess_fixture_helper(request, logfile, args):
    loghandle = open(str(logfile), 'wb')
    request.addfinalizer(loghandle.close)
    process = subprocess.Popen(
        args,
        bufsize=0,
        stdout=loghandle,
        stderr=loghandle,
    )
    request.addfinalizer(lambda: _process_cleanup(process.pid, *args))
    return process
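

# NOTE: the `celery` fixture below expects a session-scoped `session_tmpdir`
# fixture that is not included in this gist. A minimal sketch of what it could
# look like, assuming pytest's built-in `tmpdir_factory` fixture is an acceptable
# backing implementation (this helper is an assumption, not part of the original):
@pytest.fixture(scope="session")
def session_tmpdir(tmpdir_factory):
    # One temporary directory shared by the whole test session.
    return tmpdir_factory.mktemp("session")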
@pytest.fixture(scope="session")
def celery(request, session_tmpdir):
"""
Note that this fixture doesn't use the 'db' fixture (and force using test to specify it explicitly, BEFORE 'celery')
because the 'db' fixture is function-scoped.
You must explicitly request the 'db' fixture before this!
"""
logfile = session_tmpdir.join('celery.log')
_subprocess_fixture_helper(request, logfile, [
sys.executable,
'-u',
'-mcelery',
'worker',
'--loglevel=DEBUG',
'--app=something.celeryapp',
'--without-mingle',
'--without-gossip',
'--without-heartbeat',
])
def wait_ready(*required_queues):
from something.celeryapp import app
required = set(required_queues)
for _ in range(60):
data = app.control.inspect(timeout=1).active_queues()
if data is None:
continue
all_queues = {queue['name'] for queues in data.values() for queue in queues}
required.difference_update(all_queues)
if not required:
break
else:
raise RuntimeError("Workers for queues %s didn't start in time!\n\n*** LOGS:\n\n%s" % (
required,
logfile.read(),
))
wait_ready('celery')
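
For reference, a test consuming this fixture might look like the sketch below. The 'db' fixture and the `something.tasks.ping` task are assumptions about the surrounding project, not part of this gist; getting the task result also assumes a result backend is configured on the Celery app.

def test_worker_handles_tasks(db, celery):
    # 'db' is requested first, as required by the celery fixture's docstring.
    from something.tasks import ping  # hypothetical task module
    async_result = ping.delay()
    assert async_result.get(timeout=10) == 'pong'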