Skip to content

Instantly share code, notes, and snippets.

@diegofcornejo
Created March 12, 2021 05:29
Show Gist options
  • Save diegofcornejo/d31ef7c9af09afa4234a3990506e323d to your computer and use it in GitHub Desktop.
Save diegofcornejo/d31ef7c9af09afa4234a3990506e323d to your computer and use it in GitHub Desktop.
Python Flask SMPP Client
import logging
import os
import sys
import threading

import smpplib.client
import smpplib.consts
import smpplib.gsm
from flask import Flask, request, jsonify
# from flask_mysqldb import MySQL
app = Flask(__name__)
# app.config['MYSQL_HOST'] = 'localhost'
# app.config['MYSQL_USER'] = 'root'
# app.config['MYSQL_PASSWORD'] = '123456'
# app.config['MYSQL_DB'] = 'flask'
# mysql = MySQL(app)

# DEBUG level so that log output shows what is happening on the connection.
logging.basicConfig(level='DEBUG')

# SMSC connection settings. Each one can be overridden through the
# environment so that real credentials do not have to live in the source;
# the defaults are the values this script has always used.
SMPP_HOST = os.environ.get('SMPP_HOST', '190.148.68.22')
SMPP_PORT = int(os.environ.get('SMPP_PORT', '8206'))
SMPP_SYSTEM_ID = os.environ.get('SMPP_SYSTEM_ID', 'systemid')
SMPP_PASSWORD = os.environ.get('SMPP_PASSWORD', 'password')

client = smpplib.client.Client(SMPP_HOST, SMPP_PORT)


def _report_sent(pdu):
    """Print the sequence number and message id of a submitted message."""
    sys.stdout.write('sent {} {}\n'.format(pdu.sequence, pdu.message_id))


def _report_delivered(pdu):
    """Print the receipted message id carried by an incoming PDU."""
    sys.stdout.write('delivered {}\n'.format(pdu.receipted_message_id))


client.set_message_sent_handler(_report_sent)
client.set_message_received_handler(_report_delivered)

client.connect()
client.bind_transceiver(
    system_id=SMPP_SYSTEM_ID,
    password=SMPP_PASSWORD,
    system_type='smpp',
    interface_version=52,  # 52 == 0x34, the SMPP v3.4 interface version
)

# The handlers registered above only run while something is reading PDUs
# from the socket. The bind has already completed synchronously, so start
# the library's read loop in a background thread; it is a daemon thread so
# it never keeps the process alive on shutdown.
threading.Thread(target=client.listen, name='smpp-listener', daemon=True).start()
@app.route("/")
def hello():
    """Serve the root URL with a fixed greeting."""
    greeting = "Hello World!"
    return greeting
@app.route("/ip")
def hi():
    """Return the caller's IP address as the response body.

    The ``HTTP_X_REAL_IP`` entry of the WSGI environ is used when it is
    present; otherwise the request's ``remote_addr`` is returned.
    """
    environ = request.environ
    if 'HTTP_X_REAL_IP' in environ:
        return environ['HTTP_X_REAL_IP']
    return request.remote_addr
# @app.route("/db")
# def insert():
# cur = mysql.connection.cursor()
# cur.execute("INSERT INTO users(email) VALUES (%s)", ('[email protected]',))
# mysql.connection.commit()
# cur.close()
# return 'success'
# @app.route("/get")
# def read():
# cur = mysql.connection.cursor()
# cur.execute("SELECT * FROM users")
# rv = cur.fetchall()
# return str(rv)
# @app.route("/sms", methods=['GET', 'POST'])
@app.route("/sms", methods=['POST'])
def send():
    """Send an SMS through the module-level SMPP ``client``.

    Expects a JSON object in the request body with two string fields:

    * ``to`` -- destination address.
    * ``message`` -- text to send.

    Returns:
        ``str()`` of the parsed request body on success. When the body is
        not a JSON object, or ``to`` / ``message`` is missing, empty or not
        a string, a JSON ``{"error": ...}`` body with HTTP status 400.
    """
    # silent=True returns None for a missing or malformed JSON body instead
    # of raising, so every kind of bad input takes the same 400 path below.
    req_data = request.get_json(silent=True)
    if not isinstance(req_data, dict):
        return jsonify(error='request body must be a JSON object'), 400

    to = req_data.get('to')
    message = req_data.get('message')
    for field, value in (('to', to), ('message', message)):
        if not isinstance(value, str) or not value:
            error = "'{}' must be a non-empty string".format(field)
            return jsonify(error=error), 400

    # make_parts returns the message part(s) to submit together with the
    # data_coding and esm_class values that must accompany each part.
    parts, encoding_flag, msg_type_flag = smpplib.gsm.make_parts(message)

    for part in parts:
        pdu = client.send_message(
            # TON 2 is "National" in the SMPP v3.4 type-of-number table.
            source_addr_ton=2,
            source_addr='+707',
            dest_addr_ton=2,
            destination_addr=to,
            short_message=part,
            data_coding=encoding_flag,
            esm_class=msg_type_flag,
            # Ask the SMSC for a delivery receipt for every part.
            registered_delivery=True,
        )
        print(pdu.sequence)
    return str(req_data)
if __name__ == '__main__':
    # The server listens on every interface, so the interactive debugger
    # (which lets a client run arbitrary code) must not be on by default.
    # Opt in explicitly with FLASK_DEBUG=1 when developing locally.
    debug = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    # The reloader re-imports this module in a child process, which would
    # connect and bind to the SMSC a second time; keep it off.
    app.run(host='0.0.0.0', port=4000, debug=debug, use_reloader=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment