-
-
Save ismaelgaudioso/4cff466459646e022332 to your computer and use it in GitHub Desktop.
Small SSDP server/client test in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import struct | |
import sys | |
from httplib import HTTPResponse | |
from BaseHTTPServer import BaseHTTPRequestHandler | |
from StringIO import StringIO | |
import gtk | |
import gobject | |
LIB_ID = 'my_library' | |
MCAST_GRP = '239.255.255.250' | |
MCAST_PORT = 1900 | |
SERVICE_LOCS = {'id1': '127.0.0.1:7766', 'id2': '127.0.0.1:7766'} | |
DISCOVERY_MSG = ('M-SEARCH * HTTP/1.1\r\n' + | |
'ST: %(library)s:%(service)s\r\n' + | |
'MX: 3\r\n' + | |
'MAN: "ssdp:discover"\r\n' + | |
'HOST: 239.255.255.250:1900\r\n\r\n') | |
LOCATION_MSG = ('HTTP/1.1 200 OK\r\n' + | |
'ST: %(library)s:%(service)s\r\n' | |
'USN: %(service)s\r\n' | |
'Location: %(loc)s\r\n' | |
'Cache-Control: max-age=900\r\n\r\n') | |
class Request(BaseHTTPRequestHandler): | |
def __init__(self, request_text): | |
self.rfile = StringIO(request_text) | |
self.raw_requestline = self.rfile.readline() | |
self.error_code = self.error_message = None | |
self.parse_request() | |
def send_error(self, code, message): | |
self.error_code = code | |
self.error_message = message | |
class Response(HTTPResponse): | |
def __init__(self, response_text): | |
self.fp = StringIO(response_text) | |
self.debuglevel = 0 | |
self.strict = 0 | |
self.msg = None | |
self._method = None | |
self.begin() | |
def interface_addresses(family=socket.AF_INET): | |
for fam, _, _, _, sockaddr in socket.getaddrinfo('', None): | |
if family == fam: | |
yield sockaddr[0] | |
def client(timeout=1, retries=5): | |
socket.setdefaulttimeout(timeout) | |
for _ in xrange(retries): | |
for addr in interface_addresses(): | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) | |
sock.bind((addr, 0)) | |
msg = DISCOVERY_MSG % dict(service='id1', library=LIB_ID) | |
for _ in xrange(2): | |
# sending it more than once will | |
# decrease the probability of a timeout | |
sock.sendto(msg, (MCAST_GRP, MCAST_PORT)) | |
try: | |
data = sock.recv(1024) | |
except socket.timeout: | |
pass | |
else: | |
response = Response(data) | |
print response.getheader('Location') | |
return | |
def server(timeout=5): | |
socket.setdefaulttimeout(timeout) | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) | |
sock.bind(('', MCAST_PORT)) | |
mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) | |
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) | |
cond = gobject.IO_IN | gobject.IO_HUP | |
gobject.io_add_watch(sock, cond, handle_requests) | |
gtk.main() | |
def handle_requests(sock, _): | |
data, addr = sock.recvfrom(4096) | |
request = Request(data) | |
if not request.error_code and \ | |
request.command == 'M-SEARCH' and \ | |
request.path == '*' and \ | |
request.headers['ST'].startswith(LIB_ID) and \ | |
request.headers['MAN'] == '"ssdp:discover"': | |
service = request.headers['ST'].split(':', 2)[1] | |
if service in SERVICE_LOCS: | |
loc = SERVICE_LOCS[service] | |
msg = LOCATION_MSG % dict(service=service, loc=loc, library=LIB_ID) | |
sock.sendto(msg, addr) | |
return True | |
if __name__ == '__main__': | |
if len(sys.argv) > 1 and 'client' in sys.argv[1]: | |
client() | |
else: | |
server() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Python program that can send out M-SEARCH messages using SSDP (in server | |
# mode), or listen for SSDP messages (in client mode). | |
import sys | |
import socket | |
import string | |
import time | |
import random | |
import platform | |
from uuid import uuid4 | |
from twisted.internet import reactor, task, error | |
from twisted.internet.protocol import DatagramProtocol | |
from twisted.web.http import datetimeToString | |
SSDP_ADDR = '239.255.255.250' | |
SSDP_PORT = 1900 | |
SSDP_ST = "ssdp:all" | |
SERVER_VERSION = "0.1" | |
SERVER_ID = "/".join([platform.system(),platform.release(),'UPnP/1.0','Pyntuition UPnP',SERVER_VERSION]) | |
MS = ('M-SEARCH * HTTP/1.1\r\n' + | |
'ST: %s\r\n' % (SSDP_ST) + | |
'MX: 3\r\n' + | |
'MAN: "ssdp:discover"\r\n ' + | |
'HOST: %s:%d\r\n\r\n' % (SSDP_ADDR,SSDP_PORT)) | |
DEBUG_MODE = False | |
class ServerSSDP(DatagramProtocol): | |
def __init__(self, iface): | |
print self.version() | |
self.iface = "".join(iface) | |
self.known = {} | |
try: | |
self.port = reactor.listenMulticast(SSDP_PORT, self, listenMultiple=True) | |
self.port.joinGroup(SSDP_ADDR, interface=self.iface) | |
self.resend_notify_loop = task.LoopingCall(self.resendNotify) | |
self.resend_notify_loop.start(777.0, now=False) | |
self.check_valid_loop = task.LoopingCall(self.check_valid) | |
self.check_valid_loop.start(333.0, now=False) | |
except error.CannotListenError, err: | |
self.error("Error starting the SSDP-server: %s", err) | |
self.error("There seems to be already a SSDP server running on this host, no need starting a second one.") | |
self.active_calls = [] | |
def version(self): | |
version = "************\nPyNTUITION ServerSSDP Alpha 0.1\nAuthor: Ismael Gaudioso\n\nSERVER_ID: "+SERVER_ID+"\n************\n" | |
return version | |
def stop(self): | |
pass | |
def resendNotify(self): | |
self.msg("Resend Notify") | |
for usn in self.known: | |
if self.known[usn]['MANIFESTATION'] == 'local': | |
self.doNotify(usn) | |
def check_valid(self): | |
""" check if the discovered devices are still ok, or | |
if we haven't received a new discovery response | |
""" | |
self.debug("Checking devices/services are still valid") | |
removable = [] | |
for usn in self.known: | |
if self.known[usn]['MANIFESTATION'] != 'local': | |
_, expiry = self.known[usn]['CACHE-CONTROL'].split('=') | |
expiry = int(expiry) | |
now = time.time() | |
last_seen = self.known[usn]['last-seen'] | |
self.debug("Checking if %r is still valid - last seen %d (+%d), now %d" % (self.known[usn]['USN'], last_seen, expiry, now)) | |
if last_seen + expiry + 30 < now: | |
self.debug("Expiring: %r", self.known[usn]) | |
if self.known[usn]['ST'] == 'upnp:rootdevice': | |
#louie.send('Coherence.UPnP.SSDP.removed_device', None, device_type=self.known[usn]['ST'], infos=self.known[usn]) | |
self.msg("Linea 87... KK de la vaca") | |
removable.append(usn) | |
while len(removable) > 0: | |
usn = removable.pop(0) | |
del self.known[usn] | |
def datagramReceived(self, datagram, address): | |
try: | |
header, payload = datagram.split('\r\n\r\n')[:2] | |
except ValueError, err: | |
print err | |
print 'Arggg,', datagram | |
import pdb; pdb.set_trace() | |
lines = header.split('\r\n') | |
cmd = string.split(lines[0], ' ') | |
lines = map(lambda x: x.replace(': ', ':', 1), lines[1:]) | |
lines = filter(lambda x: len(x) > 0, lines) | |
headers = [string.split(x, ':', 1) for x in lines] | |
headers = dict(map(lambda x: (x[0].lower(), x[1]), headers)) | |
self.msg('SSDP command %s %s - from %s:%d' % (cmd[0], cmd[1], address[0], address[1])) | |
self.debug('with headers: %s' % headers) | |
if cmd[0] == 'M-SEARCH' and cmd[1] == '*': | |
# SSDP discovery | |
self.discoveryRequest(headers, address) | |
elif cmd[0] == 'NOTIFY' and cmd[1] == '*': | |
# SSDP presence | |
self.notifyReceived(headers, address) | |
else: | |
self.warning('Unknown SSDP command %s %s' % (cmd[0], cmd[1])) | |
def debug(self,msg): | |
if DEBUG_MODE: | |
print '[SERVER DEBUG] ' + msg | |
def warning(self,msg): | |
print '[SERVER WARNING] ' + msg | |
def msg(self,msg): | |
print '[SERVER INFO] '+ msg | |
def discoveryRequest(self, headers, (host, port)): | |
"""Process a discovery request. The response must be sent to | |
the address specified by (host, port).""" | |
self.msg('Discovery request from (%s,%d) for %s' % (host, port, headers['st'])) | |
self.msg('Discovery request for %s'% (headers['st'])) | |
if headers['man'] == '"ssdp:discover"' and headers['st'] == "ssdp:all": | |
new_usn = "uuid:"+str(uuid4()) | |
response = [] | |
response.append('HTTP/1.1 200 OK') | |
response.append('CACHE-CONTROL: max-age=1800') | |
response.append('DATE: %s' % datetimeToString()) | |
response.append('ST: %s' % new_usn) | |
response.append('USN: %s' % new_usn) | |
response.append('EXT:') | |
response.append('SERVER: %s' % SERVER_ID) | |
response.append('LOCATION: http://192.168.1.36/upnp/rootDesc.xml') | |
delay = random.randint(0, int(headers['mx'])) | |
location = "%s:%d" % (host,port) | |
self.register("local",new_usn,new_usn,location) | |
reactor.callLater(delay, self.send_it,'\r\n'.join(response), (host, port), delay, new_usn) | |
def send_it(self, response, destination, delay, usn): | |
self.msg('send discovery response delayed by %ds for %s to %r' %(delay, usn, destination)) | |
try: | |
self.transport.write(response, destination) | |
except (AttributeError, socket.error), msg: | |
self.msg("failure sending out byebye notification: %r" % msg) | |
def notifyReceived(self, headers, (host, port)): | |
"""Process a presence announcement. We just remember the | |
details of the SSDP service announced.""" | |
self.msg('Notification from (%s,%d) for %s' % (host, port, headers['nt'])) | |
self.debug('Notification headers: %s' % (headers)) | |
if headers['nts'] == 'ssdp:alive': | |
try: | |
self.known[headers['usn']]['last-seen'] = time.time() | |
self.debug('updating last-seen for %r' % (headers['usn'])) | |
except KeyError: | |
try: | |
self.register('remote', headers['usn'], headers['nt'], headers['location'],headers['server'], headers['cache-control'], host=host) | |
except KeyError: | |
print "nada" | |
elif headers['nts'] == 'ssdp:byebye': | |
if self.isKnown(headers['usn']): | |
self.unRegister(headers['usn']) | |
else: | |
self.warning('Unknown subtype %s for notification type %s' % (headers['nts'], headers['nt'])) | |
def register(self, manifestation, usn, st, location, | |
server=SERVER_ID, | |
cache_control='max-age=1800', | |
silent=False, | |
host=None): | |
"""Register a service or device that this SSDP server will | |
respond to.""" | |
self.msg('Registering %s' %(st)) | |
self.known[usn] = {} | |
self.known[usn]['USN'] = usn | |
self.known[usn]['LOCATION'] = location | |
self.known[usn]['ST'] = st | |
self.known[usn]['EXT'] = '' | |
self.known[usn]['SERVER'] = server | |
self.known[usn]['CACHE-CONTROL'] = cache_control | |
self.known[usn]['MANIFESTATION'] = manifestation | |
self.known[usn]['SILENT'] = silent | |
self.known[usn]['HOST'] = host | |
self.known[usn]['last-seen'] = time.time() | |
self.msg(",".join(self.known[usn])) | |
if manifestation == 'local': | |
self.doNotify(usn) | |
if st == 'upnp:rootdevice': | |
pass | |
#louie.send('Coherence.UPnP.SSDP.new_device', None, device_type=st, infos=self.known[usn]) | |
#self.callback("new_device", st, self.known[usn]) | |
def unRegister(self, usn): | |
self.msg("Un-registering %s" % usn) | |
st = self.known[usn]['ST'] | |
if st == 'upnp:rootdevice': | |
pass | |
#louie.send('Coherence.UPnP.SSDP.removed_device', None, device_type=st, infos=self.known[usn]) | |
#self.callback("removed_device", st, self.known[usn]) | |
del self.known[usn] | |
def doNotify(self, usn): | |
"""Do notification""" | |
if self.known[usn]['SILENT'] == True: | |
return | |
self.msg('Sending alive notification for %s' % usn) | |
resp = ['NOTIFY * HTTP/1.1', | |
'HOST: %s:%d' % (SSDP_ADDR, SSDP_PORT), | |
'NTS: ssdp:alive', | |
] | |
stcpy = dict(self.known[usn].iteritems()) | |
stcpy['NT'] = stcpy['ST'] | |
del stcpy['ST'] | |
del stcpy['MANIFESTATION'] | |
del stcpy['SILENT'] | |
del stcpy['HOST'] | |
del stcpy['last-seen'] | |
resp.extend(map(lambda x: ': '.join(x), stcpy.iteritems())) | |
resp.extend(('', '')) | |
self.debug('doNotify content %s' % resp) | |
try: | |
self.transport.write('\r\n'.join(resp), (SSDP_ADDR, SSDP_PORT)) | |
self.transport.write('\r\n'.join(resp), (SSDP_ADDR, SSDP_PORT)) | |
except (AttributeError, socket.error), msg: | |
self.msg("failure sending out alive notification: %r" % msg) | |
def isKnown(self, usn): | |
return self.known.has_key(usn) | |
if __name__ == "__main__": | |
if len(sys.argv) != 2: | |
print "Usage: %s <IP of interface>" % (sys.argv[0], ) | |
sys.exit(1) | |
iface = sys.argv[1:] | |
obj = ServerSSDP(iface) | |
reactor.addSystemEventTrigger('before', 'shutdown', obj.stop) | |
reactor.run() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Python program that can send out M-SEARCH messages using SSDP (in server | |
# mode), or listen for SSDP messages (in client mode). | |
import sys | |
import socket | |
from twisted.internet import reactor, task, error | |
from twisted.internet.protocol import DatagramProtocol | |
SSDP_ADDR = '239.255.255.250' | |
#SSDP_PORT = 2869 | |
SSDP_PORT = 1900 | |
SSDP_ST = "ssdp:all" | |
#SSDP_ST = "urn:schemas-sony-com:service:ScalarWebAPI:1"; | |
MS = ('M-SEARCH * HTTP/1.1\r\n' + | |
'ST: %s\r\n' % (SSDP_ST) + | |
'MX: 3\r\n' + | |
'MAN: "ssdp:discover"\r\n ' + | |
'HOST: %s:%d\r\n\r\n' % (SSDP_ADDR,SSDP_PORT)) | |
class Base(DatagramProtocol): | |
def datagramReceived(self, datagram, address): | |
first_line = datagram.rsplit('\r\n')[0] | |
print "Received %s from %r" % (first_line, address, ) | |
def stop(self): | |
pass | |
class Server(Base): | |
def __init__(self, iface): | |
self.iface = iface | |
#task.LoopingCall(self.send_msearch).start(6) # every 6th seconds | |
self.port = reactor.listenMulticast(SSDP_PORT, self, listenMultiple=True) | |
self.port.joinGroup(SSDP_ADDR, interface=interface) | |
#def send_msearch(self): | |
#port = reactor.listenUDP(0, self, interface=self.iface) | |
#print "Sending M-SEARCH..." | |
#port.write(MS, (SSDP_ADDR, SSDP_PORT)) | |
#reactor.callLater(2.5, port.stopListening) # MX + a wait margin | |
class Client(Base): | |
def __init__(self, iface): | |
'''self.iface = iface | |
self.ssdp = reactor.listenMulticast(SSDP_PORT, self, listenMultiple=True) | |
self.ssdp.setLoopbackMode(1) | |
self.ssdp.joinGroup(SSDP_ADDR, interface=iface) | |
print MS | |
''' | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
sock.sendto(MS, ('192.168.1.36', SSDP_PORT)) | |
print "ENVIADO:\n"+MS+"\n\n\n" | |
print "RECIBIDO:\n"+sock.recv(1000) | |
def stop(self): | |
self.ssdp.leaveGroup(SSDP_ADDR, interface=self.iface) | |
self.ssdp.stopListening() | |
def main(mode, iface): | |
klass = Server if mode == 'server' else Client | |
obj = klass(iface) | |
reactor.addSystemEventTrigger('before', 'shutdown', obj.stop) | |
if __name__ == "__main__": | |
if len(sys.argv) != 3: | |
print "Usage: %s <server|client> <IP of interface>" % (sys.argv[0], ) | |
sys.exit(1) | |
mode, iface = sys.argv[1:] | |
reactor.callWhenRunning(main, mode, iface) | |
reactor.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment