Skip to content

Instantly share code, notes, and snippets.

@ismaelgaudioso
Forked from provegard/ssdp-test.py
Last active May 4, 2022 07:52
Show Gist options
  • Save ismaelgaudioso/4cff466459646e022332 to your computer and use it in GitHub Desktop.
Save ismaelgaudioso/4cff466459646e022332 to your computer and use it in GitHub Desktop.
Small SSDP server/client test in Python
import socket
import struct
import sys
from httplib import HTTPResponse
from BaseHTTPServer import BaseHTTPRequestHandler
from StringIO import StringIO
import gtk
import gobject
# Prefix of every search target (ST) this test service understands.
LIB_ID = 'my_library'

# Multicast group and port used for the discovery exchange.
MCAST_GRP = '239.255.255.250'
MCAST_PORT = 1900

# Service id -> "host:port" string returned in the Location header.
SERVICE_LOCS = {
    'id1': '127.0.0.1:7766',
    'id2': '127.0.0.1:7766',
}

# Discovery request template; expects `library` and `service` keys.
DISCOVERY_MSG = (
    'M-SEARCH * HTTP/1.1\r\n'
    'ST: %(library)s:%(service)s\r\n'
    'MX: 3\r\n'
    'MAN: "ssdp:discover"\r\n'
    'HOST: 239.255.255.250:1900\r\n'
    '\r\n'
)

# Discovery response template; expects `library`, `service` and `loc` keys.
LOCATION_MSG = (
    'HTTP/1.1 200 OK\r\n'
    'ST: %(library)s:%(service)s\r\n'
    'USN: %(service)s\r\n'
    'Location: %(loc)s\r\n'
    'Cache-Control: max-age=900\r\n'
    '\r\n'
)
class Request(BaseHTTPRequestHandler):
def __init__(self, request_text):
self.rfile = StringIO(request_text)
self.raw_requestline = self.rfile.readline()
self.error_code = self.error_message = None
self.parse_request()
def send_error(self, code, message):
self.error_code = code
self.error_message = message
class Response(HTTPResponse):
def __init__(self, response_text):
self.fp = StringIO(response_text)
self.debuglevel = 0
self.strict = 0
self.msg = None
self._method = None
self.begin()
def interface_addresses(family=socket.AF_INET):
    """Yield the host address of every ``getaddrinfo('', None)`` result.

    Only entries whose address family equals ``family`` are yielded; the
    value yielded is the first element of the entry's socket address.
    """
    for info in socket.getaddrinfo('', None):
        entry_family, sockaddr = info[0], info[4]
        if entry_family == family:
            yield sockaddr[0]
def client(timeout=1, retries=5, service='id1'):
    """Multicast a discovery request and print the first Location received.

    For each retry, and for each address from ``interface_addresses()``, a
    UDP socket is bound to that address, the request is sent twice to the
    multicast group, and one reply is awaited. The first reply received is
    parsed, its ``Location`` header is printed, and the function returns.
    If no reply ever arrives the function returns silently.

    timeout -- seconds to wait for a reply; installed as the process-wide
               default socket timeout via ``socket.setdefaulttimeout``.
    retries -- number of passes over all interface addresses.
    service -- service id placed in the ST header (defaults to 'id1').
    """
    socket.setdefaulttimeout(timeout)
    msg = DISCOVERY_MSG % dict(service=service, library=LIB_ID)
    for _ in range(retries):
        for addr in interface_addresses():
            data = None
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                                 socket.IPPROTO_UDP)
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
                sock.bind((addr, 0))
                for _attempt in range(2):
                    # sending it more than once will
                    # decrease the probability of a timeout
                    sock.sendto(msg, (MCAST_GRP, MCAST_PORT))
                try:
                    data = sock.recv(1024)
                except socket.timeout:
                    # No answer on this interface; try the next one.
                    pass
            finally:
                # Always release the socket, whether or not a reply came.
                sock.close()
            if data is None:
                continue
            response = Response(data)
            print(response.getheader('Location'))
            return
def server(timeout=5):
    """Join the multicast group and answer discovery requests until gtk exits.

    A UDP socket is bound to ``MCAST_PORT`` on all addresses, joined to
    ``MCAST_GRP``, and registered with the gobject main loop so that
    ``handle_requests`` is called when it becomes readable or hangs up.
    The call blocks inside ``gtk.main()``.

    timeout -- installed as the process-wide default socket timeout via
               ``socket.setdefaulttimeout``.
    """
    socket.setdefaulttimeout(timeout)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        sock.bind(('', MCAST_PORT))

        # '=' selects standard sizes with no padding, so the result is the
        # 8-byte (group address, interface address) pair. Plain '4sl' uses
        # native alignment and produces 16 bytes where C long is 8 bytes.
        mreq = struct.pack('=4sl', socket.inet_aton(MCAST_GRP),
                           socket.INADDR_ANY)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

        cond = gobject.IO_IN | gobject.IO_HUP
        gobject.io_add_watch(sock, cond, handle_requests)
        gtk.main()
    finally:
        # Release the socket once the main loop ends or setup fails.
        sock.close()
def handle_requests(sock, _):
    """I/O watch callback: answer one discovery request read from ``sock``.

    A reply built from ``LOCATION_MSG`` is sent back to the sender only when
    the datagram parses as ``M-SEARCH *``, its ST header starts with
    ``LIB_ID`` and has the form ``<library>:<service>``, its MAN header is
    ``"ssdp:discover"``, and the service id is a key of ``SERVICE_LOCS``.
    Anything else is ignored.

    sock -- the UDP socket being watched.
    _    -- the I/O condition passed by the main loop (unused).

    Always returns True, which is what the callback returned originally on
    every path (presumably to keep the watch installed).
    """
    data, addr = sock.recvfrom(4096)
    request = Request(data)

    if request.error_code:
        return True
    if request.command != 'M-SEARCH' or request.path != '*':
        return True

    # Use .get(): other senders' requests may lack these headers, and a
    # KeyError raised here would escape the callback.
    search_target = request.headers.get('ST', '')
    if not search_target.startswith(LIB_ID):
        return True
    if request.headers.get('MAN') != '"ssdp:discover"':
        return True

    parts = search_target.split(':', 2)
    if len(parts) < 2:
        # ST carried no ":<service>" part.
        return True

    service = parts[1]
    loc = SERVICE_LOCS.get(service)
    if loc is not None:
        msg = LOCATION_MSG % dict(service=service, loc=loc, library=LIB_ID)
        sock.sendto(msg, addr)
    return True
if __name__ == '__main__':
    # Run as client when the first argument contains 'client'; otherwise
    # (including when no argument is given) run the server.
    wants_client = len(sys.argv) > 1 and 'client' in sys.argv[1]
    if wants_client:
        client()
    else:
        server()
#!/usr/bin/python
# Python program that runs an SSDP server: it joins the SSDP multicast group,
# answers M-SEARCH discovery requests and keeps track of NOTIFY announcements.
import sys
import socket
import string
import time
import random
import platform
from uuid import uuid4
from twisted.internet import reactor, task, error
from twisted.internet.protocol import DatagramProtocol
from twisted.web.http import datetimeToString
# SSDP multicast group and port.
SSDP_ADDR = '239.255.255.250'
SSDP_PORT = 1900

# Search target placed in the M-SEARCH request.
SSDP_ST = "ssdp:all"

SERVER_VERSION = "0.1"
# Value sent in the SERVER header: "<system>/<release>/UPnP/1.0/<product>/<version>".
SERVER_ID = "/".join([platform.system(), platform.release(), 'UPnP/1.0',
                      'Pyntuition UPnP', SERVER_VERSION])

# M-SEARCH request. Every header line must start in column 0: a line that
# begins with whitespace is parsed as a continuation of the previous header,
# so the stray space that used to follow the MAN line's CRLF folded HOST
# into MAN.
MS = (
    'M-SEARCH * HTTP/1.1\r\n'
    'ST: %s\r\n'
    'MX: 3\r\n'
    'MAN: "ssdp:discover"\r\n'
    'HOST: %s:%d\r\n'
    '\r\n'
) % (SSDP_ST, SSDP_ADDR, SSDP_PORT)

# When True, ServerSSDP.debug() output is printed.
DEBUG_MODE = False
class ServerSSDP(DatagramProtocol):
    """SSDP endpoint built on Twisted's multicast UDP support.

    It listens on ``SSDP_PORT``, joins ``SSDP_ADDR``, answers ``ssdp:all``
    M-SEARCH requests, records NOTIFY announcements in ``self.known`` and
    periodically re-announces its own ("local") entries.

    ``self.known`` maps a USN to a dict with the keys USN, LOCATION, ST, EXT,
    SERVER, CACHE-CONTROL, MANIFESTATION ('local' or 'remote'), SILENT, HOST
    and last-seen (a ``time.time()`` stamp).

    Datagrams are handled as ``str`` (the surrounding script is Python 2).
    """

    def __init__(self, iface):
        """Start listening and schedule the periodic maintenance loops.

        iface -- interface IP, as a string or a sequence of string pieces;
                 the pieces are concatenated with ``"".join``.

        If the port cannot be opened the error is reported and the instance
        is left without a listening port or running loops.
        """
        print(self.version())
        self.iface = "".join(iface)
        self.known = {}
        self.active_calls = []
        try:
            self.port = reactor.listenMulticast(SSDP_PORT, self,
                                                listenMultiple=True)
            self.port.joinGroup(SSDP_ADDR, interface=self.iface)

            self.resend_notify_loop = task.LoopingCall(self.resendNotify)
            self.resend_notify_loop.start(777.0, now=False)

            self.check_valid_loop = task.LoopingCall(self.check_valid)
            self.check_valid_loop.start(333.0, now=False)
        except error.CannotListenError as err:
            self.error("Error starting the SSDP-server: %s" % (err,))
            self.error("There seems to be already a SSDP server running on this host, no need starting a second one.")

    def version(self):
        """Return the start-up banner, including ``SERVER_ID``."""
        version = "************\nPyNTUITION ServerSSDP Alpha 0.1\nAuthor: Ismael Gaudioso\n\nSERVER_ID: "+SERVER_ID+"\n************\n"
        return version

    def stop(self):
        """Shutdown hook; currently does nothing."""
        pass

    def resendNotify(self):
        """Send an alive notification for every locally registered entry."""
        self.msg("Resend Notify")
        for usn in self.known:
            if self.known[usn]['MANIFESTATION'] == 'local':
                self.doNotify(usn)

    def _max_age(self, cache_control, default=1800):
        """Return the ``max-age`` seconds from a Cache-Control value.

        Falls back to ``default`` when no parseable ``max-age`` directive is
        present, so one odd remote header cannot break the expiry loop.
        """
        for directive in cache_control.split(','):
            name, _, value = directive.partition('=')
            if name.strip().lower() == 'max-age':
                try:
                    return int(value.strip())
                except ValueError:
                    break
        return default

    def check_valid(self):
        """Drop remote entries not refreshed within max-age plus 30 seconds."""
        self.debug("Checking devices/services are still valid")
        now = time.time()
        expired = []
        for usn, entry in self.known.items():
            if entry['MANIFESTATION'] == 'local':
                continue
            expiry = self._max_age(entry['CACHE-CONTROL'])
            last_seen = entry['last-seen']
            self.debug("Checking if %r is still valid - last seen %d (+%d), now %d" % (entry['USN'], last_seen, expiry, now))
            if last_seen + expiry + 30 < now:
                self.debug("Expiring: %r" % (entry,))
                if entry['ST'] == 'upnp:rootdevice':
                    # Placeholder left where a "removed_device" signal used
                    # to be emitted (see the commented-out louie call below).
                    #louie.send('Coherence.UPnP.SSDP.removed_device', None, device_type=self.known[usn]['ST'], infos=self.known[usn])
                    self.msg("Linea 87... KK de la vaca")
                expired.append(usn)
        # Delete after the scan so the dict is not modified while iterated.
        for usn in expired:
            del self.known[usn]

    def datagramReceived(self, datagram, address):
        """Parse an incoming SSDP datagram and dispatch on its command.

        datagram -- raw message text.
        address  -- (host, port) of the sender.

        Malformed datagrams are reported with a warning and dropped.
        """
        # Only the header section (before the first blank line) matters.
        header = datagram.split('\r\n\r\n', 1)[0]
        lines = header.split('\r\n')
        cmd = lines[0].split(' ')
        if len(cmd) < 2:
            self.warning('Ignoring malformed SSDP datagram from %s:%d: %r' % (address[0], address[1], lines[0]))
            return

        headers = {}
        for line in lines[1:]:
            name, sep, value = line.partition(':')
            if not sep:
                # Blank line or a line without "name: value" shape.
                continue
            headers[name.strip().lower()] = value.strip()

        self.msg('SSDP command %s %s - from %s:%d' % (cmd[0], cmd[1], address[0], address[1]))
        self.debug('with headers: %s' % (headers,))
        if cmd[0] == 'M-SEARCH' and cmd[1] == '*':
            # SSDP discovery
            self.discoveryRequest(headers, address)
        elif cmd[0] == 'NOTIFY' and cmd[1] == '*':
            # SSDP presence
            self.notifyReceived(headers, address)
        else:
            self.warning('Unknown SSDP command %s %s' % (cmd[0], cmd[1]))

    def debug(self, msg):
        """Print ``msg`` with a debug prefix when ``DEBUG_MODE`` is set."""
        if DEBUG_MODE:
            print('[SERVER DEBUG] ' + msg)

    def warning(self, msg):
        """Print ``msg`` with a warning prefix."""
        print('[SERVER WARNING] ' + msg)

    def error(self, msg):
        """Print ``msg`` with an error prefix."""
        print('[SERVER ERROR] ' + msg)

    def msg(self, msg):
        """Print ``msg`` with an info prefix."""
        print('[SERVER INFO] ' + msg)

    def discoveryRequest(self, headers, address):
        """Process a discovery request. The response must be sent to
        the address specified by ``address``, a (host, port) pair.

        Only requests with MAN ``"ssdp:discover"`` and ST ``ssdp:all`` are
        answered. The reply is scheduled after a random delay between 0 and
        the request's MX value; a missing or invalid MX means no delay.
        """
        host, port = address
        search_target = headers.get('st')
        self.msg('Discovery request from (%s,%d) for %s' % (host, port, search_target))
        self.msg('Discovery request for %s' % (search_target,))
        if headers.get('man') != '"ssdp:discover"' or search_target != "ssdp:all":
            return

        try:
            max_wait = max(0, int(headers.get('mx', 0)))
        except ValueError:
            max_wait = 0
        delay = random.randint(0, max_wait)

        # NOTE(review): every answered request registers a fresh random USN
        # as a 'local' entry, so self.known grows with each request -
        # confirm this is intended.
        new_usn = "uuid:" + str(uuid4())
        response = [
            'HTTP/1.1 200 OK',
            'CACHE-CONTROL: max-age=1800',
            'DATE: %s' % datetimeToString(),
            'ST: %s' % new_usn,
            'USN: %s' % new_usn,
            'EXT:',
            'SERVER: %s' % SERVER_ID,
            # NOTE(review): hard-coded address - confirm it matches the host.
            'LOCATION: http://192.168.1.36/upnp/rootDesc.xml',
        ]
        # End with an empty line, as doNotify() does, so the joined message
        # finishes with CRLF CRLF.
        response.extend(('', ''))

        location = "%s:%d" % (host, port)
        self.register("local", new_usn, new_usn, location)
        reactor.callLater(delay, self.send_it, '\r\n'.join(response), (host, port), delay, new_usn)

    def send_it(self, response, destination, delay, usn):
        """Write a prepared discovery response to ``destination``.

        ``delay`` and ``usn`` are used for logging only. Send failures are
        logged, not raised.
        """
        self.msg('send discovery response delayed by %ds for %s to %r' % (delay, usn, destination))
        try:
            self.transport.write(response, destination)
        except (AttributeError, socket.error) as err:
            self.msg("failure sending out discovery response: %r" % (err,))

    def notifyReceived(self, headers, address):
        """Process a presence announcement. We just remember the
        details of the SSDP service announced.

        ``ssdp:alive`` refreshes a known entry or registers a new remote
        one; ``ssdp:byebye`` removes a known entry. Notifications lacking a
        required header are reported with a warning and ignored.
        """
        host, port = address
        nt = headers.get('nt')
        nts = headers.get('nts')
        usn = headers.get('usn')
        self.msg('Notification from (%s,%d) for %s' % (host, port, nt))
        self.debug('Notification headers: %s' % (headers,))

        if usn is None:
            self.warning('Ignoring notification without USN header from (%s,%d)' % (host, port))
            return

        if nts == 'ssdp:alive':
            if self.isKnown(usn):
                self.known[usn]['last-seen'] = time.time()
                self.debug('updating last-seen for %r' % (usn,))
                return
            required = ('nt', 'location', 'server', 'cache-control')
            missing = [name for name in required if name not in headers]
            if missing:
                self.warning('Ignoring alive notification for %s: missing header(s) %s' % (usn, ', '.join(missing)))
                return
            self.register('remote', usn, headers['nt'], headers['location'],
                          headers['server'], headers['cache-control'],
                          host=host)
        elif nts == 'ssdp:byebye':
            if self.isKnown(usn):
                self.unRegister(usn)
        else:
            self.warning('Unknown subtype %s for notification type %s' % (nts, nt))

    def register(self, manifestation, usn, st, location,
                 server=SERVER_ID,
                 cache_control='max-age=1800',
                 silent=False,
                 host=None):
        """Register a service or device that this SSDP server will
        respond to.

        Stores (or overwrites) the entry for ``usn`` in ``self.known`` and,
        for a 'local' manifestation, sends an alive notification right away.
        """
        self.msg('Registering %s' % (st))

        entry = {}
        entry['USN'] = usn
        entry['LOCATION'] = location
        entry['ST'] = st
        entry['EXT'] = ''
        entry['SERVER'] = server
        entry['CACHE-CONTROL'] = cache_control
        entry['MANIFESTATION'] = manifestation
        entry['SILENT'] = silent
        entry['HOST'] = host
        entry['last-seen'] = time.time()
        self.known[usn] = entry

        # Logs the entry's key names.
        self.msg(",".join(entry))

        if manifestation == 'local':
            self.doNotify(usn)

        if st == 'upnp:rootdevice':
            pass
            #louie.send('Coherence.UPnP.SSDP.new_device', None, device_type=st, infos=self.known[usn])
            #self.callback("new_device", st, self.known[usn])

    def unRegister(self, usn):
        """Remove the entry for ``usn``; raises KeyError if it is unknown."""
        self.msg("Un-registering %s" % usn)
        st = self.known[usn]['ST']
        if st == 'upnp:rootdevice':
            pass
            #louie.send('Coherence.UPnP.SSDP.removed_device', None, device_type=st, infos=self.known[usn])
            #self.callback("removed_device", st, self.known[usn])
        del self.known[usn]

    def doNotify(self, usn):
        """Multicast an ``ssdp:alive`` NOTIFY for the entry ``usn``.

        Nothing is sent for entries marked SILENT. Send failures are logged,
        not raised.
        """
        entry = self.known[usn]
        if entry['SILENT']:
            return
        self.msg('Sending alive notification for %s' % usn)

        resp = ['NOTIFY * HTTP/1.1',
                'HOST: %s:%d' % (SSDP_ADDR, SSDP_PORT),
                'NTS: ssdp:alive',
                ]
        # Work on a copy: ST is announced as NT and the bookkeeping keys are
        # not part of the message.
        announced = dict(entry)
        announced['NT'] = announced.pop('ST')
        for internal_key in ('MANIFESTATION', 'SILENT', 'HOST', 'last-seen'):
            del announced[internal_key]

        resp.extend(': '.join(item) for item in announced.items())
        resp.extend(('', ''))
        self.debug('doNotify content %s' % (resp,))
        payload = '\r\n'.join(resp)
        try:
            # The notification is written twice, as in the original code
            # (presumably as redundancy against UDP loss).
            self.transport.write(payload, (SSDP_ADDR, SSDP_PORT))
            self.transport.write(payload, (SSDP_ADDR, SSDP_PORT))
        except (AttributeError, socket.error) as err:
            self.msg("failure sending out alive notification: %r" % (err,))

    def isKnown(self, usn):
        """Return True when ``usn`` has an entry in ``self.known``."""
        return usn in self.known
if __name__ == "__main__":
    # Exactly one argument is expected: the IP of the interface to use.
    if len(sys.argv) == 2:
        iface = sys.argv[1:]
        obj = ServerSSDP(iface)
        reactor.addSystemEventTrigger('before', 'shutdown', obj.stop)
        reactor.run()
    else:
        print("Usage: %s <IP of interface>" % (sys.argv[0], ))
        sys.exit(1)
#!/usr/bin/python
# Python program that can listen for SSDP messages on the multicast group (in
# server mode), or send out an M-SEARCH message using SSDP (in client mode).
import sys
import socket
from twisted.internet import reactor, task, error
from twisted.internet.protocol import DatagramProtocol
# SSDP multicast group and port.
SSDP_ADDR = '239.255.255.250'
#SSDP_PORT = 2869
SSDP_PORT = 1900

# Search target placed in the M-SEARCH request.
SSDP_ST = "ssdp:all"
#SSDP_ST = "urn:schemas-sony-com:service:ScalarWebAPI:1";

# M-SEARCH request. Every header line must start in column 0: a line that
# begins with whitespace is parsed as a continuation of the previous header,
# so the stray space that used to follow the MAN line's CRLF folded HOST
# into MAN.
MS = (
    'M-SEARCH * HTTP/1.1\r\n'
    'ST: %s\r\n'
    'MX: 3\r\n'
    'MAN: "ssdp:discover"\r\n'
    'HOST: %s:%d\r\n'
    '\r\n'
) % (SSDP_ST, SSDP_ADDR, SSDP_PORT)
class Base(DatagramProtocol):
    """Shared behaviour for the test server and client protocols."""

    def datagramReceived(self, datagram, address):
        """Print the first line of ``datagram`` and the sender ``address``."""
        # Everything before the first CRLF is the first line.
        first_line = datagram.split('\r\n', 1)[0]
        print("Received %s from %r" % (first_line, address))

    def stop(self):
        """Shutdown hook; nothing to release by default."""
        pass
class Server(Base):
    """Listen on the SSDP port and join the multicast group on ``iface``."""

    def __init__(self, iface):
        """Open the multicast listener.

        iface -- IP address of the interface on which to join ``SSDP_ADDR``.
        """
        self.iface = iface
        #task.LoopingCall(self.send_msearch).start(6) # every 6th seconds
        self.port = reactor.listenMulticast(SSDP_PORT, self, listenMultiple=True)
        # Was `interface=interface`, an undefined name (NameError).
        self.port.joinGroup(SSDP_ADDR, interface=self.iface)

    # Disabled periodic M-SEARCH sender, kept for reference:
    #def send_msearch(self):
    #    port = reactor.listenUDP(0, self, interface=self.iface)
    #    print "Sending M-SEARCH..."
    #    port.write(MS, (SSDP_ADDR, SSDP_PORT))
    #    reactor.callLater(2.5, port.stopListening) # MX + a wait margin
class Client(Base):
    """Send one M-SEARCH over a plain UDP socket and print the first reply."""

    def __init__(self, iface):
        """Send ``MS`` and print what was sent and the reply received.

        iface -- interface IP; stored for ``stop()`` but not used by the
                 plain-socket exchange below.
        """
        self.iface = iface
        # No multicast listener is opened while the block below stays
        # disabled; stop() checks for None before touching it.
        self.ssdp = None

        # Disabled multicast variant, kept for reference:
        #self.ssdp = reactor.listenMulticast(SSDP_PORT, self, listenMultiple=True)
        #self.ssdp.setLoopbackMode(1)
        #self.ssdp.joinGroup(SSDP_ADDR, interface=iface)
        #print MS

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # NOTE(review): the destination is a hard-coded unicast address,
            # and recv() has no timeout, so this blocks until a reply comes.
            sock.sendto(MS, ('192.168.1.36', SSDP_PORT))
            print("ENVIADO:\n"+MS+"\n\n\n")
            print("RECIBIDO:\n"+sock.recv(1000))
        finally:
            sock.close()

    def stop(self):
        """Leave the multicast group and stop listening, if a listener exists."""
        if self.ssdp is None:
            return
        self.ssdp.leaveGroup(SSDP_ADDR, interface=self.iface)
        self.ssdp.stopListening()
def main(mode, iface):
    """Create the protocol object for ``mode`` and hook its shutdown.

    mode  -- 'server' selects Server; any other value selects Client.
    iface -- interface IP handed to the chosen class.
    """
    if mode == 'server':
        protocol = Server(iface)
    else:
        protocol = Client(iface)
    reactor.addSystemEventTrigger('before', 'shutdown', protocol.stop)
if __name__ == "__main__":
    # Exactly two arguments are expected: the mode and the interface IP.
    if len(sys.argv) == 3:
        mode, iface = sys.argv[1:]
        reactor.callWhenRunning(main, mode, iface)
        reactor.run()
    else:
        print("Usage: %s <server|client> <IP of interface>" % (sys.argv[0], ))
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment