Skip to content

Instantly share code, notes, and snippets.

@tai2
Created July 25, 2011 15:46
Show Gist options
  • Save tai2/1104429 to your computer and use it in GitHub Desktop.
Save tai2/1104429 to your computer and use it in GitHub Desktop.
UPnP Port Mapper
# UPnP Port Mapper
import os
import sys
import re
import urllib2
from xml import sax
from xml.sax import handler
from xml.sax.handler import feature_namespaces
import socket
import errno
from urlparse import urljoin
from urlparse import urlparse
import gc
class SoapAction:
def __init__(self, action, type, url):
self.action = action
self.type = type
self.url = url
self.args = []
def addArg(self, name, val=""):
self.args.append((name, val))
def send(self):
data = """<?xml version="1.0"?>
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/"
SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<SOAP-ENV:Body>
<m:%s xmlns:m="%s">
"""%(self.action, self.type)
for a in self.args:
data += "<%s>%s</%s>\n"%(a[0], a[1], a[0])
data += """</m:%s>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>
"""%(self.action)
req = urllib2.Request(self.url)
req.add_header('SoapAction', self.type + '#' + self.action)
req.add_data(data)
try:
r = urllib2.urlopen(req)
except urllib2.HTTPError:
r = None
return r
class FindControlURL(handler.ContentHandler):
def __init__(self, type, URLBase):
self.type = type
self.stack = []
self.resultURL = ""
self.URLBase = URLBase
self.serviceType = ""
self.controlURL = ""
def isConnected(self, controlURL):
act = SoapAction('GetStatusInfo', self.type, controlURL)
ret = act.send()
if ret:
parser = sax.make_parser()
parser.setFeature(feature_namespaces, 1)
h = PackElement('GetStatusInfoResponse')
parser.setContentHandler(h)
parser.parse(ret)
return 'Connected' == h.dict.get('NewConnectionStatus', "")
else:
return False
def startElement(self, name, attrs):
self.stack.append(name)
def endElement(self, name):
self.stack.pop()
if 'service' == name and self.serviceType == self.type:
url = urljoin(self.URLBase, self.controlURL)
if '' == self.resultURL and self.isConnected(url):
self.resultURL = url
def characters(self, content):
tag = self.stack[-1]
if 'serviceType' == tag:
self.serviceType = content
elif 'controlURL' == tag:
self.controlURL = content
elif 'URLBase' == tag:
self.URLBase = content
class PackElement(handler.ContentHandler):
def __init__(self, parent):
self.stack = []
self.dict = {}
self.parent = parent
def startElementNS(self, name, qname, attrs):
self.stack.append(name[1])
def endElementNS(self, name, qname):
self.stack.pop()
def characters(self, content):
if 2 <= len(self.stack):
parent = self.stack[-2]
if self.parent == parent:
tag = self.stack[-1]
self.dict[tag] = content
class Igd:
def __init(self):
self.type = ""
self.controlURL = ""
def find(self, multicast_if, time):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(time)
port = 20000
try:
s.bind(("", port))
except socket.error, msg:
return False
s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(multicast_if))
ssdp_addr = ('239.255.255.250', 1900)
ssdp_msg = \
'M-SEARCH * HTTP/1.1\r\n' +\
'MX: 3\r\n' +\
'HOST: 239.255.255.250:1900\r\n' +\
'MAN: \"ssdp:discover\"\r\n' +\
'ST: urn:schemas-upnp-org:service:%s:1\r\n' +\
'\r\n'
s.sendto(ssdp_msg%('WANIPConnection'), ssdp_addr)
s.sendto(ssdp_msg%('WANPPPConnection'), ssdp_addr)
try:
str, addr = s.recvfrom(4096)
except socket.timeout:
print 'timeout. no igd response.'
return False
e = '^LOCATION:\s*(http:[.$,;:&=?!*~@#()/\w]+\.xml)'
m = re.search(e, str, re.M | re.I)
if m:
loc = m.group(1)
else:
return False
e = '^ST:\s*(urn:schemas-upnp-org:service:(WANIPConnection|WANPPPConnection):1)'
m = re.search(e, str, re.M | re.I)
if m:
type = m.group(1)
else:
return False
s.close()
parser = sax.make_parser()
parser.setFeature(feature_namespaces, 0)
url = urlparse(loc)
h = FindControlURL(type, url[0] + '://' + url[1])
parser.setContentHandler(h)
parser.parse(urllib2.urlopen(loc))
self.controlURL = h.resultURL
self.type = type
return True
def getMappingEntry(self, ext, prot):
assert( "" != self.type )
assert( "" != self.controlURL )
act = SoapAction("GetSpecificPortMappingEntry", self.type, self.controlURL)
act.addArg("NewRemoteHost")
act.addArg("NewExternalPort", ext[1])
act.addArg("NewProtocol", prot)
ret = act.send()
if ret:
parser = sax.make_parser()
parser.setFeature(feature_namespaces, 1)
h = PackElement('GetSpecificPortMappingEntryResponse')
parser.setContentHandler(h)
parser.parse(ret)
return h.dict['NewInternalClient'], h.dict['NewInternalPort']
else:
return None
def getGenericMappingEntry(self, index):
act = SoapAction('GetGenericPortMappingEntry', self.type, self.controlURL)
act.addArg('NewPortMappingIndex', index)
ret = act.send()
if ret:
parser = sax.make_parser()
parser.setFeature(feature_namespaces, 1)
h = PackElement('GetGenericPortMappingEntryResponse')
parser.setContentHandler(h)
parser.parse(ret)
return h.dict
else:
return None
def getExternalIP(self):
act = SoapAction("GetExternalIPAddress", self.type, self.controlURL)
ret = act.send()
if ret:
parser = sax.make_parser()
parser.setFeature(feature_namespaces, 1)
h = PackElement('GetExternalIPAddressResponse')
parser.setContentHandler(h)
parser.parse(ret)
return h.dict.get('NewExternalIPAddress', None)
else:
return None
def addMapping(self, ext, int, prot, desc):
assert( "" != self.type )
assert( "" != self.controlURL )
act = SoapAction("AddPortMapping", self.type, self.controlURL)
act.addArg("NewRemoteHost", ext[0])
act.addArg("NewExternalPort", ext[1])
act.addArg("NewProtocol", prot)
act.addArg("NewInternalPort", int[1])
act.addArg("NewInternalClient", int[0])
act.addArg("NewEnabled", "1")
act.addArg("NewPortMappingDescription", desc)
act.addArg("NewLeaseDuration", "0")
return None != act.send()
def deleteMapping(self, ext, prot):
assert( "" != self.type )
assert( "" != self.controlURL )
act = SoapAction("DeletePortMapping", self.type, self.controlURL)
act.addArg("NewRemoteHost")
act.addArg("NewExternalPort", ext[1])
act.addArg("NewProtocol", prot)
return None != act.send()
def rewriteMapping(self, ext, int, prot, desc):
assert( "" != self.type )
assert( "" != self.controlURL )
ret = self.getMappingEntry(ext, prot)
if None != ret:
if int == ret:
return 1
else:
if self.deleteMapping(ext, prot):
if self.addMapping(ext, int, prot, desc):
return 2
else:
if self.addMapping(ext, int, prot, desc):
return 2
return 0
if __name__ == '__main__':
def add_port_mapping(hostaddr, external_port, internal_port):
socket.setdefaulttimeout(10)
try:
igd = Igd()
if not(igd.find(hostaddr, 3)):
return False
except Exception, info:
return False
ret = igd.rewriteMapping(('', external_port), (hostaddr, internal_port), 'TCP', 'UPnP test')
if 2 == ret:
print 'added a mapping entry'
elif 0 == ret:
print 'adding a mapping entry failed'
return False
gc.collect() # for python bug 1208304
return True
def delete_port_mapping(hostaddr, external_port):
socket.setdefaulttimeout(10)
igd = Igd()
if not(igd.find(hostaddr, 3)):
print 'deleting a mapping entry failed'
return False
if not(igd.deleteMapping(('', external_port), 'TCP')):
print 'deleting a mapping entry failed'
return False
gc.collect() # for python bug 1208304
print 'deleted a mapping entry'
return True
add_port_mapping('192.168.0.2', 8081, 8081)
delete_port_mapping('192.168.0.2', 8081)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment