Skip to content

Instantly share code, notes, and snippets.

@saghul
Created August 22, 2011 21:53
Show Gist options
  • Save saghul/1163719 to your computer and use it in GitHub Desktop.
Save saghul/1163719 to your computer and use it in GitHub Desktop.
OpenSIPS socket sender with Twisted
# coding=utf-8
# Copyright (C) 2011 Saúl Ibarra Corretgé <[email protected]>
#
# Based on callcontrol/opensips.py, Copyright (C) 2006-2011 AG Projects.
# OpenSIPS configuration example:
# loadmodule "mi_datagram.so"
# modparam("mi_datagram", "socket_name", "/var/run/opensips/socket")
# modparam("mi_datagram", "unix_socket_user", "opensips")
# modparam("mi_datagram", "unix_socket_group", "opensips")
import os
import socket
from application.process import process
from application.system import unlink
from itertools import count
from twisted.internet import reactor, defer, stdio
from twisted.internet.protocol import DatagramProtocol
from twisted.internet.error import CannotListenError
from twisted.protocols.basic import LineReceiver
from twisted.python.failure import Failure
class Error(Exception):
    """Base class for the errors defined in this module."""


class CommandError(Error):
    """A command could not be sent, or the reply to it was malformed."""


class TimeoutError(Error):
    """No reply to a command arrived before the timeout fired."""


class NegativeReplyError(Error):
    """The reply carried a status line other than ``200 OK``."""
class Request(object):
    """A command string paired with the deferred that reports its outcome.

    Attributes:
        command: the text written to the OpenSIPS socket.
        deferred: a new ``defer.Deferred`` created for this request.
    """

    def __init__(self, command):
        # Every request gets its own deferred, so callbacks attached to one
        # request are never shared with another.
        self.deferred = defer.Deferred()
        self.command = command
class UNIXSocketProtocol(DatagramProtocol):
    """Datagram protocol that resolves the pending deferred from a reply.

    The pending deferred is read from ``self.transport.deferred``. A reply
    is expected to be a status line, a newline, and then the message body.
    """

    noisy = False

    def datagramReceived(self, data, address):
        """Fire the pending deferred according to the received datagram.

        The deferred is called back with the message body when the status
        line is ``200 OK`` (case-insensitive), and errbacked with
        ``NegativeReplyError(status)`` otherwise. An empty datagram or one
        without a newline produces a ``CommandError``. Datagrams that arrive
        when no deferred is pending, or after it has already fired, are
        dropped.
        """
        pending = self.transport.deferred
        if pending is None or pending.called:
            return
        # accumulate in a buffer until message end (do this later when implemented by opensips) -Dan
        if not data:
            pending.errback(Failure(CommandError("Empty reply from OpenSIPS")))
            return
        # An empty separator means the datagram contained no newline at all.
        status, newline, msg = data.partition('\n')
        if not newline:
            pending.errback(Failure(CommandError("Missing line terminator after status line in OpenSIPS reply")))
            return
        if status.upper() != '200 OK':
            pending.errback(Failure(NegativeReplyError(status)))
        else:
            pending.callback(msg)
class UNIXSocketConnection(object):
    """Local UNIX datagram socket used to send commands to OpenSIPS.

    On construction a local datagram socket is bound (see
    ``initialize_local_socket``) and ``close`` is registered to run during
    reactor shutdown. The deferred of the request in flight is stored on the
    transport, where ``UNIXSocketProtocol`` looks it up when a reply arrives.
    """

    # Delay handed to reactor.callLater (seconds) before a request that has
    # not been answered is failed with TimeoutError.
    timeout = 3

    def __init__(self, opensips_socket_path):
        """Bind the local socket and remember where OpenSIPS listens.

        Args:
            opensips_socket_path: path of the OpenSIPS datagram socket that
                commands are written to.
        """
        self._initialized = False
        self.opensips_socket_path = opensips_socket_path
        self.path, self.transport = self.initialize_local_socket()
        reactor.addSystemEventTrigger('during', 'shutdown', self.close)
        self.transport.deferred = None  # placeholder for the deferred used by a request
        self._initialized = True

    def initialize_local_socket(self):
        """Bind the first free ``opensips_NN.sock`` runtime file.

        Candidate names are tried in increasing order; names whose path
        already exists, or on which listening fails, are skipped.

        Returns:
            A ``(socket_path, transport)`` tuple for the bound socket.
        """
        # Iterating count() directly avoids the Python-2-only counter.next().
        for index in count():
            socket_path = process.runtime_file("opensips_%02d.sock" % index)
            if os.path.exists(socket_path):
                continue
            try:
                transport = reactor.listenUNIXDatagram(socket_path, UNIXSocketProtocol())
            except CannotListenError:
                # Deliberate best effort: move on to the next candidate name.
                continue
            return socket_path, transport

    def close(self):
        """Stop listening and remove the local socket file.

        Does nothing if initialization never completed or if the connection
        has already been closed.
        """
        if self._initialized:
            # Clear the flag first so a later call (for example the shutdown
            # trigger after an explicit close) does not repeat the teardown.
            self._initialized = False
            self.transport.stopListening()
            unlink(self.path)

    def _get_deferred(self):
        return self.transport.deferred

    def _set_deferred(self, d):
        self.transport.deferred = d

    # The deferred of the request in flight, stored on the transport.
    deferred = property(_get_deferred, _set_deferred)

    def _did_timeout(self, deferred):
        """Fail ``deferred`` with ``TimeoutError`` unless it already fired."""
        if deferred.called:
            return
        deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout")))

    def send(self, request):
        """Write ``request.command`` to OpenSIPS and arm the reply timeout.

        ``request.deferred`` becomes the pending deferred. If the write
        raises ``socket.error`` the deferred is errbacked with
        ``CommandError`` immediately; otherwise a timeout check is scheduled
        after ``self.timeout``.
        """
        self.deferred = request.deferred
        try:
            self.transport.write(request.command, self.opensips_socket_path)
        except socket.error as e:
            self.deferred.errback(Failure(CommandError("Cannot send request to OpenSIPS: %s" % e)))
        else:
            reactor.callLater(self.timeout, self._did_timeout, self.deferred)
class ManagementInterface(object):
    """Formats commands and sends them over a ``UNIXSocketConnection``."""

    def __init__(self, opensips_socket_path):
        """Open a connection towards the OpenSIPS socket at the given path."""
        self.conn = UNIXSocketConnection(opensips_socket_path)

    def _RH_command(self, result):
        """Default result handler: print failures, ignore successes."""
        if isinstance(result, Failure):
            # A single parenthesised argument prints the same text whether
            # print is a statement or a function.
            print("failed to execute command: %s" % result.value)

    def send_command(self, command, callback=None):
        """Send a whitespace-separated command line to OpenSIPS.

        The first word is the command name and each remaining word is sent
        as one parameter line: ``:name:\\nparam1\\nparam2\\n``.

        Args:
            command: the command line, e.g. ``"name param1 param2"``.
            callback: callable attached with ``addBoth``; it receives either
                the reply body or a ``Failure``. Defaults to a handler that
                prints failures.

        An empty or whitespace-only command is not sent; the handler is
        invoked with a ``Failure`` wrapping ``CommandError`` instead.
        """
        handler = callback or self._RH_command
        parts = command.split()
        if not parts:
            # Nothing to send: report through the normal result path rather
            # than raising IndexError on parts[0].
            defer.fail(Failure(CommandError("Empty command"))).addBoth(handler)
            return
        cmd = ':%s:\n%s\n' % (parts[0], '\n'.join(parts[1:]))
        request = Request(cmd)
        request.deferred.addBoth(handler)
        self.conn.send(request)
class StandardIOProtocol(LineReceiver):
    """Interactive console: reads commands from stdin and prints the replies."""

    delimiter = os.linesep

    def __init__(self, opensips_socket):
        """Remember the OpenSIPS socket path; the connection is opened later."""
        self.opensips_socket = opensips_socket

    def connectionMade(self):
        """Create the management interface and show the first prompt."""
        self.management = ManagementInterface(self.opensips_socket)
        self.transport.write('>>> ')

    def lineReceived(self, line):
        """Send a non-blank line as a command; re-prompt on a blank one."""
        if line.strip():
            self.management.send_command(line, self._command_cb)
        else:
            # A blank or whitespace-only line holds no command name. Skip it
            # and show the prompt again so the console stays usable.
            self.transport.write('>>> ')

    def _command_cb(self, result):
        """Print the reply or the failure reason, then show a new prompt."""
        if isinstance(result, Failure):
            data = "failed to execute command: %s" % result.value
        else:
            data = result
        self.transport.write(data + self.delimiter)
        self.transport.write('>>> ')
if __name__ == '__main__':
    from optparse import OptionParser

    # The only option is the path of the OpenSIPS datagram socket.
    option_parser = OptionParser()
    option_parser.add_option(
        '--socket',
        dest='socket_file',
        default='/var/run/opensips/socket',
        metavar='File',
    )
    options, _ = option_parser.parse_args()

    # Attach the console protocol to stdin/stdout, then start the reactor.
    console = StandardIOProtocol(options.socket_file)
    stdio.StandardIO(console)
    reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment