Created
March 13, 2018 17:40
-
-
Save samuelcolvin/d8b4241f19ffbd73c98ea3cfdc144a89 to your computer and use it in GitHub Desktop.
Watch a SIP server and print the caller's number for each incoming call, using Python 3.6 and asyncio.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import hashlib | |
import os | |
import re | |
import secrets | |
from time import time | |
from multidict import CIMultiDict | |
from devtools import debug | |
# Remote SIP server that the datagram endpoint connects to.
HOST = 'sip.soho66.co.uk'
PORT = 8060
CONN = (HOST, PORT)

# Account credentials come from the environment; a missing variable raises
# KeyError as soon as the module is loaded.
USERNAME = os.environ['SIP_USERNAME']
PASSWORD = os.environ['SIP_PASSWORD']

# Request-URI of the REGISTER request, also used as the digest "uri" value.
URI = 'sip:{}:{}'.format(HOST, PORT)
AUTH_METHOD = 'REGISTER'

# 16 upper-case hex characters used as the Via "branch" parameter.
# NOTE(review): RFC 3261 branch values normally start with "z9hG4bK" and are
# unique per request; this single value is shared by every request sent.
BRANCH = secrets.token_hex(8).upper()
def md5digest(*args):
    """Return the hex MD5 digest of the string arguments joined with ':'."""
    joined = ':'.join(args)
    hasher = hashlib.md5()
    hasher.update(joined.encode())
    return hasher.hexdigest()
# One `key=value` pair of a digest challenge. The value is either a quoted
# string (group 2, may contain commas, '=' and backslash escapes) or a bare
# token (group 3).
_DIGEST_PARAM = re.compile(r'([^\s=,]+)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]*))')


def parse_digest(header):
    """Parse the parameters of a ``Digest ...`` authentication challenge.

    :param header: header value such as
        ``Digest realm="x", nonce="y", algorithm=MD5``; a leading ``Digest ``
        scheme (any case) is dropped if present
    :return: dict mapping parameter names to values with surrounding quotes
        removed (backslash escapes inside quoted values are left untouched)

    Quoted values may contain commas (e.g. ``qop="auth,auth-int"``), so the
    pairs are matched with a regex instead of splitting the header on ','.
    """
    if header[:7].lower() == 'digest ':
        header = header[7:]

    params = {}
    for match in _DIGEST_PARAM.finditer(header):
        key, quoted, bare = match.groups()
        params[key] = bare if quoted is None else quoted
    return params
# Status line of a SIP response, e.g. "SIP/2.0 401 Unauthorized".
# The dots in the version are escaped so that only a literal "2.0" matches.
RESPONSE_DECODE = re.compile(r'SIP/2\.0 (?P<status_code>[0-9]{3}) (?P<status_message>.+)')
# Request line of a SIP request, e.g. "INVITE sip:user@host SIP/2.0".
REQUEST_DECODE = re.compile(r'(?P<method>[A-Za-z]+) (?P<to_uri>.+) SIP/2\.0')
# Digits-only user part of a SIP URI, e.g. the "0123" in "sip:0123@host".
NUMBER = re.compile(r'sip:(\d+)@')
def parse_headers(raw_headers):
    """Parse the header section of a SIP message.

    :param raw_headers: bytes of the message up to (not including) the blank
        line that separates headers from the body
    :return: ``(start_line, headers)`` where ``start_line`` is the groupdict
        of whichever of REQUEST_DECODE / RESPONSE_DECODE matched the first
        line, and ``headers`` is a case-insensitive multidict; a header that
        occurs more than once is stored as a list of its values
    :raises RuntimeError: if the first line is neither a request line nor a
        status line
    """
    start_line, *header_lines = raw_headers.decode().split('\r\n')

    headers = CIMultiDict()
    for line in header_lines:
        # Split on the first ':' and strip, so "Name:value" (no space after
        # the colon) is accepted as well as "Name: value".
        name, sep, value = line.partition(':')
        if not sep:
            # Blank or malformed line: skip it rather than abort the message.
            continue
        name, value = name.strip(), value.strip()

        if name in headers:
            existing = headers[name]
            values = existing if isinstance(existing, list) else [existing]
            values.append(value)
            headers[name] = values
        else:
            headers[name] = value

    for regex in (REQUEST_DECODE, RESPONSE_DECODE):
        m = regex.match(start_line)
        if m:
            return m.groupdict(), headers

    debug(raw_headers, headers)
    raise RuntimeError('unable to decode response')
class EchoClientProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that REGISTERs with the SIP server and reports INVITEs.

    On connection a REGISTER request is sent; a 401 reply is answered with a
    digest-authenticated REGISTER (at most three times). Incoming INVITE
    requests are reported on stdout, OPTIONS requests are ignored, and
    anything else is dumped for inspection.

    NOTE(review): the only request this class ever sends is REGISTER - it
    never replies to INVITE/OPTIONS and never re-registers before the
    ``Expires: 60`` it asks for runs out. Confirm that is intended.
    """

    def __init__(self):
        self.transport = None
        self.auth_attempt = 0  # number of authenticated REGISTERs sent so far
        self.local_ip = None
        self.cseq = 1  # CSeq number for the next request, bumped by send()
        self.call_id = secrets.token_hex()[:10]
        self.last_invitation = 0  # time() of the most recent INVITE datagram

    def connection_made(self, transport):
        """Remember the transport and local address, then send the first REGISTER."""
        self.transport = transport
        # sockname is (host, port) for IPv4 but a 4-tuple for IPv6, so index
        # instead of unpacking a pair.
        self.local_ip = transport.get_extra_info('sockname')[0]
        print('connection established')
        self._register()

    def connection_lost(self, exc):
        """Report an abnormal close; a clean close (``exc is None``) is silent."""
        if exc is not None:
            print('connection lost:', exc)

    def datagram_received(self, data, addr):
        """Process one datagram, dumping it for debugging if processing fails."""
        # print('datagram:', data)
        try:
            self.process_response(data, addr)
        except Exception:
            debug(data)
            # errors='replace' so that undecodable bytes cannot raise here and
            # hide the exception that is about to be re-raised.
            debug(data.decode(errors='replace'))
            raise

    def process_response(self, raw_data, addr):
        """Dispatch a received SIP message based on its status code or method.

        :param raw_data: the datagram payload
        :param addr: sender address (unused)
        """
        if raw_data == b'\x00\x00\x00\x00':
            # four zero bytes carry no SIP message; ignore them
            return

        # partition() tolerates a datagram without the blank-line separator
        # (the body is then empty) where a two-way unpack of split() would raise.
        raw_headers, _, body = raw_data.partition(b'\r\n\r\n')
        status, headers = parse_headers(raw_headers)
        status_code = int(status.get('status_code', 0))
        method = status.get('method')

        if status_code == 401 and self.auth_attempt < 3:
            self.auth_attempt += 1
            self._register(self._authorization(headers['WWW-Authenticate']))
        elif status_code == 200:
            print('authenticated successfully')
        elif method == 'OPTIONS':
            # don't care
            pass
        elif method == 'INVITE':
            now = time()
            # Only report an INVITE that arrives more than a second after the
            # previous one, so bursts of repeats print once.
            if (now - self.last_invitation) > 1:
                self.process_invite(headers)
            # NOTE(review): the original indentation was lost; the timestamp
            # is assumed to be refreshed on every INVITE - confirm.
            self.last_invitation = now
        else:
            # NOTE(review): the body dump is assumed to belong to this
            # fallback branch (original indentation was lost) - confirm.
            debug(status, dict(headers))
            try:
                body_text = body.decode()
            except UnicodeDecodeError:
                print(body)
            else:
                print(body_text)

    def send(self, data):
        """Send ``data`` as one SIP message and advance the CSeq counter.

        :param data: message text with '\\n' line endings; surrounding
            newlines/spaces are trimmed, the terminating blank line is added
            and line endings are converted to CRLF
        """
        data = (data.strip('\n ') + '\n\n').replace('\n', '\r\n').encode()
        self.transport.sendto(data)
        self.cseq += 1

    def error_received(self, exc):
        """Print an error reported by the transport."""
        print('Error received:', exc)

    def process_invite(self, headers):
        """Print the caller's number (and X-Brand value, if any) for an INVITE."""
        m = NUMBER.search(headers['From'])
        if m:
            number = m.group(1)
        else:
            number = 'unknown'
            debug('no number found', dict(headers))
        country = headers.get('X-Brand', None)
        print(f'incoming call from {number}{" ({})".format(country) if country else ""}')

    def _register(self, authorization=None):
        """Send a REGISTER request, with an Authorization header if one is given."""
        lines = [
            f'{AUTH_METHOD} {URI} SIP/2.0',
            f'Via: SIP/2.0/UDP {self.local_ip}:5060;rport;branch={BRANCH}',
            f'From: <sip:{USERNAME}@{HOST}:{PORT}>;tag=1269824498',
            f'To: <sip:{USERNAME}@{HOST}:{PORT}>',
            f'Call-ID: {self.call_id}',
            f'CSeq: {self.cseq} {AUTH_METHOD}',
            f'Contact: <sip:{USERNAME}@{self.local_ip};line=9ad550fb9d87b0f>',
        ]
        if authorization:
            lines.append(f'Authorization: {authorization}')
        lines += [
            'Max-Forwards: 70',
            'User-Agent: TutorCruncher Address Book',
            'Expires: 60',
            'Content-Length: 0',
        ]
        self.send('\n'.join(lines))

    def _authorization(self, challenge):
        """Build the Authorization header value answering a digest challenge."""
        params = parse_digest(challenge)
        realm, nonce = params['realm'], params['nonce']
        ha1 = md5digest(USERNAME, realm, PASSWORD)
        ha2 = md5digest(AUTH_METHOD, URI)
        value = (
            f'Digest username="{USERNAME}", realm="{realm}", nonce="{nonce}", '
            f'uri="{URI}", response="{md5digest(ha1, nonce, ha2)}", algorithm=MD5'
        )
        if 'opaque' in params:
            # an opaque value from the challenge is returned unchanged
            value += f', opaque="{params["opaque"]}"'
        return value
# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running, while new_event_loop()/set_event_loop() behave the same on
# every Python 3 version.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Open a UDP endpoint connected to the SIP server; the protocol sends its
    # REGISTER as soon as the connection is made.
    connect = loop.create_datagram_endpoint(EchoClientProtocol, remote_addr=CONN)
    transport, protocol = loop.run_until_complete(connect)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the watcher.
        pass
    finally:
        transport.close()
finally:
    # Close the loop even if the endpoint could not be created.
    loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment