Skip to content

Instantly share code, notes, and snippets.

@sjlongland
Created February 7, 2018 02:09
Show Gist options
  • Save sjlongland/a8acc05e399ec1700d61fe11aba2f0f4 to your computer and use it in GitHub Desktop.
Save sjlongland/a8acc05e399ec1700d61fe11aba2f0f4 to your computer and use it in GitHub Desktop.
pika test case: channel close bug
#!/bin/sh -ex
: ${PYTHON2:=$( which python2 )}
: ${PYTHON3:=$( which python3 )}
pass=""
fail=""
for python in ${PYTHON2} ${PYTHON3}; do
for pika in 0.9.14 0.10.0 0.11.2; do
ver=$( ${python} --version 2>&1 | cut -f 2 -d' ' )
virtenv=${PWD}/py${ver}-pika${pika}
${python} -m virtualenv ${virtenv}
${virtenv}/bin/pip install pika==${pika} tornado
if ${virtenv}/bin/python testpika.py > test-py${ver}-pika${pika}.log 2>&1 ; then
pass="${pass} py${ver}-pika${pika}"
else
fail="${fail} py${ver}-pika${pika}"
cat test-py${ver}-pika${pika}.log
fi
done
done
set +x
echo "PASS: ${pass}"
echo "FAIL: ${fail}"
if [ -n "${fail}" ]; then
exit 1
fi
import logging
import functools
from sys import stdout
from sys import version as PYTHON_VERSION
from tornado.ioloop import IOLoop
from pika.adapters import TornadoConnection
from pika.connection import URLParameters
from pika import __version__ as PIKA_VERSION
class PikaTestCase(object):
"""
Our test case for Pika handling channel close events.
"""
AMQP_URI = 'amqp://guest:guest@localhost:5672/%2f'
NUM_CH = 100
def __init__(self, io_loop=None):
if io_loop is None:
io_loop = IOLoop.current()
self._io_loop = io_loop
self._connection = None
self._channel = {}
self._first = False
self._test_done = False
self._declared = set()
self._declaring = set()
self._ch_cycles = {}
self._watchdog = None
io_loop.add_callback(self._connect)
def _force_shutdown():
logging.error('Test timed out')
self._set_result('Test timed out')
self._io_loop.stop()
self._io_loop.add_timeout(
self._io_loop.time() + 30, _force_shutdown)
self.result = None
def _set_result(self, result):
if self.result is None:
logging.info('Storing result: %s', result)
self.result = result
def _connect(self):
try:
logging.info('Opening connection to broker')
self._connection = TornadoConnection(
parameters=URLParameters(self.AMQP_URI),
on_open_callback=self._on_conn_open,
on_open_error_callback=self._on_conn_open_err,
on_close_callback=self._on_conn_close,
custom_ioloop=self._io_loop)
except:
logging.exception('Failed to connect')
self._set_result('Failed to connect')
self._io_loop.stop()
def _on_conn_open_err(self, *args, **kwargs):
logging.error('Failed to open connection; handler called with '\
'args=%s, kwargs=%s', args, kwargs)
self._io_loop.stop()
def _on_conn_close(self, connection, res_code, res_reason):
logging.info('Connection closed; code %s reason %s',
res_code, res_reason)
if res_code not in (0, 200):
self._set_result('CLOSED: %s %s' % (res_code, res_reason))
self._io_loop.stop()
def _on_conn_open(self, *args, **kwargs):
logging.info('Connection opened; handler called with '\
'args=%s, kwargs=%s', args, kwargs)
# Now, declare some channels
for ch in range(0, self.NUM_CH):
self._ch_cycles[ch] = 3
self._connection.channel(
on_open_callback=functools.partial(
self._on_chan_open, ch))
def _on_chan_open(self, ch, channel):
logging.info('Channel %s opened (first=%s)', ch, self._first)
if self._test_done:
logging.info('Closing newly opened channel')
channel.close()
return
self._channel[ch] = channel
channel.add_on_close_callback(
functools.partial(
self._on_chan_close, ch))
if self._first:
self._io_loop.add_callback(self._run_test)
else:
self._io_loop.add_callback(self._check_channels)
def _check_channels(self):
# Do we have all channels yet?
if set(self._channel.keys()) == \
set(range(0, self.NUM_CH)):
logging.info('All channels created')
self._first = True
self._io_loop.add_callback(self._run_test)
def _watchdog_timeout(self):
logging.error('Watchdog timeout!')
self._set_result('WATCHDOG TIMEOUT')
self._io_loop.add_callback(self._stop_test)
def _run_test(self):
# Try passively declaring exchanges.
# One will exist; two will not. Let's assume the use case for this is
# that a daemon running elsewhere creates the other two exchanges, and
# that daemon is down (maybe booting up, maybe crashed). We need *our*
# daemon to remain connected and keep retrying until it succeeds.
if self._test_done:
logging.info('Run cancelled due to test shutdown')
return
logging.info('Running test')
# Reset watchdog
if self._watchdog is not None:
self._io_loop.remove_timeout(self._watchdog)
self._watchdog = self._io_loop.add_timeout(
self._io_loop.time() + 10,
self._watchdog_timeout)
for ch in range(0, self.NUM_CH):
if ch not in self._channel:
continue
if ch == 0:
ex_name = 'amq.fanout'
else:
ex_name = 'ex_%s' % ch
if (ch not in self._declared) and \
(ch not in self._declaring):
logging.info('Declaring %s in channel %s', ex_name, ch)
self._declaring.add(ch)
# This should succeed
self._channel[ch].exchange_declare(
callback=functools.partial(
self._on_exchange_declared, ch),
exchange=ex_name, exchange_type='fanout',
passive=True)
def _on_exchange_declared(self, ch, *args, **kwargs):
logging.info('Channel %s declared exchange; handler called with '\
'args=%s, kwargs=%s', ch, args, kwargs)
self._declaring.discard(ch)
self._declared.add(ch)
def _on_chan_close(self, ch, channel, reply_code, reply_text):
logging.info('Channel %s closed with reply code %s, '\
'reason %s, cycles %s',
ch, reply_code, reply_text, self._ch_cycles.get(ch))
self._declaring.discard(ch)
self._channel.pop(ch, None)
ch_num = channel.channel_number
if channel.is_open:
channel.close()
if self._test_done:
# Are all channels closed?
logging.info('Waiting for other channels to close')
if len(self._channel) == 0:
self._io_loop.add_callback(self._channels_closed)
elif self._ch_cycles.get(ch, 0) > 0:
# Try to re-open the channel, re-cycling the channel number
def _reopen():
logging.info('Re-opening channel %s (AMQP ch %s)', ch, ch_num)
self._ch_cycles[ch] -= 1
self._connection.channel(
channel_number=ch_num,
on_open_callback=functools.partial(
self._on_chan_open, ch))
self._io_loop.add_callback(_reopen)
else:
# Exhausted attempts for channel
self._io_loop.add_callback(self._stop_test)
def _stop_test(self):
logging.info('Closing channels')
self._test_done = True
def _force_disconnect():
logging.error('Channel clean-up timed out')
self._set_result('Channel clean-up timed out')
self._io_loop.add_callback(self._channels_closed)
self._io_loop.add_timeout(
self._io_loop.time() + 10, _force_disconnect)
for channel in self._channel.values():
if channel.is_open:
channel.close()
def _channels_closed(self):
logging.info('Disconnecting')
self._connection.close()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, stream=stdout,
format='%(asctime)s %(levelname)10s '\
'%(name)16s: %(message)s')
logging.info('Starting up.\nPython: %s\nPika: %s', PYTHON_VERSION, PIKA_VERSION)
try:
testcase = PikaTestCase()
IOLoop.current().start()
assert testcase.result is None, 'Failed with result: %s' % testcase.result
except:
logging.exception('Test aborted')
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment