Skip to content

Instantly share code, notes, and snippets.

@iKlotho
Forked from pguillem/consume_MT_messages.py
Last active November 28, 2022 12:05
Show Gist options
  • Save iKlotho/77491460f6669043008a82aecc839b73 to your computer and use it in GitHub Desktop.
Save iKlotho/77491460f6669043008a82aecc839b73 to your computer and use it in GitHub Desktop.
Jasmin Consume MT messages Multipart Message
# -*- coding: utf-8 -*-
"""
Original file is the https://gist.github.com/pguillem/5750e8db352f001138f2
This version of the script uses psycopg2 instead of PySQLPool and handles multipart messages
"""
import sys
import pickle
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.python import log
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec
import psycopg2
from psycopg2 import pool
def udh_exists(msg):
"""
\x05\x00\x03 the default UDH for multipart messages
\x08\x25\x01 is for the shift table and default UDH for multipart message
\x03\x25\x01 is for the shift table and single message
"""
udh_list = ['\x05\x00\x03', '\x08\x25\x01', '\x03\x25\x01']
if msg[:3] in udh_list: # udh
return True
return False
def remove_udh(msg):
udh_length = ord(msg[0]) # Total length of udh
msg = msg[udh_length+1:]
return msg
@inlineCallbacks
def gotConnection(conn, username, password):
#print "Connected to broker."
yield conn.authenticate(username, password,'PLAIN')
print "Authenticated. Ready to receive messages"
chan = yield conn.channel(1)
yield chan.channel_open()
yield chan.queue_declare(queue="someQueueName")
# Bind to submit.sm.* and submit.sm.resp.* routes
yield chan.queue_bind(queue="someQueueName", exchange="messaging", routing_key='submit.sm.*')
yield chan.queue_bind(queue="someQueueName", exchange="messaging", routing_key='submit.sm.resp.*')
yield chan.basic_consume(queue='someQueueName', no_ack=True, consumer_tag="someTag")
queue = yield conn.queue("someTag")
ps_pool = psycopg2.pool.SimpleConnectionPool(
1,
20,
user='****',
password='**',
host='***',
database='****')
if ps_pool:
print "Pooling 20 connections"
conn = ps_pool.getconn()
if conn:
print "Connected to psql"
curr = conn.cursor()
# Wait for messages
# This can be done through a callback ...
while True:
msg = yield queue.get()
props = msg.content.properties
pdu = pickle.loads(msg.content.body)
if msg.routing_key[:15] == 'submit.sm.resp.':
#print 'SubmitSMResp: status: %s, msgid: %s' % (pdu.status,
# props['message-id'])
#Update a record in mysql according to your own table. This will fire upon receiving a PDU response.
#Make sure you already have a matching sms record to update.
curr.execute("UPDATE jasmin_messages SET status='%s' WHERE messageid='%s'" % (pdu.status,props['message-id'],))
conn.commit()
elif msg.routing_key[:10] == 'submit.sm.':
#print 'SubmitSM: from %s to %s, content: %s, msgid: %s' % (pdu.params['source_addr'],
# pdu.params['destination_addr'],
# pdu.params['short_message'],
# props['message-id'])
# This will fire every time a message is sent to the SumbitSM queue.
# Create a record with the messagesent msg
src_conn = props['headers']['source_connector']
carrier = props['reply-to'].split('.')[::-1][0]
content = str(pdu.params['short_message'])
if udh_exists(content):
content = remove_udh(content)
while hasattr(pdu, 'nextPdu'):
pdu = pdu.nextPdu
msg = pdu.params['short_message']
if udh_exists(msg):
content += remove_udh(msg)
else:
content += msg
query = """INSERT INTO jasmin_messages
(messageid,carrier,date,dst,src,status,accountcode,cost,sale,plan_name,amaflags,content,sms_source)
VALUES
('%s','%s',NOW(),'%s','%s','8','00000','0.0','0.0','planname','somestatus','%s','%s', '%s')""" %\
(props['message-id'],
carrier,
pdu.params['destination_addr'],
pdu.params['source_addr'],
content,
src_conn)
print("query is %s" % query)
curr.execute(query)
"""
The previous query works for the following table structure:
id INT primary_key auto_increment
messageid VARCHAR(128)
carrier VARCHAR
date DATETIME
dst VARCHAR(15)
src VARCHAR(15)
status VARCHAR(10)
accountcode INT
cost FLOAT
sale FLOAT
plan_name VARCHAR(25)
amaflags VARCHAR(10)
content VARCHAR(160)
"""
conn.commit()
else:
print 'unknown route'
# A clean way to tear down and stop
yield chan.basic_cancel("someTag")
yield chan.channel_close()
chan0 = yield conn.channel(0)
yield chan0.connection_close()
reactor.stop()
if __name__ == "__main__":
"""
This example will connect to RabbitMQ broker and consume from two route keys:
- submit.sm.*: All messages sent through SMPP Connectors
- submit.sm.resp.*: More relevant than SubmitSM because it contains the sending status
Note:
- Messages consumed from submit.sm.resp.* are not verbose enough, they contain only message-id and status
- Message content can be obtained from submit.sm.*, the message-id will be the same when consuming from submit.sm.resp.*,
it is used for mapping.
- Billing information is contained in messages consumed from submit.sm.*
- This is a proof of concept, saying anyone can consume from any topic in Jasmin's exchange hack a
third party business, more information here: http://docs.jasminsms.com/en/latest/messaging/index.html
"""
host = '127.0.0.1'
port = 5672
vhost = '/'
username = 'guest'
password = 'guest'
spec_file = '/etc/jasmin/resource/amqp0-9-1.xml'
spec = txamqp.spec.load(spec_file)
# Connect and authenticate
d = ClientCreator(reactor,
AMQClient,
delegate=TwistedDelegate(),
vhost=vhost,
spec=spec).connectTCP(host, port)
d.addCallback(gotConnection, username, password)
def whoops(err):
if reactor.running:
log.err(err)
reactor.stop()
d.addErrback(whoops)
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment