Skip to content

Instantly share code, notes, and snippets.

@oogali
Created April 12, 2015 00:37
Show Gist options
  • Save oogali/c7e864ac8728660e3c21 to your computer and use it in GitHub Desktop.
Save oogali/c7e864ac8728660e3c21 to your computer and use it in GitHub Desktop.
A test of sending HipChat messages through the v2 API, using RabbitMQ as the message queue
#!/usr/bin/env python
import sys
import json
import pika
import requests
def queue_callback(channel, method, properties, body):
    """Relay one queued JSON message to a HipChat room notification endpoint.

    The body must be a JSON object with the keys ``to`` (room), ``api_token``
    and ``message``. Optional keys are ``subject``, ``color`` (default
    ``'yellow'``), ``notify`` (default ``False``) and ``format`` (default
    ``'html'``).

    Acknowledgement policy:
      * Unparseable or incomplete payloads are rejected; they are requeued
        only if this is their first delivery.
      * Transport failures, HTTP 5xx and HTTP 429 are nacked with requeue.
      * Any other HTTP status >= 400 is nacked without requeue, because
        retrying the same request cannot succeed and would loop forever.
      * Everything else is acked.

    Args:
        channel: Channel object used to ack/nack/reject the delivery.
        method: Delivery metadata; ``delivery_tag`` and ``redelivered`` are read.
        properties: Message properties (only printed for debugging).
        body: Raw message payload, expected to be JSON text.
    """
    try:
        msg = json.loads(body)
        print(msg)
        to = msg['to']
        api_token = msg['api_token']
        text = msg['message']
    except (ValueError, TypeError, KeyError) as e:
        # ValueError: malformed JSON; TypeError: payload is not a JSON object;
        # KeyError: a required field is absent.
        print(e)
        want_requeue = method.redelivered is False
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=want_requeue)
        print('* rejected')
        return

    data = {
        'color': msg.get('color', 'yellow'),
        'notify': msg.get('notify', False),
        'message_format': msg.get('format', 'html'),
        'message': u''
    }

    # Unicode templates keep non-ASCII subjects/messages from raising
    # UnicodeEncodeError when formatted under Python 2.
    subject = msg.get('subject')
    if subject:
        if data['message_format'] == 'html':
            data['message'] += u'<b>{0}</b><br/>\n'.format(subject)
        else:
            data['message'] += u'*** {0} ***\n'.format(subject)
    data['message'] += u'{0}'.format(text)

    _url = 'https://api.hipchat.com/v2/room/{0}/notification'.format(to)
    try:
        # Without a timeout a stalled connection would block the consumer forever.
        r = requests.post(_url, params={'auth_token': api_token}, data=data,
                          timeout=10)
    except requests.RequestException as e:
        # Network-level failure: likely transient, so let the broker redeliver.
        print(e)
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        print('* nacked')
    else:
        status = r.status_code
        if status >= 400:
            # Only server-side errors and rate limiting are worth retrying.
            retryable = status >= 500 or status == 429
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=retryable)
            print('* nacked')
        else:
            channel.basic_ack(delivery_tag=method.delivery_tag)
            print('* acked')

    # Debug dump of the delivery that was just handled.
    print('')
    print(channel)
    print(method)
    print(properties)
    print(body)
    print('* received')
    print('')
def main(argv=None):
    """Consume the 'hipchat' queue on the local broker until interrupted.

    Opens a blocking connection to ``amqp://localhost``, declares the
    ``hipchat`` queue and dispatches every delivery to ``queue_callback``.
    Ctrl-C stops consumption cleanly, and the connection is closed on every
    exit path.

    Args:
        argv: Argument vector; defaults to ``sys.argv``. Currently unused.

    Returns:
        0 once consumption has stopped.
    """
    if argv is None:
        argv = sys.argv

    conn = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
    try:
        channel = conn.channel()
        channel.queue_declare(queue='hipchat')
        # NOTE(review): callback-first argument order matches pika < 1.0;
        # pika 1.x expects basic_consume(queue, on_message_callback) -- confirm
        # against the installed version.
        channel.basic_consume(queue_callback, queue='hipchat')
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # Operator asked to stop: cancel the consumer instead of crashing.
            channel.stop_consuming()
    finally:
        # Skip close() if the broker already dropped us, so the original
        # error is not masked by a second exception.
        if conn.is_open:
            conn.close()

    return 0


if __name__ == '__main__':
    sys.exit(main())
#!/usr/bin/env python
import sys
import json
import pika
def main(argv=None):
    """Publish one sample notification message to the 'hipchat' queue.

    Connects to ``amqp://localhost``, declares the ``hipchat`` queue and
    publishes a single JSON-encoded test message through the default
    exchange. The connection is closed even if declaring or publishing fails.

    Args:
        argv: Argument vector; defaults to ``sys.argv``. Currently unused.

    Returns:
        0 after the message has been published.
    """
    if argv is None:
        argv = sys.argv

    msg = {
        'api_token': 'HIPCHAT API V2 TOKEN HERE',
        'to': 'Development',
        'format': 'html',
        'message': 'Hello, this is a test message!',
        'color': 'random',
        'notify': False
    }

    conn = pika.BlockingConnection(pika.URLParameters('amqp://localhost'))
    try:
        channel = conn.channel()
        channel.queue_declare(queue='hipchat')
        channel.basic_publish(exchange='', routing_key='hipchat', body=json.dumps(msg))
        print('* sent')
    finally:
        # Skip close() if the broker already dropped us, so the original
        # error is not masked by a second exception.
        if conn.is_open:
            conn.close()

    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment