Created
January 16, 2022 03:29
-
-
Save llamafilm/d9731f4f9b48b7c20c4b8eccaa0e52ef to your computer and use it in GitHub Desktop.
unfinished partial implementation of SAP (Session Announcement Protocol)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import time | |
import sdp_transform | |
# unfinished partial implementation of SAP (Session Announcement Protocol) | |
# https://datatracker.ietf.org/doc/html/rfc2974 | |
MCAST_GRP = '239.255.255.255' | |
MCAST_PORT = 9875 | |
MCAST_TTL = 255 | |
SDP_FILE = '''v=0 | |
o=- 3 0 IN IP4 10.49.51.51 | |
s=CP950A 1-8 | |
c=IN IP4 239.69.83.67/32 | |
t=0 0 | |
m=audio 6518 RTP/AVP 96 | |
a=clock-domain:PTPv2 0 | |
a=rtpmap:96 L24/48000/8 | |
a=recvonly | |
a=framecount:48 | |
a=sync-time:0 | |
a=ptime:1 | |
a=ts-refclk:ptp=IEEE1588-2008:00-60-DB-FF-FE-01-02-C8:0 | |
a=mediaclk:direct=0 | |
''' | |
def build_header(type, src_ip): | |
if type == 'announce': | |
# Flags | |
x = 32 | |
header = x.to_bytes(1, 'big') | |
else: | |
pass | |
# Authentication Length | |
x = 0 | |
header += x.to_bytes(1, 'big') | |
# header Identifier Hash | |
x = 1 | |
header += x.to_bytes(2, 'big') | |
# Originating Source | |
# TODO: handle IPv6 source | |
header += int(src_ip.split('.')[0]).to_bytes(1, 'big') | |
header += int(src_ip.split('.')[1]).to_bytes(1, 'big') | |
header += int(src_ip.split('.')[2]).to_bytes(1, 'big') | |
header += int(src_ip.split('.')[3]).to_bytes(1, 'big') | |
# Payload Type | |
header += bytes('application/sdp', 'ascii') | |
x = 0 | |
header += x.to_bytes(1, 'big') | |
return header | |
def main(): | |
# simp | |
try: | |
sdp_dict = sdp_transform.parse(SDP_FILE) | |
except Exception as e: | |
print('Failed to parse SDP file', SDP_FILE, e) | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, MCAST_TTL) | |
header = build_header('announce', sdp_dict['origin']['address']) | |
message = header | |
# SDP | |
message += SDP_FILE.encode('ascii') | |
while True: | |
print('Sending packet') | |
sock.sendto(message, (MCAST_GRP, MCAST_PORT)) | |
#break # only run once for testing | |
time.sleep(5) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment