Skip to content

Instantly share code, notes, and snippets.

@yyuu
Created April 16, 2012 15:44
Show Gist options
  • Select an option

  • Save yyuu/2399563 to your computer and use it in GitHub Desktop.

Select an option

Save yyuu/2399563 to your computer and use it in GitHub Desktop.
A mon monitor plugin that tests an AMQP server.
#!/usr/bin/env python
from __future__ import with_statement
import logging
import optparse
import pika
import sys
import timeouts
import traceback
# Apply the `timeouts` module's patches at import time, before any check runs.
# NOTE(review): presumably this is what allows timeouts.Timeout to interrupt
# the blocking calls made further down -- confirm against the timeouts module.
timeouts.monkey_patch()
def monitor(host, port, queue_name, timeout):
    """Check one AMQP server by round-tripping a probe message.

    Connects to ``host:port``, declares ``queue_name``, publishes a probe
    message to it through the default exchange, then consumes a message
    from the same queue and verifies its body.

    Args:
        host: AMQP server host name or address.
        port: AMQP server TCP port.
        queue_name: Name of the queue used for the probe.
        timeout: Limit handed to ``timeouts.Timeout``; reported as seconds
            in the log message when it expires.

    Returns:
        True when the probe message was published and read back intact,
        False on any error or when the timeout expires.
    """
    probe_body = 'ok'

    def _monitor(host, port, queue_name):
        def receive_check_callback(ch, method, properties, body):
            logging.debug(" [x] Received %r" % (body,))
            if body != probe_body:
                raise ValueError('failed to get item from amqp server')
            # The probe came back intact: leave the consume loop so that
            # start_consuming() below returns.
            ch.stop_consuming()

        connection = None
        try:
            logging.debug('Connecting to %s:%d...' % (host, port))
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=host, port=port))
            channel = connection.channel()
            channel.queue_declare(queue=queue_name)
            channel.basic_publish(exchange='', routing_key=queue_name,
                                  body=probe_body)
            channel.basic_consume(receive_check_callback, queue=queue_name,
                                  no_ack=True)
            # basic_consume() only registers the callback; deliveries are
            # dispatched while start_consuming() runs.  Without this call
            # the probe message was never read back or verified.
            channel.start_consuming()
            return True
        except timeouts.Timeout:
            # Let the outer handler report the timeout with its own message
            # instead of treating it as a generic failure here.
            raise
        except Exception as e:
            logging.error(e)
            return False
        finally:
            if connection is not None:
                connection.close()

    try:
        with timeouts.Timeout(timeout):
            return _monitor(host, port, queue_name)
    except timeouts.Timeout:
        logging.error('Timed out in %d seconds.' % (timeout,))
        return False
def main(args):
    """Parse the command line, probe every host/port pair, and exit.

    Args:
        args: Full argument vector including the program name (as in
            ``sys.argv``).  Positional arguments after the program name
            are the hosts to check.

    Raises:
        ValueError: If no queue name was given with ``-q``/``--queue_name``.
        SystemExit: Always, once the checks are done: status 0 when every
            check succeeded, 1 when at least one failed.
    """
    parser = optparse.OptionParser("usage %prog [OPTIONS]", add_help_option=False)
    parser.add_option('--help', action='help',
                      help='show this message and exit.')
    parser.add_option('-p', '--port', type='int', default=[], action='append',
                      dest='ports', help='amqp port')
    parser.add_option('-q', '--queue_name', type='str', dest='queue_name',
                      help='queue name to check status')
    parser.add_option('-t', '--timeout', type='int', default=10,
                      dest='timeout', help='timeout')
    parser.add_option('-v', '--verbose', default=False, action='store_true',
                      dest='verbose', help='show verbose message')
    options, positional = parser.parse_args(args)

    if options.queue_name is None:
        raise ValueError("no queue name given for test")
    if options.verbose:
        logging.basicConfig(level=logging.DEBUG)

    # Fall back to port 5672 when no -p option was supplied.
    ports = options.ports or [5672]

    # Run every check first, then report, so the per-target result lines are
    # logged together after all probes have finished.
    results = []
    for host in positional[1:]:  # positional[0] is the program name
        for port in ports:
            healthy = monitor(host, port, options.queue_name, options.timeout)
            results.append((host, port, healthy))

    failures = 0
    for host, port, healthy in results:
        label = "ok" if healthy else "ERROR"
        logging.debug("%s:%d#%s ==> %s" % (host, port, options.queue_name, label))
        if not healthy:
            failures += 1

    sys.exit(1 if failures else 0)
# Script entry point.  The whole argv is passed through, program name
# included, because main() itself skips the first positional argument.
if __name__ == "__main__":
    main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment