Last active
September 24, 2018 12:49
-
-
Save rcarmo/38ad71d8d0709beef7b4ce96798ff4e2 to your computer and use it in GitHub Desktop.
Quick and dirty SSDP/UPNP/Mediaroom discovery
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from http.client import HTTPResponse | |
from io import BytesIO | |
from config import log | |
from struct import pack | |
from socket import AF_INET, SOCK_DGRAM, INADDR_ANY, IPPROTO_IP, IPPROTO_UDP, IP_ADD_MEMBERSHIP, IP_MULTICAST_TTL, SOL_SOCKET, SO_REUSEADDR, SO_REUSEPORT, getaddrinfo, socket, setdefaulttimeout, inet_pton, timeout as SocketTimeout | |
from utils import etree_to_dict | |
from xml.etree import ElementTree | |
from time import time | |
from dateutil.parser import parse as parse_date | |
UPNP_PORT=1900 | |
MEDIAROOM_PORT=8082 | |
ADDRESS_GROUP="239.255.255.250" | |
class SSDPResponse(object): | |
class _wrapper(BytesIO): | |
def makefile(self, *args, **kw): | |
return self | |
def __init__(self, response, addr): | |
r = HTTPResponse(self._wrapper(response)) | |
r.begin() | |
self.location = r.getheader("location") | |
self.usn = r.getheader("usn") | |
self.st = r.getheader("st") | |
self.cache = r.getheader("cache-control").split("=")[1] | |
def __repr__(self): | |
return "<SSDPResponse({location}, {st}, {usn})>".format(**self.__dict__) | |
class MediaroomResponse(object): | |
class _wrapper(BytesIO): | |
def makefile(self, *args, **kw): | |
return self | |
def __init__(self, data, addr): | |
while data[-1:] == '\0': | |
data = data[:-1] | |
while data[:6] != b"NOTIFY": | |
data = data[1:] | |
self.data = data | |
log.debug(data) | |
self.location = "{}:{}".format(*addr) | |
for line in map(lambda x: x.strip(), data.decode().split('\n')): | |
if line.startswith("x-type"): | |
self.type = line[line.find(":")+2:] | |
elif line.startswith("x-filter"): | |
self.filter = line[line.find(":")+2:] | |
elif line.startswith("x-lastUserActivity"): | |
self.last_user_activity = parse_date(line[line.find(":")+2:]) | |
elif line.startswith("x-device"): | |
self.device = line[line.find(":")+2:] | |
elif line.startswith("x-debug") or\ | |
line.startswith("x-location") or\ | |
line.startswith("NOTIFY") or\ | |
line.startswith("\r"): | |
pass | |
elif line.startswith("<"): | |
self.status = etree_to_dict(ElementTree.fromstring(line))['node'] | |
self.tune = self.status['activities'].get('tune') | |
def __repr__(self): | |
return "<MediaroomResponse({location}, {last_user_activity}, {tune})>".format(**self.__dict__) | |
def discover(service='upnp:rootdevice', timeout=5, retries=1, mx=3, port=1900): | |
group = (ADDRESS_GROUP, port) | |
message = "\r\n".join([ | |
'M-SEARCH * HTTP/1.1', | |
'HOST:{0}:{1}', | |
'MAN:"ssdp:discover"', | |
'ST:{st}','MX:{mx}','','']) | |
setdefaulttimeout(timeout) | |
responses = {} | |
for _ in range(retries): | |
sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) | |
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | |
sock.setsockopt(IPPROTO_IP, IP_MULTICAST_TTL, 2) | |
try: | |
sock.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1) | |
except: | |
log.debug("cannot reuse port in this OS") | |
pass | |
if port == MEDIAROOM_PORT: | |
# build a multicast listener instead | |
addrinfo = getaddrinfo(ADDRESS_GROUP, None)[0] | |
group = inet_pton(addrinfo[0], addrinfo[4][0]) | |
membership = group + pack('=I', INADDR_ANY) | |
sock.setsockopt(IPPROTO_IP, IP_ADD_MEMBERSHIP, membership) | |
sock.bind(('', port)) | |
until = time() + timeout | |
while True: | |
if time() >= until: | |
break | |
try: | |
response = MediaroomResponse(*sock.recvfrom(1024)) | |
responses[response.location] = response | |
except SocketTimeout: | |
break | |
else: | |
message_bytes = message.format(*group, st=service, mx=mx).encode('utf-8') | |
sock.sendto(message_bytes, group) | |
while True: | |
try: | |
response = SSDPResponse(*sock.recvfrom(1024)) | |
responses[response.location] = response | |
except SocketTimeout: | |
break | |
return list(responses.values()) | |
if __name__ == '__main__': | |
#print(discover('urn:dial-multiscreen-org:service:dial:1')) | |
print(discover('upnp:rootdevice', retries=1, mx=2, port=MEDIAROOM_PORT)) | |
#print(discover('upnp:rootdevice', retries=5, mx=2, port=UPNP_PORT)) | |
#print(discover('urn:schemas-upnp-org:device:tvdevice:1')) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
def etree_to_dict(t, strip_namespaces=True): | |
if "}" in t.tag and strip_namespaces == True: | |
t.tag = t.tag.split('}',1)[1] | |
d = {t.tag: {} if t.attrib else None} | |
children = list(t) | |
if children: | |
dd = defaultdict(list) | |
for dc in map(etree_to_dict, children): | |
for k, v in dc.items(): | |
dd[k].append(v) | |
d = {t.tag: {k: v[0] if len(v) == 1 else v | |
for k, v in dd.items()}} | |
if t.attrib: | |
d[t.tag].update(('@' + k, v) | |
for k, v in t.attrib.items()) | |
if t.text: | |
text = t.text.strip() | |
if children or t.attrib: | |
if text: | |
d[t.tag]['#text'] = text | |
else: | |
d[t.tag] = text | |
return d |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment