Skip to content

Instantly share code, notes, and snippets.

@jreyes1108
Created October 16, 2009 20:09
Show Gist options
  • Save jreyes1108/212037 to your computer and use it in GitHub Desktop.
Save jreyes1108/212037 to your computer and use it in GitHub Desktop.
Antelope 2 AMQP
#!/usr/bin/env python
"""
Read messages from the AMQP server using py-amqplib
2009-09-15 Juan Reyes <[email protected]>
"""
from amqplib import client_0_8 as amqp
import sys
import time
conn = amqp.Connection(host=" ", userid=" ", password=" ", virtual_host="/sqlstream", insist=True)
chan = conn.channel()
chan.queue_declare(queue='TEST', durable=False, exclusive=False, auto_delete=False)
chan.exchange_declare(exchange='HEX_DATA', type='fanout', durable=False, auto_delete=True)
chan.exchange_declare(exchange='BIN_DATA', type='fanout', durable=False, auto_delete=True)
chan.queue_bind(queue='TEST', exchange='HEX_DATA')
chan.queue_bind(queue='TEST', exchange='BIN_DATA')
def recv_callback(msg):
t = time.time()
print '\nReceived '+ str(t) + ': ' + str(msg.application_headers)
print '\t' + str(msg.body) + '\n'
chan.basic_consume(queue="TEST", no_ack=True, callback=recv_callback, consumer_tag="mysqlstream_consumer")
try:
while True:
chan.wait()
except KeyboardInterrupt:
chan.basic_cancel("mysqlstream_consumer")
chan.close()
conn.close()
sys.exit(0)
#!/usr/bin/env python
"""
Test ORB to AMQP message handler.
Grab ORB packets from antelope and push them to a destination AMQP
2009-09-15 Juan Reyes <[email protected]>
"""
import sys
import os
import re
#import time
import getopt
from struct import *
from amqplib import client_0_8 as amqp
sys.path.append( os.environ['ANTELOPE'] + '/data/python' )
try:
from antelope.orb import *
except:
print "orb2amqp requires the python 'antelope.orb' (module not found). "
raise SystemExit
try:
from antelope.Pkt import *
except:
print "orb2amqp requires the python 'antelope.Pkt' (module not found). "
raise SystemExit
try:
from antelope.stock import *
except:
print "orb2amqp requires the python 'antelope.stock' (module not found). "
raise SystemExit
usage = "Usage: orb2amqp [-v] orbname \n"
elog_init( sys.argv )
try:
opts, pargs = getopt.getopt(sys.argv[1:], "v")
except getopt.GetoptError, err:
print str(err)
raise SystemExit
elog_die( usage );
if len(pargs) != 1:
print usage
raise SystemExit
elog_die( usage )
else:
orbname = pargs[0]
pfname = 'orb2amqp'
verbose = False
for o, a in opts:
if o == "-v":
verbose = 'True'
def main():
orbfd = orbopen( orbname, "r")
version = orbfd.ping()
if verbose:
elog_notify( "Connecting to ORB: %s" % orbname )
elog_notify( "ORB version: %s" % version )
conn = amqp.Connection(host=" ", userid=" ", password=" ", virtual_host="/sqlstream", insist=True)
amqp_chan_1 = conn.channel()
amqp_chan_2 = conn.channel()
try:
while(True):
(pktid, srcname, time, packet, nbytes) = orbreap( orbfd )
(net, sta, chan, loc, suffix, subcode) = split_srcname(srcname)
(type,pkt) = unstuffPkt( srcname, time, packet, nbytes)
if verbose:
elog_notify( "\n\n" )
elog_notify( "%s %s %s %s" % (net,sta,chan,loc) )
if pkt.nchannels() != 1:
elog_complain( "Only 1 channel per paket permitted! (%s)" % pkt.nchannels() )
elog_complain( "srcname: %s" % pkt.srcname() )
elog_complain( "time: %s" % pkt.time() )
elog_complain( "PacketType: %s" % pkt.PacketType() )
elog_complain( "type: %s" % pkt.type() )
elog_complain( "nchannels: %s" % pkt.nchannels() )
elog_complain( "pf: %s" % pkt.pf() )
elog_complain( "string: %s" % pkt.string() )
elog_complain( "dbile: %s" % pkt.dfile() )
elog_complain( "version: %s" % pkt.version() )
elog_complain( "db: %s" % pkt.db() )
elog_complain( "\n\n" )
continue
pktchannel = pkt.channels(0)
sps = str(pktchannel.samprate())
s_type = str(pktchannel.segtype())
s_time = str(time)
text = {'pktid':pktid, 'time':s_time, 'net':net, 'sta':sta, 'chan':chan, 'loc':loc, 'samprate':sps, 'segtype':s_type}
if verbose:
elog_notify( str(text) )
packet_string = orbpkt_string( srcname, time, packet, nbytes )
msg1 = amqp.Message(packet_string, application_headers=text, content_encoding="text", content_type='text/plain')
msg2 = amqp.Message(packet, application_headers=text, content_encoding="binary", content_type='application/octet-stream')
# Make msgs persist after server restart
#msg1.properties["delivery_mode"]=2
#msg2.properties["delivery_mode"]=2
amqp_chan_1.basic_publish(msg1, exchange='HEX_DATA')
amqp_chan_2.basic_publish(msg2, exchange='BIN_DATA')
#amqp_chan.basic_publish(msg1, exchange='HEX_DATA', mandatory=False, immediate=False)
#amqp_chan.basic_publish(msg2, exchange='BIN_DATA', mandatory=False, immediate=False)
except KeyboardInterrupt:
amqp_chan_1.close()
amqp_chan_2.close()
conn.close()
orbclose(orbfd)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment