Skip to content

Instantly share code, notes, and snippets.

@samuelcolvin
Created March 13, 2018 17:40
Show Gist options
  • Save samuelcolvin/d8b4241f19ffbd73c98ea3cfdc144a89 to your computer and use it in GitHub Desktop.
Save samuelcolvin/d8b4241f19ffbd73c98ea3cfdc144a89 to your computer and use it in GitHub Desktop.
watch a sip server and print the number of incoming calls using python 3.6 and asyncio
import asyncio
import hashlib
import os
import re
import secrets
from time import time
from multidict import CIMultiDict
from devtools import debug
HOST = 'sip.soho66.co.uk'
PORT = 8060
CONN = HOST, PORT
USERNAME = os.environ['SIP_USERNAME']
PASSWORD = os.environ['SIP_PASSWORD']
URI = f'sip:{HOST}:{PORT}'
AUTH_METHOD = 'REGISTER'
BRANCH = secrets.token_hex()[:16].upper()
def md5digest(*args):
return hashlib.md5(':'.join(args).encode()).hexdigest()
def parse_digest(header):
params = {}
for arg in header[7:].split(','):
k, v = arg.strip().split('=', 1)
if '="' in arg:
v = v[1:-1]
params[k] = v
return params
RESPONSE_DECODE = re.compile(r'SIP/2.0 (?P<status_code>[0-9]{3}) (?P<status_message>.+)')
REQUEST_DECODE = re.compile(r'(?P<method>[A-Za-z]+) (?P<to_uri>.+) SIP/2.0')
NUMBER = re.compile(r'sip:(\d+)@')
def parse_headers(raw_headers):
headers = CIMultiDict()
decoded_headers = raw_headers.decode().split('\r\n')
for line in decoded_headers[1:]:
k, v = line.split(': ', 1)
if k in headers:
o = headers.setdefault(k, [])
if not isinstance(o, list):
o = [o]
o.append(v)
headers[k] = o
else:
headers[k] = v
for regex in (REQUEST_DECODE, RESPONSE_DECODE):
m = regex.match(decoded_headers[0])
if m:
return m.groupdict(), headers
debug(raw_headers, headers)
raise RuntimeError('unable to decode response')
class EchoClientProtocol:
def __init__(self):
self.transport = None
self.auth_attempt = 0
self.local_ip = None
self.cseq = 1
self.call_id = secrets.token_hex()[:10]
self.last_invitation = 0
def connection_made(self, transport):
self.transport = transport
self.local_ip, _ = transport.get_extra_info('sockname')
print(f'connection established')
self.send(f"""\
{AUTH_METHOD} sip:{HOST}:{PORT} SIP/2.0
Via: SIP/2.0/UDP {self.local_ip}:5060;rport;branch={BRANCH}
From: <sip:{USERNAME}@{HOST}:{PORT}>;tag=1269824498
To: <sip:{USERNAME}@{HOST}:{PORT}>
Call-ID: {self.call_id}
CSeq: {self.cseq} {AUTH_METHOD}
Contact: <sip:{USERNAME}@{self.local_ip};line=9ad550fb9d87b0f>
Max-Forwards: 70
User-Agent: TutorCruncher Address Book
Expires: 60
Content-Length: 0""")
def datagram_received(self, data, addr):
# print('datagram:', data)
try:
self.process_response(data, addr)
except Exception:
debug(data)
debug(data.decode())
raise
def process_response(self, raw_data, addr):
if raw_data == b'\x00\x00\x00\x00':
return
headers, data = raw_data.split(b'\r\n\r\n', 1)
status, headers = parse_headers(headers)
status_code = int(status.get('status_code', 0))
if status_code == 401 and self.auth_attempt < 3:
self.auth_attempt += 1
auth = headers['WWW-Authenticate']
params = parse_digest(auth)
realm, nonce = params['realm'], params['nonce']
ha1 = md5digest(USERNAME, realm, PASSWORD)
ha2 = md5digest(AUTH_METHOD, URI)
self.send(f"""\
{AUTH_METHOD} sip:{HOST}:{PORT} SIP/2.0
Via: SIP/2.0/UDP {self.local_ip}:5060;rport;branch={BRANCH}
From: <sip:{USERNAME}@{HOST}:{PORT}>;tag=1269824498
To: <sip:{USERNAME}@{HOST}:{PORT}>
Call-ID: {self.call_id}
CSeq: {self.cseq} {AUTH_METHOD}
Contact: <sip:{USERNAME}@{self.local_ip};line=9ad550fb9d87b0f>
Authorization: Digest username="{USERNAME}", realm="{realm}", nonce="{nonce}", uri="{URI}", \
response="{md5digest(ha1, nonce, ha2)}", algorithm=MD5
Max-Forwards: 70
User-Agent: TutorCruncher Address Book
Expires: 60
Content-Length: 0""")
elif status_code == 200:
print('authenticated successfully')
elif status.get('method') == 'OPTIONS':
# don't care
pass
elif status.get('method') == 'INVITE':
n = time()
if (n - self.last_invitation) > 1:
self.process_invite(headers)
self.last_invitation = n
else:
debug(status, dict(headers))
try:
data_text = data.decode()
except UnicodeDecodeError:
print(data)
else:
print(data_text)
def send(self, data):
data = (data.strip('\n ') + '\n\n').replace('\n', '\r\n').encode()
self.transport.sendto(data)
self.cseq += 1
def error_received(self, exc):
print('Error received:', exc)
def process_invite(self, headers):
m = NUMBER.search(headers['From'])
if m:
number = m.groups()[0]
else:
number = 'unknown'
debug('no number found', dict(headers))
country = headers.get('X-Brand', None)
print(f'incoming call from {number}{" ({})".format(country) if country else ""}')
loop = asyncio.get_event_loop()
connect = loop.create_datagram_endpoint(lambda: EchoClientProtocol(), remote_addr=CONN)
transport, protocol = loop.run_until_complete(connect)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
transport.close()
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment