Created
August 22, 2011 21:53
-
-
Save saghul/1163719 to your computer and use it in GitHub Desktop.
OpenSIPS socket sender with Twisted
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8
# Copyright (C) 2011 Saúl Ibarra Corretgé <[email protected]>
#
# Based on callcontrol/opensips.py, Copyright (C) 2006-2011 AG Projects.
# OpenSIPS configuration example:
# loadmodule "mi_datagram.so"
# modparam("mi_datagram", "socket_name", "/var/run/opensips/socket")
# modparam("mi_datagram", "unix_socket_user", "opensips")
# modparam("mi_datagram", "unix_socket_group", "opensips")
import os | |
import socket | |
from application.process import process | |
from application.system import unlink | |
from itertools import count | |
from twisted.internet import reactor, defer, stdio | |
from twisted.internet.protocol import DatagramProtocol | |
from twisted.internet.error import CannotListenError | |
from twisted.protocols.basic import LineReceiver | |
from twisted.python.failure import Failure | |
class Error(Exception):
    """Root of the exception hierarchy defined in this module."""


class CommandError(Error):
    """A command could not be sent, or its reply was empty or malformed."""


class TimeoutError(Error):
    """No reply to a command arrived before the timeout fired."""


class NegativeReplyError(Error):
    """The reply's status line was something other than ``200 OK``."""
class Request(object):
    """A command string paired with the deferred that will carry its outcome."""

    def __init__(self, command):
        """Store *command* and create a fresh, unfired deferred for it.

        :param command: the text to send; stored unchanged as ``self.command``.
        """
        # The two attributes are independent; the deferred is created per
        # request so callers can attach their own callbacks to it.
        self.deferred = defer.Deferred()
        self.command = command
class UNIXSocketProtocol(DatagramProtocol):
    """Datagram protocol that turns an OpenSIPS reply into a deferred result.

    The deferred to resolve is read from ``self.transport.deferred``; in this
    file that attribute is maintained by UNIXSocketConnection.
    """

    noisy = False

    def datagramReceived(self, data, address):
        """Parse one reply datagram and fire the pending deferred with it.

        A reply is expected to be a status line, a newline, and then the
        body.  When the status is ``200 OK`` (compared case-insensitively)
        the body is passed to the deferred's callback; any other status is
        delivered as a NegativeReplyError carrying the status line.  An empty
        datagram or one without a newline is delivered as a CommandError.

        Datagrams are ignored when no deferred is pending or when the pending
        one has already been fired.

        :param data: the raw datagram payload.
        :param address: sender address (unused).
        """
        pending = self.transport.deferred
        if pending is None or pending.called:
            return

        # accumulate in a buffer until message end (do this later when implemented by opensips) -Dan
        if not data:
            pending.errback(Failure(CommandError("Empty reply from OpenSIPS")))
            return

        # An empty separator means there was no newline, i.e. no way to tell
        # the status line from the body.
        status, newline, body = data.partition('\n')
        if not newline:
            pending.errback(Failure(CommandError("Missing line terminator after status line in OpenSIPS reply")))
            return

        if status.upper() != '200 OK':
            pending.errback(Failure(NegativeReplyError(status)))
        else:
            pending.callback(body)
class UNIXSocketConnection(object):
    """Local UNIX datagram socket used to send commands to OpenSIPS.

    The connection binds its own socket file (needed so OpenSIPS has an
    address to reply to), remembers the deferred of the request currently in
    flight on the transport, and fails that deferred with TimeoutError when
    no reply resolves it in time.
    """

    # Delay handed to reactor.callLater before a pending request is failed.
    timeout = 3

    def __init__(self, opensips_socket_path):
        """Bind a local socket and register cleanup at reactor shutdown.

        :param opensips_socket_path: path of the OpenSIPS datagram socket that
            commands are written to.
        """
        # close() is a no-op until this flag is set at the end of __init__.
        self._initialized = False
        self.opensips_socket_path = opensips_socket_path
        self.path, self.transport = self.initialize_local_socket()
        reactor.addSystemEventTrigger('during', 'shutdown', self.close)
        self.transport.deferred = None  ## placeholder for the deferred used by a request
        self._initialized = True

    def initialize_local_socket(self):
        """Bind the first free ``opensips_NN.sock`` and return ``(path, transport)``.

        Candidate names are tried in increasing order; a candidate is skipped
        when its path already exists or when listening on it fails with
        CannotListenError.
        """
        # NOTE(review): the search is unbounded -- if every candidate keeps
        # failing with CannotListenError this never returns.
        for index in count():
            # presumably resolves the name inside the process runtime
            # directory -- verify against application.process
            candidate = process.runtime_file("opensips_%02d.sock" % index)
            if os.path.exists(candidate):
                continue
            try:
                transport = reactor.listenUNIXDatagram(candidate, UNIXSocketProtocol())
            except CannotListenError:
                continue
            return candidate, transport

    def close(self):
        """Stop listening and remove the local socket file, if fully set up."""
        if not self._initialized:
            return
        self.transport.stopListening()
        unlink(self.path)

    @property
    def deferred(self):
        """The deferred of the request in flight, stored on the transport."""
        return self.transport.deferred

    @deferred.setter
    def deferred(self, d):
        self.transport.deferred = d

    def _did_timeout(self, deferred):
        """Fail *deferred* with TimeoutError unless it has already fired."""
        if not deferred.called:
            deferred.errback(Failure(TimeoutError("OpenSIPS command did timeout")))

    def send(self, request):
        """Write ``request.command`` to OpenSIPS and arm the reply timeout.

        The request's deferred becomes the pending one (replacing any
        previous one).  A socket error while writing fails it immediately
        with CommandError; otherwise a timer is scheduled that fails it with
        TimeoutError if it is still unfired after ``self.timeout``.

        :param request: object exposing ``command`` and ``deferred``.
        """
        self.deferred = request.deferred
        pending = self.deferred
        try:
            self.transport.write(request.command, self.opensips_socket_path)
        except socket.error as e:
            pending.errback(Failure(CommandError("Cannot send request to OpenSIPS: %s" % e)))
        else:
            reactor.callLater(self.timeout, self._did_timeout, pending)
class ManagementInterface(object):
    """Formats textual commands for OpenSIPS and sends them over a connection."""

    def __init__(self, opensips_socket_path):
        """Open a UNIXSocketConnection towards *opensips_socket_path*."""
        self.conn = UNIXSocketConnection(opensips_socket_path)

    def _RH_command(self, result):
        """Default result handler: print failures, ignore successful replies."""
        if isinstance(result, Failure):
            print("failed to execute command: %s" % result.value)

    def send_command(self, command, callback=None):
        """Send a whitespace-separated command line to OpenSIPS.

        The first word is the command name and each remaining word becomes
        one parameter line, giving ``:name:\\nparam1\\nparam2\\n``.

        :param command: the command line, e.g. as typed by a user.
        :param callback: callable invoked with the reply body on success or
            with a Failure on error; defaults to ``self._RH_command``.

        An empty or whitespace-only *command* is not sent; the handler is
        called with a Failure wrapping CommandError instead.  Previously such
        input raised IndexError here.
        """
        handler = callback or self._RH_command
        words = command.split()

        if not words:
            # Report through the same deferred path as every other outcome,
            # so callers only ever have to deal with their callback.
            request = Request(command)
            request.deferred.addBoth(handler)
            request.deferred.errback(Failure(CommandError("Empty command")))
            return

        name, params = words[0], words[1:]
        request = Request(':%s:\n%s\n' % (name, '\n'.join(params)))
        request.deferred.addBoth(handler)
        self.conn.send(request)
class StandardIOProtocol(LineReceiver):
    """Interactive prompt: each input line is sent to OpenSIPS as a command."""

    delimiter = os.linesep

    def __init__(self, opensips_socket):
        """Remember the OpenSIPS socket path; the interface is built on connect."""
        self.opensips_socket = opensips_socket

    def connectionMade(self):
        """Create the management interface and show the first prompt."""
        self.management = ManagementInterface(self.opensips_socket)
        self._write_prompt()

    def lineReceived(self, line):
        """Forward a non-blank line as a command; re-prompt on a blank one.

        Whitespace-only lines used to pass the ``if line:`` check and reach
        ``send_command``, which cannot split them into a command name.  They
        are now treated like empty lines, and a blank line gets a new prompt
        instead of leaving the terminal without one.
        """
        if line.strip():
            self.management.send_command(line, self._command_cb)
        else:
            self._write_prompt()

    def _command_cb(self, result):
        """Print the reply body, or the failure reason, then a new prompt."""
        if isinstance(result, Failure):
            data = "failed to execute command: %s" % result.value
        else:
            data = result
        self.transport.write(data + self.delimiter)
        self._write_prompt()

    def _write_prompt(self):
        """Write the input prompt to the transport."""
        self.transport.write('>>> ')
if __name__ == '__main__':
    from optparse import OptionParser

    # Single option: the path of the socket the OpenSIPS mi_datagram module
    # listens on (see the configuration example at the top of the file).
    cli = OptionParser()
    cli.add_option(
        '--socket',
        dest='socket_file',
        default='/var/run/opensips/socket',
        metavar='File'
    )
    opts, _positional = cli.parse_args()

    # Attach the interactive prompt to stdin/stdout, then hand control to
    # the reactor until shutdown.
    console = StandardIOProtocol(opts.socket_file)
    stdio.StandardIO(console)
    reactor.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment