-
-
Save lukebakken/ab49531e504dfd8f4c133fcb9994a7d7 to your computer and use it in GitHub Desktop.
pika test case: channel close bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
venv/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh -ex
# Run testpika.py against every combination of the available Python
# interpreters and a fixed list of pika releases, each inside its own
# virtualenv, then print a PASS/FAIL summary.
#
# Environment overrides:
#   PYTHON2, PYTHON3  interpreter paths (default: looked up with `which`)
#   NUM_CHANNELS      channel count handed to testpika.py (default: 3)

: "${PYTHON2:=$( which python2 )}"
: "${PYTHON3:=$( which python3 )}"
# testpika.py reads the number of channels from its first argument, so one
# must always be supplied.
: "${NUM_CHANNELS:=3}"

pass=""
fail=""

# Deliberately unquoted: an interpreter that was not found expands to
# nothing and is skipped instead of becoming an empty loop item.
for python in ${PYTHON2} ${PYTHON3}; do
	for pika in 0.9.14 0.10.0 0.11.2; do
		ver=$( "${python}" --version 2>&1 | cut -f 2 -d' ' )
		name="py${ver}-pika${pika}"
		virtenv="${PWD}/${name}"
		log="test-${name}.log"

		"${python}" -m virtualenv "${virtenv}"
		"${virtenv}/bin/pip" install "pika==${pika}" tornado

		if "${virtenv}/bin/python" testpika.py "${NUM_CHANNELS}" > "${log}" 2>&1 ; then
			pass="${pass} ${name}"
		else
			fail="${fail} ${name}"
			# Show the failing run's output straight away.
			cat "${log}"
		fi
	done
done

set +x
echo "PASS: ${pass}"
echo "FAIL: ${fail}"

# Non-zero exit status when at least one combination failed.
if [ -n "${fail}" ]; then
	exit 1
fi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import functools | |
import sys | |
from sys import stdout | |
from sys import version as PYTHON_VERSION | |
from tornado.ioloop import IOLoop | |
from pika.adapters import TornadoConnection | |
from pika.connection import URLParameters | |
from pika import __version__ as PIKA_VERSION | |
class PikaTestCase(object):
    """
    Our test case for Pika handling channel close events.

    The test opens one connection, opens ``num_channels`` channels on it and
    passively declares one exchange per channel.  Channel 1 asks for
    ``amq.fanout``; every other channel asks for ``ex_<index>``.  Whenever a
    channel closes it is re-opened on the same AMQP channel number, up to
    three times per channel.  Once a channel has used up its re-open budget
    all channels are closed, the connection is closed and the I/O loop is
    stopped.

    ``result`` stays ``None`` when nothing went wrong; otherwise it holds a
    short description of the first failure that was recorded.
    """

    AMQP_URI = 'amqp://guest:guest@localhost:5672/%2f'

    def __init__(self, num_channels, io_loop=None):
        """
        Schedule the connection attempt and a 30 second overall timeout.

        :param num_channels: number of channels to open on the connection.
        :param io_loop: I/O loop to run on; defaults to ``IOLoop.current()``.
        """
        if io_loop is None:
            io_loop = IOLoop.current()

        # Initialised before any callback is scheduled so that every handler
        # can rely on the attribute being present.
        self.result = None

        # Channel indices start at 1, so this is the exclusive upper bound
        # used with range(1, self._num_channels).
        self._num_channels = num_channels + 1
        self._io_loop = io_loop
        self._connection = None
        self._channels = {}                 # ch_idx -> open channel object
        self._all_channels_created = False
        self._test_done = False
        self._declared = set()              # ch_idx with a confirmed declare
        self._declaring = set()             # ch_idx with a declare in flight
        self._ch_cycles = {}                # ch_idx -> remaining re-opens
        self._watchdog = None

        io_loop.add_callback(self._connect)

        def _force_shutdown():
            logging.error('Test timed out')
            self._set_result('Test timed out')
            self._io_loop.stop()

        self._io_loop.add_timeout(
            self._io_loop.time() + 30, _force_shutdown)

    def _set_result(self, result):
        """Record ``result`` unless an earlier result was already stored."""
        if self.result is None:
            logging.info('Storing result: %s', result)
            self.result = result

    def _connect(self):
        """Open the broker connection; on an exception record it and stop."""
        try:
            logging.info('Opening connection to broker')
            self._connection = TornadoConnection(
                parameters=URLParameters(self.AMQP_URI),
                on_open_callback=self._on_conn_open,
                on_open_error_callback=self._on_conn_open_err,
                on_close_callback=self._on_conn_close,
                custom_ioloop=self._io_loop)
        except Exception:
            logging.exception('Failed to connect')
            self._set_result('Failed to connect')
            self._io_loop.stop()

    def _on_conn_open_err(self, *args, **kwargs):
        """Connection open-error handler: record the failure and stop."""
        logging.error('Failed to open connection; handler called with '
                      'args=%s, kwargs=%s', args, kwargs)
        # Without a stored result the caller would see result is None and
        # treat the run as a success.
        self._set_result('Failed to open connection')
        self._io_loop.stop()

    def _on_conn_close(self, connection, res_code, res_reason):
        """Connection close handler: record unexpected codes, stop the loop."""
        logging.info('Connection closed; code %s reason %s',
                     res_code, res_reason)
        if res_code not in (0, 200):
            self._set_result('CLOSED: %s %s' % (res_code, res_reason))
        self._io_loop.stop()

    def _on_conn_open(self, *args, **kwargs):
        """Connection open handler: request every channel."""
        logging.info('Connection opened; handler called with '
                     'args=%s, kwargs=%s', args, kwargs)

        # Now, declare some channels
        for ch_idx in range(1, self._num_channels):
            self._ch_cycles[ch_idx] = 3
            cb = functools.partial(self._on_chan_open, ch_idx)
            self._connection.channel(on_open_callback=cb)

    def _on_chan_open(self, ch_idx, channel):
        """Channel open handler: register the channel and continue the test."""
        logging.info('Channel %s opened (_all_channels_created=%s)',
                     ch_idx, self._all_channels_created)
        if self._test_done:
            # The test is shutting down; do not track this channel at all.
            logging.info('Closing newly opened channel')
            channel.close()
            return

        self._channels[ch_idx] = channel
        cb = functools.partial(self._on_chan_close, ch_idx)
        channel.add_on_close_callback(cb)

        if self._all_channels_created:
            self._io_loop.add_callback(self._run_test)
        else:
            self._io_loop.add_callback(self._check_channels)

    def _check_channels(self):
        """Start the test run once every expected channel index is open."""
        # Do we have all channels yet?
        if set(self._channels) == set(range(1, self._num_channels)):
            logging.info('All channels created')
            self._all_channels_created = True
            self._io_loop.add_callback(self._run_test)
        else:
            self._all_channels_created = False

    def _watchdog_timeout(self):
        """Watchdog expiry: record the failure and begin shutting down."""
        logging.error('Watchdog timeout!')
        self._set_result('WATCHDOG TIMEOUT')
        self._io_loop.add_callback(self._stop_test)

    def _run_test(self):
        """Re-arm the watchdog and issue any missing passive declarations."""
        # Try passively declaring exchanges.
        # One will exist; two will not. Let's assume the use case for this is
        # that a daemon running elsewhere creates the other two exchanges, and
        # that daemon is down (maybe booting up, maybe crashed). We need *our*
        # daemon to remain connected and keep retrying until it succeeds.
        if self._test_done:
            logging.info('Run cancelled due to test shutdown')
            return

        logging.info('Running test')

        # Reset watchdog
        if self._watchdog is not None:
            self._io_loop.remove_timeout(self._watchdog)
        wd_time = self._io_loop.time() + 10
        self._watchdog = self._io_loop.add_timeout(
            wd_time, self._watchdog_timeout)

        # Iterate over a snapshot: _on_chan_close removes entries from
        # self._channels and must not disturb this loop.
        for ch_idx, channel in list(self._channels.items()):
            if ch_idx == 1:
                ex_name = 'amq.fanout'
            else:
                ex_name = 'ex_%s' % ch_idx

            if ch_idx in self._declared or ch_idx in self._declaring:
                continue

            logging.info("Passively declaring exchange '%s' in channel '%s'",
                         ex_name, ch_idx)
            self._declaring.add(ch_idx)
            # This should succeed for idx 1,
            # fail for others
            cb = functools.partial(
                self._on_exchange_declared, ch_idx, ex_name)
            channel.exchange_declare(callback=cb, exchange=ex_name,
                                     exchange_type='fanout', passive=True)

    def _on_exchange_declared(self, ch_idx, ex_name, *args, **kwargs):
        """Declare-ok handler: move the channel from declaring to declared."""
        logging.info('Channel %s declared exchange %s; handler called with '
                     'args=%s, kwargs=%s', ch_idx, ex_name, args, kwargs)
        self._declaring.discard(ch_idx)
        self._declared.add(ch_idx)

    def _on_chan_close(self, ch_idx, channel, reply_code, reply_text):
        """
        Channel close handler.

        Forgets the channel, then either waits for the remaining channels
        (during shutdown), re-opens it on the same AMQP channel number (while
        re-open attempts remain) or stops the whole test.
        """
        if channel.is_open:
            logging.error('Channel %s should be closed, but is open?!?',
                          ch_idx)
            channel.close()
            return

        logging.info('Channel %s closed with reply code %s, '
                     'reason %s, cycles %s',
                     ch_idx, reply_code, reply_text,
                     self._ch_cycles.get(ch_idx))
        self._all_channels_created = False
        self._declaring.discard(ch_idx)
        self._declared.discard(ch_idx)
        # Tolerate an index that is no longer tracked instead of raising
        # KeyError from inside a library callback.
        self._channels.pop(ch_idx, None)
        ch_num = channel.channel_number

        if self._test_done:
            # Are all channels closed?
            if not self._channels:
                self._io_loop.add_callback(self._channels_closed)
            else:
                key_str = ' '.join(str(k) for k in self._channels)
                logging.info('Waiting for other channels to close: %s',
                             key_str)
        elif self._ch_cycles.get(ch_idx, 0) > 0:
            # Try to re-open the channel, re-cycling the channel number
            def _reopen():
                logging.info('Re-opening channel %s (AMQP ch %s)',
                             ch_idx, ch_num)
                self._ch_cycles[ch_idx] -= 1
                cb = functools.partial(self._on_chan_open, ch_idx)
                self._connection.channel(channel_number=ch_num,
                                         on_open_callback=cb)
            self._io_loop.add_callback(_reopen)
        else:
            # Exhausted attempts for one channel, stopping test.  The test
            # cannot already be done here; that case is handled above.
            logging.info('Channel %s (AMQP ch %s) exhausted attempts, '
                         'STOPPING TEST', ch_idx, ch_num)
            self._io_loop.add_callback(self._stop_test)

    def _stop_test(self):
        """Mark the test as done and close every channel still tracked."""
        if self._test_done:
            logging.info('Test already stopped, not closing channels')
            return

        logging.info('Stopping test and closing channels')
        self._test_done = True

        def _force_disconnect():
            logging.error('force disconnect: channel clean-up timed out')
            self._set_result('force disconnect: channel clean-up timed out')
            self._io_loop.add_callback(self._channels_closed)

        # Give the channels 10 seconds to close before disconnecting anyway.
        t = self._io_loop.time() + 10
        self._io_loop.add_timeout(t, _force_disconnect)

        # Iterate over a snapshot: _on_chan_close removes entries from
        # self._channels and must not disturb this loop.
        for ch_idx, channel in list(self._channels.items()):
            ch_num = channel.channel_number
            if channel.is_open:
                logging.info('Closing channel %s (AMQP ch %s)',
                             ch_idx, ch_num)
                channel.close(reply_code=ch_idx)
            else:
                logging.warning('Channel %s (AMQP ch %s) is no longer open, '
                                'not closing', ch_idx, ch_num)

    def _channels_closed(self):
        """Close the broker connection."""
        logging.info('Disconnecting')
        self._connection.close()
def parse_num_channels(argv, default=3):
    """
    Return the channel count given on the command line.

    :param argv: argument list in ``sys.argv`` layout (program name first).
    :param default: count used when no argument follows the program name.
    :raises ValueError: if the argument is not an integer or is below 1.
    """
    count = int(argv[1]) if len(argv) > 1 else default
    if count < 1:
        raise ValueError('num_channels must be at least 1, got %d' % count)
    return count


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, stream=stdout,
                        format='%(asctime)s %(levelname)10s '
                               '%(name)16s: %(message)s')
    logging.info('Starting up.\nPython: %s\nPika: %s',
                 PYTHON_VERSION, PIKA_VERSION)
    try:
        # Parsed inside the try block so a bad argument is logged as well.
        num_channels = parse_num_channels(sys.argv)
        testcase = PikaTestCase(num_channels)
        IOLoop.current().start()
        # Raised explicitly rather than with an assert statement so the
        # check still runs under "python -O".
        if testcase.result is not None:
            raise AssertionError(
                'Failed with result: %s' % testcase.result)
    except Exception:
        logging.exception('Test aborted')
        raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment