Created
September 13, 2019 00:38
-
-
Save linxGnu/b488997a0e62b3f6a7060ba2af6391ea to your computer and use it in GitHub Desktop.
Connect to SMSC for sending MT and receiving MO via Session Manager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package daemon | |
import ( | |
"context" | |
"fmt" | |
"math" | |
"os" | |
"strconv" | |
"sync" | |
"time" | |
"github.com/linxGnu/gosmpp" | |
"github.com/linxGnu/gosmpp/Data" | |
"github.com/linxGnu/gosmpp/Exception" | |
"github.com/linxGnu/gosmpp/PDU" | |
"github.com/linxGnu/gosmpp/Utils" | |
) | |
// Session bind states. The names suggest "not bound" / "bound"; they are not
// referenced in the visible part of this file.
const (
	stateUnBind = 0
	stateBind   = 1
)

// MT send outcome codes. Not referenced in the visible part of this file.
const (
	statusSendMTFail    = 0
	statusSendMTSuccess = 1
)

// dataCodingASCII is the data coding value createSubmitSM sets on every
// outgoing SubmitSM.
const dataCodingASCII = byte(0)
var instance SessionInterface | |
var lock sync.RWMutex | |
// PDU config: field values createSubmitSM applies to every SubmitSM it builds.
var (
	// Source address type-of-number / numbering-plan indicator.
	pduSrcAddrTon = byte(5)
	pduSrcAddrNpi = byte(0)
	// Destination address type-of-number / numbering-plan indicator.
	pduDesAddrTon = byte(1)
	pduDesAddrNpi = byte(1)
	// Remaining SubmitSM header fields, all left at zero.
	pduProtocolID           = byte(0)
	pduRegisterDelivery     = byte(0)
	pduReplaceIfPresentFlag = byte(0)
	pduEsmClass             = byte(0)
)
// SessionInterface ... | |
type SessionInterface interface { | |
Bind() error | |
Rebind() error | |
Destroy() | |
UnBind() (unbindResp *PDU.UnbindResp, err *Exception.Exception) | |
} | |
// SessionManager ... | |
type SessionManager struct { | |
session *gosmpp.Session | |
sessionLock sync.RWMutex | |
config config.SMPPConnection | |
lg logger.Logger | |
moHandler *handler.MOHandler | |
// PDU event handle channel | |
eventChan chan *gosmpp.ServerPDUEvent | |
wg *sync.WaitGroup | |
firstTimeBind bool | |
seq int | |
seqLock sync.RWMutex | |
// use for stop workers | |
ctx context.Context | |
cancel context.CancelFunc | |
} | |
// Bind ... | |
func (s *SessionManager) Bind() error { | |
s.sessionLock.Lock() | |
defer s.sessionLock.Unlock() | |
s.ctx, s.cancel = context.WithCancel(context.Background()) | |
// retry if err | |
var connection *gosmpp.TCPIPConnection | |
var err error | |
for i := 0; i < 3; i++ { | |
connection, err = gosmpp.NewTCPIPConnectionWithAddrPort(s.config.Address, s.config.Port) | |
if err == nil { | |
break | |
} | |
time.Sleep(50 * time.Millisecond) | |
} | |
if err != nil { | |
s.lg.Error(&model.LogFormat{Action: "Bind", Message: "failed to connect SMPP", Err: err}) | |
time.Sleep(10 * time.Millisecond) | |
os.Exit(2) | |
} | |
s.session = gosmpp.NewSessionWithConnection(connection) | |
s.session.EnableStateChecking() | |
request := PDU.NewBindTransceiver() | |
request.SetSystemId(s.config.SystemID) | |
request.SetPassword(s.config.Password) | |
request.SetSystemType(s.config.SystemType) | |
// try to bind | |
if resp, e := s.session.BindWithListener(request, s); e != nil || resp.GetCommandStatus() != 0 { | |
s.lg.Error(&model.LogFormat{Action: "Bind", Message: "Try to bind SMPP", Err: e}) | |
connection.Close() | |
s.session = nil | |
panic(e.Error.Error()) // let panic so that webserver daemon is also closed | |
} else if s.config.DebugMode { | |
s.lg.Debug(&model.LogFormat{Action: "Bind", Message: "Try to bind SMPP", Data: resp}) | |
} | |
s.lg.Info(&model.LogFormat{Action: "Bind", Message: "Successful"}) | |
// start pdu event handlers buffered | |
for i := 0; i < s.config.NumberOfMOHandler; i++ { | |
s.wg.Add(1) | |
go s.handleEventWorker() | |
} | |
// start submitSM worker | |
for i := 0; i < s.config.NumberOfMTHandler; i++ { | |
s.wg.Add(1) | |
go s.submitSMWorker() | |
} | |
// start enquire link | |
if s.firstTimeBind { | |
s.firstTimeBind = false | |
if s.config.EnquireLinkIntervalSec > 0 { | |
go s.doEnquireLink() | |
} | |
} | |
return nil | |
} | |
// Rebind Unbind then bind SMSC | |
func (s *SessionManager) Rebind() error { | |
SessionManagerCounter.Incr("rebind") | |
s.UnBind() | |
return s.Bind() | |
} | |
// Destroy Unbind then close connection | |
func (s *SessionManager) Destroy() { | |
s.UnBind() | |
} | |
// UnBind Unbind SMPP session | |
func (s *SessionManager) UnBind() (unbindResp *PDU.UnbindResp, err *Exception.Exception) { | |
s.lg.Info(&model.LogFormat{Action: "start Unbind SMPP"}) | |
s.sessionLock.RLock() | |
if s.session == nil { | |
s.sessionLock.RUnlock() | |
return | |
} | |
s.sessionLock.RUnlock() | |
s.lg.Info(&model.LogFormat{Action: "Unbind SMPP"}) | |
// notify all worker to stop | |
s.cancel() | |
s.wg.Wait() | |
s.lg.Info(&model.LogFormat{Action: "Close chan done"}) | |
// Do session lock | |
s.sessionLock.Lock() | |
unbindResp, err = s.session.Unbind() | |
if err != nil { | |
s.lg.Error(&model.LogFormat{Action: "Unbind SMPP", Err: err}) | |
} else if s.config.DebugMode { | |
s.lg.Debug(&model.LogFormat{Action: "Unbind SMPP", Data: unbindResp}) | |
} | |
s.lg.Info(&model.LogFormat{Action: "Close chan done"}) | |
s.session = nil | |
s.sessionLock.Unlock() | |
return | |
} | |
// handleEventWorker handle signal sent by SMPP | |
func (s *SessionManager) handleEventWorker() { | |
defer s.wg.Done() | |
for { | |
select { | |
case <-s.ctx.Done(): | |
return | |
case event := <-s.eventChan: | |
s.handleEvent(event) | |
} | |
} | |
} | |
func (s *SessionManager) handleEvent(event *gosmpp.ServerPDUEvent) { | |
var ( | |
// msgID = int64(-1) | |
commandStatus int32 | |
// status int | |
moContent string | |
) | |
if event == nil { | |
return | |
} | |
t := event.GetPDU() | |
if t == nil { | |
return | |
} | |
commandStatus = t.GetCommandStatus() | |
if commandStatus == Data.ESME_RTHROTTLED { | |
s.lg.Warn(&model.LogFormat{Action: "handleEventWorker", Message: "Throttled", Data: t}) | |
mtCounter.Incr("throttled") | |
} else if commandStatus == Data.ESME_RMSGQFUL { | |
s.lg.Warn(&model.LogFormat{Action: "handleEventWorker", Message: "Queue full", Data: t}) | |
mtCounter.Incr("queue_full") | |
} | |
switch v := t.(type) { | |
case *PDU.DeliverSM: | |
if s.config.DebugMode { | |
s.lg.Debug(&model.LogFormat{Action: "DeliverSM", Data: v}) | |
} | |
if response, err := v.GetResponse(); err == nil { | |
s.session.Respond(response) | |
} | |
if v.GetEsmClass() == 0 { // receive MO from user | |
moContent, _ = v.GetShortMessageWithEncoding(Data.ENC_UTF8) | |
if v.GetSourceAddr() != nil { | |
isdn := v.GetSourceAddr().GetAddress() | |
s.lg.Info(&model.LogFormat{Action: "ReceiveMO", Data: map[string]interface{}{ | |
"isdn": isdn, | |
"content": moContent, | |
}}) | |
if err := s.moHandler.ProcessMO(isdn, moContent); err != nil { | |
s.lg.Error(&model.LogFormat{Action: "ProcessMO", Err: err}) | |
} | |
} | |
} | |
case *PDU.Unbind: | |
s.lg.Info(&model.LogFormat{Action: "Unbind SMSC", Message: "Received"}) | |
if response, err := v.GetResponse(); err == nil { | |
s.lg.Info(response) | |
s.lg.Info(err) | |
s.session.Respond(response) | |
} | |
case *PDU.EnquireLink: | |
s.lg.Info(&model.LogFormat{Action: "EnquireLink SMSC", Message: "Received"}) | |
if response, err := v.GetResponse(); err == nil { | |
s.lg.Info(response) | |
s.lg.Info(err) | |
s.session.Respond(response) | |
} | |
case *PDU.SubmitSMResp: | |
// we dont care submitSMResp at time | |
s.lg.Info(&model.LogFormat{Action: "SubmitSMResp SMSC", Message: "Received"}) | |
} | |
} | |
func (s *SessionManager) submitSMWorker() { | |
defer s.wg.Done() | |
// receives MT from webserver | |
for { | |
select { | |
case <-s.ctx.Done(): | |
return | |
case v := <-core.GetHTTPMTChan(): | |
s.lg.Info(&model.LogFormat{Action: "Receive MT from Channel", Data: v}) | |
core.MTChanGauge.Dec() | |
s.handleMT(v) | |
if s.config.MTIntervalMiliSec > 0 { | |
time.Sleep(time.Duration(s.config.MTIntervalMiliSec) * time.Millisecond) | |
} | |
} | |
} | |
} | |
func (s *SessionManager) handleMT(v *model.MT) { | |
var submitSM *PDU.SubmitSM | |
ln, firstMTInSeq := 0, true | |
ln = len(v.Info) | |
// we dont care submitSMResp now. So we dont save seq to Redis | |
if ln < 140 { | |
submitSM = s.createSubmitSM(v.EncryptedIsdn, v.Info, v.Alias) | |
// cache this seq | |
seq := int32(s.incrementSequence()) | |
// expiredTime := time.Duration(s.config.) * time.Second | |
// keyName := "seq_" + strconv.Itoa(int(seq)) | |
// if err := s.redisClient.Set(keyName, v, expiredTime).Err(); err != nil { | |
// logger.WithFields(map[string]interface{}{ | |
// "seq": seq, | |
// "mt": v, | |
// "error": err, | |
// }).Error("Can not set sequence number") | |
// } | |
// set sequence number | |
submitSM.SetSequenceNumber(seq) | |
if s.config.DebugMode { | |
s.lg.Debug(&model.LogFormat{Action: "SUBMIT_SM", Data: submitSM}) | |
} | |
s.sessionLock.RLock() | |
if _, e := s.session.Submit(submitSM); e != nil { | |
s.lg.Error(&model.LogFormat{Action: "SUBMIT_SM", Data: submitSM, Err: e}) | |
mtCounter.Incr("error") | |
} else { | |
s.lg.Info(&model.LogFormat{Action: "SUBMIT_SM", Message: "Success", Data: submitSM}) | |
mtCounter.Incr("success") | |
} | |
s.sessionLock.RUnlock() | |
} else { | |
s.lg.Info(&model.LogFormat{Action: "SUBMIT_SM_MULTI", Data: v}) | |
firstMTInSeq = true | |
smRunes := []byte(v.Info) | |
runeLen := len(v.Info) | |
totalPart := byte(int(math.Ceil(float64(runeLen) / 134))) | |
partNum := 1 | |
uuID := s.incrementSequence() | |
for i := 0; i < runeLen; i += 134 { | |
start, end := i, i+134 | |
if end > runeLen { | |
end = runeLen | |
} | |
// https://help.nexmo.com/hc/en-us/articles/204015653-Sending-Concatenated-Messages-via-SMPP | |
part := []byte{5, 0, 3, byte(uuID), totalPart, byte(partNum)} | |
part = append(part, []byte(smRunes[start:end])...) | |
submitSM = s.createSubmitSM(v.EncryptedIsdn, "", v.Alias) | |
submitSM.SetShortMessageData(Utils.NewBuffer(part)) | |
submitSM.SetEsmClass(byte(64)) | |
if s.config.DebugMode { | |
s.lg.Debug(&model.LogFormat{Action: "SUBMIT_SM", Message: "Req", Data: submitSM}) | |
} | |
var seq int32 | |
if firstMTInSeq { | |
seq = submitSM.GetSequenceNumber() | |
} else { | |
seq = int32(s.incrementSequence()) | |
} | |
submitSM.SetSequenceNumber(seq) | |
s.sessionLock.RLock() | |
s.lg.Info(&model.LogFormat{Action: "SUBMIT_SM_MULTI", Message: "Part #" + strconv.Itoa(partNum)}) | |
if resp, e := s.session.Submit(submitSM); e == nil { | |
if firstMTInSeq { | |
firstMTInSeq = false | |
} | |
s.lg.Info(&model.LogFormat{Action: "SUBMIT_SM_MULTI", Message: "Part #" + strconv.Itoa(partNum) + ". Success"}) | |
mtCounter.Incr("success") | |
} else { | |
s.lg.Error(&model.LogFormat{Action: "SUBMIT_SM_MULTI", Message: "Part #" + strconv.Itoa(partNum), Err: e, Data: resp}) | |
mtCounter.Incr("error") | |
} | |
s.sessionLock.RUnlock() | |
partNum++ | |
if i < int(totalPart)-1 && s.config.MTIntervalMiliSec > 0 { | |
time.Sleep(time.Duration(s.config.MTIntervalMiliSec) * time.Millisecond) | |
} | |
} | |
} | |
} | |
func (s *SessionManager) createSubmitSM(receiver, content, alias string) *PDU.SubmitSM { | |
sm := PDU.NewSubmitSM() | |
var err *Exception.Exception | |
//srcAddr, err := PDU.NewAddressWithAddr(smscShortCode) | |
srcAddr := PDU.NewAddress() | |
if alias == "" { | |
err = srcAddr.SetAddress(s.config.ShortCode) | |
if err != nil { | |
s.lg.Error(&model.LogFormat{Action: "CREATE_SUBMIT_SM_SHORTCODE", Message: "INIT SrcAddr " + s.config.ShortCode, Err: err}) | |
} | |
} else { | |
err := srcAddr.SetAddress(alias) | |
if err != nil { | |
s.lg.Error(&model.LogFormat{Action: "CREATE_SUBMIT_SM_ALIAS", Message: "INIT SrcAddr " + alias, Err: err}) | |
} | |
} | |
srcAddr.SetTon(pduSrcAddrTon) | |
srcAddr.SetNpi(pduSrcAddrNpi) | |
sm.SetSourceAddr(srcAddr) | |
//desAddr, err := PDU.NewAddressWithAddr(receiver) | |
desAddr := PDU.NewAddress() | |
err = desAddr.SetAddress(receiver) | |
if err != nil { | |
s.lg.Error(&model.LogFormat{Action: "CREATE_SUBMIT_SM", Message: "INIT DestAddr", Err: err}) | |
} | |
desAddr.SetTon(pduDesAddrTon) | |
desAddr.SetNpi(pduDesAddrNpi) | |
sm.SetDestAddr(desAddr) | |
sm.SetProtocolId(pduProtocolID) | |
sm.SetRegisteredDelivery(pduRegisterDelivery) | |
sm.SetReplaceIfPresentFlag(pduReplaceIfPresentFlag) | |
sm.SetEsmClass(pduEsmClass) | |
sm.SetDataCoding(dataCodingASCII) | |
if len(content) > 0 { | |
sm.SetShortMessageWithEncoding(content, Data.ENC_UTF8) | |
} | |
return sm | |
} | |
func (s *SessionManager) incrementSequence() int { | |
s.seqLock.Lock() | |
defer s.seqLock.Unlock() | |
s.seq += s.config.SeqMod | |
if s.seq > 2000000000 { | |
s.seq = 1 | |
} | |
return s.seq | |
} | |
func (s *SessionManager) doEnquireLink() { | |
defer func() { | |
s.lg.Info(&model.LogFormat{Action: "doEnquireLink", Message: "Done"}) | |
}() | |
for { | |
select { | |
case <-s.ctx.Done(): | |
return | |
default: | |
// do enquire link | |
s.sessionLock.RLock() | |
if s.session != nil { | |
resp, err := s.session.EnquireLink(PDU.NewEnquireLink()) | |
if err != nil { | |
s.sessionLock.RUnlock() | |
if s.config.DebugMode { | |
s.lg.Debug(&model.LogFormat{Action: "doEnquireLink", Err: err, Data: resp}) | |
} | |
// trigger rebind | |
_ = s.Rebind() | |
return | |
} else if s.config.DebugMode && s.config.HeartBeat { | |
s.lg.Info(&model.LogFormat{Action: "SMSC Hertbeat", Message: "Resp", Data: resp}) | |
} | |
s.lg.Info(&model.LogFormat{Action: "enquire link", Err: err, Data: resp}) | |
} | |
s.sessionLock.RUnlock() | |
timeSleep := time.Duration(s.config.EnquireLinkIntervalSec) * time.Second | |
time.Sleep(timeSleep) | |
} | |
} | |
} | |
// HandleEvent ... | |
func (s *SessionManager) HandleEvent(event *gosmpp.ServerPDUEvent) (ex *Exception.Exception) { | |
defer func() { | |
if e := recover(); e != nil { | |
s.lg.Error(&model.LogFormat{Action: "HandleEvent", Err: e}) | |
ex = Exception.NewException(fmt.Errorf("%v", e)) | |
} | |
}() | |
s.eventChan <- event | |
return | |
} | |
// GetSMPPInstace return private SMPP instance | |
func GetSMPPInstace() SessionInterface { | |
return instance | |
} | |
// NewSMPPSessionManager ... | |
func NewSMPPSessionManager(ctx context.Context) (fn model.Daemon, err error) { | |
// init session | |
lock.Lock() | |
defer lock.Unlock() | |
if instance != nil { | |
instance.UnBind() | |
} | |
conf := config.Get().SMPPConnection | |
lg := logger.MustGet("SMPP-Session") | |
core.NewHTTPMTChan() | |
instance = &SessionManager{ | |
firstTimeBind: true, | |
config: conf, | |
moHandler: handler.NewMOHandler(), | |
wg: &sync.WaitGroup{}, | |
lg: lg, | |
eventChan: make(chan *gosmpp.ServerPDUEvent, conf.NumberOfMOHandler), | |
} | |
go func() { | |
if err := GetSMPPInstace().Bind(); err != nil { | |
lg.Error(err) | |
} | |
}() | |
fn = func() { | |
<-ctx.Done() | |
lg.Warn("start destroy") | |
GetSMPPInstace().Destroy() | |
lg.Warn("Gracefully stop SessionManager") | |
} | |
return | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment