Created
October 16, 2009 20:09
-
-
Save jreyes1108/212037 to your computer and use it in GitHub Desktop.
Antelope 2 AMQP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""
Read messages from the AMQP server using py-amqplib
2009-09-15 Juan Reyes <[email protected]>
"""
from amqplib import client_0_8 as amqp | |
import sys | |
import time | |
# Open the broker connection and a single channel on it.
# NOTE(review): host/userid/password are blank placeholders in this copy --
# presumably they must be filled in before the script can connect.
conn = amqp.Connection(
    host=" ",
    userid=" ",
    password=" ",
    virtual_host="/sqlstream",
    insist=True
)
chan = conn.channel()

# One non-durable queue, bound to both fanout exchanges, so a single consumer
# receives whatever is published to either of them.
chan.queue_declare(queue='TEST', durable=False, exclusive=False, auto_delete=False)
for exchange_name in ('HEX_DATA', 'BIN_DATA'):
    chan.exchange_declare(exchange=exchange_name, type='fanout', durable=False, auto_delete=True)
for exchange_name in ('HEX_DATA', 'BIN_DATA'):
    chan.queue_bind(queue='TEST', exchange=exchange_name)
def recv_callback(msg):
    """Print one consumed AMQP message to stdout.

    Writes the local receive time (``time.time()``) together with the
    message's ``application_headers``, followed by the message ``body`` on a
    tab-indented line of its own.

    msg -- the message object passed to the consumer callback; only its
           ``application_headers`` and ``body`` attributes are read.
    """
    received_at = time.time()
    # Single-argument print(...) is accepted by both Python 2 and Python 3 and
    # produces the same text as the original print statements.
    print('\nReceived %s: %s' % (str(received_at), str(msg.application_headers)))
    print('\t%s\n' % str(msg.body))
# Register recv_callback as the consumer for the TEST queue; no_ack=True means
# deliveries are not acknowledged by this script.
consumer_tag = "mysqlstream_consumer"
chan.basic_consume(
    queue="TEST",
    no_ack=True,
    callback=recv_callback,
    consumer_tag=consumer_tag
)

# Block on the channel until the user presses Ctrl-C.  The loop can only be
# left through an exception, so the shutdown sequence below runs exactly when
# a KeyboardInterrupt was caught.
try:
    while True:
        chan.wait()
except KeyboardInterrupt:
    pass

chan.basic_cancel(consumer_tag)
chan.close()
conn.close()
sys.exit(0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""
Test ORB to AMQP message handler.
Grab ORB packets from Antelope and push them to a destination AMQP server.
2009-09-15 Juan Reyes <[email protected]>
"""
import sys | |
import os | |
import re | |
#import time | |
import getopt | |
from struct import * | |
from amqplib import client_0_8 as amqp | |
# The Antelope Python bindings are loaded from $ANTELOPE/data/python, so that
# directory is appended to sys.path before the imports below are attempted.
sys.path.append(os.environ['ANTELOPE'] + '/data/python')

# Each binding is imported on its own so the message names the module that is
# missing.  Only ImportError is caught: any other failure propagates with its
# traceback instead of being reported as "module not found".  The exit status
# is non-zero so callers can tell that start-up failed.
try:
    from antelope.orb import *
except ImportError:
    print("orb2amqp requires the python 'antelope.orb' (module not found). ")
    raise SystemExit(1)

try:
    from antelope.Pkt import *
except ImportError:
    print("orb2amqp requires the python 'antelope.Pkt' (module not found). ")
    raise SystemExit(1)

try:
    from antelope.stock import *
except ImportError:
    print("orb2amqp requires the python 'antelope.stock' (module not found). ")
    raise SystemExit(1)
# Command line handling: orb2amqp [-v] orbname
#   -v       log progress through elog_notify
#   orbname  the ORB to read packets from (exactly one is required)
usage = "Usage: orb2amqp [-v] orbname \n"

elog_init(sys.argv)

try:
    opts, pargs = getopt.getopt(sys.argv[1:], "v")
except getopt.GetoptError as err:
    # Report the bad option and the usage line, then exit with a failure
    # status.  The elog_die(usage) call that used to follow the raise could
    # never run, so it has been dropped.
    print(str(err))
    print(usage)
    raise SystemExit(1)

if len(pargs) != 1:
    print(usage)
    raise SystemExit(1)

orbname = pargs[0]
pfname = 'orb2amqp'

# A real boolean rather than the string 'True'.
verbose = False
for o, a in opts:
    if o == "-v":
        verbose = True
def _complain_multichannel(pkt):
    """Log every descriptive field of a packet that is being skipped."""
    elog_complain("Only 1 channel per packet permitted! (%s)" % pkt.nchannels())
    elog_complain("srcname: %s" % pkt.srcname())
    elog_complain("time: %s" % pkt.time())
    elog_complain("PacketType: %s" % pkt.PacketType())
    elog_complain("type: %s" % pkt.type())
    elog_complain("nchannels: %s" % pkt.nchannels())
    elog_complain("pf: %s" % pkt.pf())
    elog_complain("string: %s" % pkt.string())
    elog_complain("dfile: %s" % pkt.dfile())
    elog_complain("version: %s" % pkt.version())
    elog_complain("db: %s" % pkt.db())
    elog_complain("\n\n")


def _forward_packets(orbfd, hex_chan, bin_chan):
    """Reap packets from orbfd forever and publish each one to both exchanges.

    Every packet is published twice with the same application headers: its
    orbpkt_string() form as text/plain to HEX_DATA on hex_chan, and the raw
    packet as application/octet-stream to BIN_DATA on bin_chan.  Packets that
    do not carry exactly one channel are logged and skipped.  The function
    only returns by raising.
    """
    while True:
        (pktid, srcname, pkt_time, packet, nbytes) = orbreap(orbfd)
        (net, sta, chan, loc, suffix, subcode) = split_srcname(srcname)
        (pkt_type, pkt) = unstuffPkt(srcname, pkt_time, packet, nbytes)

        if verbose:
            elog_notify("\n\n")
            elog_notify("%s %s %s %s" % (net, sta, chan, loc))

        if pkt.nchannels() != 1:
            _complain_multichannel(pkt)
            continue

        pktchannel = pkt.channels(0)
        headers = {
            'pktid': pktid,
            'time': str(pkt_time),
            'net': net,
            'sta': sta,
            'chan': chan,
            'loc': loc,
            'samprate': str(pktchannel.samprate()),
            'segtype': str(pktchannel.segtype()),
        }
        if verbose:
            elog_notify(str(headers))

        packet_string = orbpkt_string(srcname, pkt_time, packet, nbytes)
        text_msg = amqp.Message(packet_string, application_headers=headers,
                                content_encoding="text", content_type='text/plain')
        binary_msg = amqp.Message(packet, application_headers=headers,
                                  content_encoding="binary",
                                  content_type='application/octet-stream')
        # NOTE: to make messages persist after a server restart, set
        # properties["delivery_mode"] = 2 on each message before publishing.
        hex_chan.basic_publish(text_msg, exchange='HEX_DATA')
        bin_chan.basic_publish(binary_msg, exchange='BIN_DATA')


def main():
    """Copy packets from the ORB named by ``orbname`` to the AMQP broker.

    Opens the ORB read-only, connects to the broker, and forwards packets
    until interrupted.  Ctrl-C (KeyboardInterrupt) is the normal way to stop
    and is not re-raised.  Any other exception propagates, but the channels,
    the AMQP connection and the ORB are now closed on every exit path instead
    of only after Ctrl-C.

    Reads the module-level ``orbname`` and ``verbose`` settings.
    """
    orbfd = orbopen(orbname, "r")
    try:
        version = orbfd.ping()
        if verbose:
            elog_notify("Connecting to ORB: %s" % orbname)
            elog_notify("ORB version: %s" % version)

        # NOTE(review): host/userid/password are blank placeholders in this
        # copy -- presumably they must be filled in before use.
        conn = amqp.Connection(host=" ", userid=" ", password=" ",
                               virtual_host="/sqlstream", insist=True)
        try:
            amqp_chan_1 = conn.channel()
            amqp_chan_2 = conn.channel()
            try:
                _forward_packets(orbfd, amqp_chan_1, amqp_chan_2)
            except KeyboardInterrupt:
                pass
            finally:
                amqp_chan_1.close()
                amqp_chan_2.close()
        finally:
            conn.close()
    finally:
        orbclose(orbfd)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment