Created
February 7, 2018 02:09
-
-
Save sjlongland/a8acc05e399ec1700d61fe11aba2f0f4 to your computer and use it in GitHub Desktop.
pika test case: channel close bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh -ex | |
: ${PYTHON2:=$( which python2 )} | |
: ${PYTHON3:=$( which python3 )} | |
pass="" | |
fail="" | |
for python in ${PYTHON2} ${PYTHON3}; do | |
for pika in 0.9.14 0.10.0 0.11.2; do | |
ver=$( ${python} --version 2>&1 | cut -f 2 -d' ' ) | |
virtenv=${PWD}/py${ver}-pika${pika} | |
${python} -m virtualenv ${virtenv} | |
${virtenv}/bin/pip install pika==${pika} tornado | |
if ${virtenv}/bin/python testpika.py > test-py${ver}-pika${pika}.log 2>&1 ; then | |
pass="${pass} py${ver}-pika${pika}" | |
else | |
fail="${fail} py${ver}-pika${pika}" | |
cat test-py${ver}-pika${pika}.log | |
fi | |
done | |
done | |
set +x | |
echo "PASS: ${pass}" | |
echo "FAIL: ${fail}" | |
if [ -n "${fail}" ]; then | |
exit 1 | |
fi |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import functools | |
from sys import stdout | |
from sys import version as PYTHON_VERSION | |
from tornado.ioloop import IOLoop | |
from pika.adapters import TornadoConnection | |
from pika.connection import URLParameters | |
from pika import __version__ as PIKA_VERSION | |
class PikaTestCase(object): | |
""" | |
Our test case for Pika handling channel close events. | |
""" | |
AMQP_URI = 'amqp://guest:guest@localhost:5672/%2f' | |
NUM_CH = 100 | |
def __init__(self, io_loop=None): | |
if io_loop is None: | |
io_loop = IOLoop.current() | |
self._io_loop = io_loop | |
self._connection = None | |
self._channel = {} | |
self._first = False | |
self._test_done = False | |
self._declared = set() | |
self._declaring = set() | |
self._ch_cycles = {} | |
self._watchdog = None | |
io_loop.add_callback(self._connect) | |
def _force_shutdown(): | |
logging.error('Test timed out') | |
self._set_result('Test timed out') | |
self._io_loop.stop() | |
self._io_loop.add_timeout( | |
self._io_loop.time() + 30, _force_shutdown) | |
self.result = None | |
def _set_result(self, result): | |
if self.result is None: | |
logging.info('Storing result: %s', result) | |
self.result = result | |
def _connect(self): | |
try: | |
logging.info('Opening connection to broker') | |
self._connection = TornadoConnection( | |
parameters=URLParameters(self.AMQP_URI), | |
on_open_callback=self._on_conn_open, | |
on_open_error_callback=self._on_conn_open_err, | |
on_close_callback=self._on_conn_close, | |
custom_ioloop=self._io_loop) | |
except: | |
logging.exception('Failed to connect') | |
self._set_result('Failed to connect') | |
self._io_loop.stop() | |
def _on_conn_open_err(self, *args, **kwargs): | |
logging.error('Failed to open connection; handler called with '\ | |
'args=%s, kwargs=%s', args, kwargs) | |
self._io_loop.stop() | |
def _on_conn_close(self, connection, res_code, res_reason): | |
logging.info('Connection closed; code %s reason %s', | |
res_code, res_reason) | |
if res_code not in (0, 200): | |
self._set_result('CLOSED: %s %s' % (res_code, res_reason)) | |
self._io_loop.stop() | |
def _on_conn_open(self, *args, **kwargs): | |
logging.info('Connection opened; handler called with '\ | |
'args=%s, kwargs=%s', args, kwargs) | |
# Now, declare some channels | |
for ch in range(0, self.NUM_CH): | |
self._ch_cycles[ch] = 3 | |
self._connection.channel( | |
on_open_callback=functools.partial( | |
self._on_chan_open, ch)) | |
def _on_chan_open(self, ch, channel): | |
logging.info('Channel %s opened (first=%s)', ch, self._first) | |
if self._test_done: | |
logging.info('Closing newly opened channel') | |
channel.close() | |
return | |
self._channel[ch] = channel | |
channel.add_on_close_callback( | |
functools.partial( | |
self._on_chan_close, ch)) | |
if self._first: | |
self._io_loop.add_callback(self._run_test) | |
else: | |
self._io_loop.add_callback(self._check_channels) | |
def _check_channels(self): | |
# Do we have all channels yet? | |
if set(self._channel.keys()) == \ | |
set(range(0, self.NUM_CH)): | |
logging.info('All channels created') | |
self._first = True | |
self._io_loop.add_callback(self._run_test) | |
def _watchdog_timeout(self): | |
logging.error('Watchdog timeout!') | |
self._set_result('WATCHDOG TIMEOUT') | |
self._io_loop.add_callback(self._stop_test) | |
def _run_test(self): | |
# Try passively declaring exchanges. | |
# One will exist; two will not. Let's assume the use case for this is | |
# that a daemon running elsewhere creates the other two exchanges, and | |
# that daemon is down (maybe booting up, maybe crashed). We need *our* | |
# daemon to remain connected and keep retrying until it succeeds. | |
if self._test_done: | |
logging.info('Run cancelled due to test shutdown') | |
return | |
logging.info('Running test') | |
# Reset watchdog | |
if self._watchdog is not None: | |
self._io_loop.remove_timeout(self._watchdog) | |
self._watchdog = self._io_loop.add_timeout( | |
self._io_loop.time() + 10, | |
self._watchdog_timeout) | |
for ch in range(0, self.NUM_CH): | |
if ch not in self._channel: | |
continue | |
if ch == 0: | |
ex_name = 'amq.fanout' | |
else: | |
ex_name = 'ex_%s' % ch | |
if (ch not in self._declared) and \ | |
(ch not in self._declaring): | |
logging.info('Declaring %s in channel %s', ex_name, ch) | |
self._declaring.add(ch) | |
# This should succeed | |
self._channel[ch].exchange_declare( | |
callback=functools.partial( | |
self._on_exchange_declared, ch), | |
exchange=ex_name, exchange_type='fanout', | |
passive=True) | |
def _on_exchange_declared(self, ch, *args, **kwargs): | |
logging.info('Channel %s declared exchange; handler called with '\ | |
'args=%s, kwargs=%s', ch, args, kwargs) | |
self._declaring.discard(ch) | |
self._declared.add(ch) | |
def _on_chan_close(self, ch, channel, reply_code, reply_text): | |
logging.info('Channel %s closed with reply code %s, '\ | |
'reason %s, cycles %s', | |
ch, reply_code, reply_text, self._ch_cycles.get(ch)) | |
self._declaring.discard(ch) | |
self._channel.pop(ch, None) | |
ch_num = channel.channel_number | |
if channel.is_open: | |
channel.close() | |
if self._test_done: | |
# Are all channels closed? | |
logging.info('Waiting for other channels to close') | |
if len(self._channel) == 0: | |
self._io_loop.add_callback(self._channels_closed) | |
elif self._ch_cycles.get(ch, 0) > 0: | |
# Try to re-open the channel, re-cycling the channel number | |
def _reopen(): | |
logging.info('Re-opening channel %s (AMQP ch %s)', ch, ch_num) | |
self._ch_cycles[ch] -= 1 | |
self._connection.channel( | |
channel_number=ch_num, | |
on_open_callback=functools.partial( | |
self._on_chan_open, ch)) | |
self._io_loop.add_callback(_reopen) | |
else: | |
# Exhausted attempts for channel | |
self._io_loop.add_callback(self._stop_test) | |
def _stop_test(self): | |
logging.info('Closing channels') | |
self._test_done = True | |
def _force_disconnect(): | |
logging.error('Channel clean-up timed out') | |
self._set_result('Channel clean-up timed out') | |
self._io_loop.add_callback(self._channels_closed) | |
self._io_loop.add_timeout( | |
self._io_loop.time() + 10, _force_disconnect) | |
for channel in self._channel.values(): | |
if channel.is_open: | |
channel.close() | |
def _channels_closed(self): | |
logging.info('Disconnecting') | |
self._connection.close() | |
if __name__ == '__main__': | |
logging.basicConfig(level=logging.DEBUG, stream=stdout, | |
format='%(asctime)s %(levelname)10s '\ | |
'%(name)16s: %(message)s') | |
logging.info('Starting up.\nPython: %s\nPika: %s', PYTHON_VERSION, PIKA_VERSION) | |
try: | |
testcase = PikaTestCase() | |
IOLoop.current().start() | |
assert testcase.result is None, 'Failed with result: %s' % testcase.result | |
except: | |
logging.exception('Test aborted') | |
raise |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment