Created
July 25, 2011 15:46
-
-
Save tai2/1104429 to your computer and use it in GitHub Desktop.
UPnP Port Mapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# UPnP Port Mapper | |
import os | |
import sys | |
import re | |
import urllib2 | |
from xml import sax | |
from xml.sax import handler | |
from xml.sax.handler import feature_namespaces | |
import socket | |
import errno | |
from urlparse import urljoin | |
from urlparse import urlparse | |
import gc | |
class SoapAction: | |
def __init__(self, action, type, url): | |
self.action = action | |
self.type = type | |
self.url = url | |
self.args = [] | |
def addArg(self, name, val=""): | |
self.args.append((name, val)) | |
def send(self): | |
data = """<?xml version="1.0"?> | |
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/" | |
SOAP-ENV:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"> | |
<SOAP-ENV:Body> | |
<m:%s xmlns:m="%s"> | |
"""%(self.action, self.type) | |
for a in self.args: | |
data += "<%s>%s</%s>\n"%(a[0], a[1], a[0]) | |
data += """</m:%s> | |
</SOAP-ENV:Body> | |
</SOAP-ENV:Envelope> | |
"""%(self.action) | |
req = urllib2.Request(self.url) | |
req.add_header('SoapAction', self.type + '#' + self.action) | |
req.add_data(data) | |
try: | |
r = urllib2.urlopen(req) | |
except urllib2.HTTPError: | |
r = None | |
return r | |
class FindControlURL(handler.ContentHandler): | |
def __init__(self, type, URLBase): | |
self.type = type | |
self.stack = [] | |
self.resultURL = "" | |
self.URLBase = URLBase | |
self.serviceType = "" | |
self.controlURL = "" | |
def isConnected(self, controlURL): | |
act = SoapAction('GetStatusInfo', self.type, controlURL) | |
ret = act.send() | |
if ret: | |
parser = sax.make_parser() | |
parser.setFeature(feature_namespaces, 1) | |
h = PackElement('GetStatusInfoResponse') | |
parser.setContentHandler(h) | |
parser.parse(ret) | |
return 'Connected' == h.dict.get('NewConnectionStatus', "") | |
else: | |
return False | |
def startElement(self, name, attrs): | |
self.stack.append(name) | |
def endElement(self, name): | |
self.stack.pop() | |
if 'service' == name and self.serviceType == self.type: | |
url = urljoin(self.URLBase, self.controlURL) | |
if '' == self.resultURL and self.isConnected(url): | |
self.resultURL = url | |
def characters(self, content): | |
tag = self.stack[-1] | |
if 'serviceType' == tag: | |
self.serviceType = content | |
elif 'controlURL' == tag: | |
self.controlURL = content | |
elif 'URLBase' == tag: | |
self.URLBase = content | |
class PackElement(handler.ContentHandler): | |
def __init__(self, parent): | |
self.stack = [] | |
self.dict = {} | |
self.parent = parent | |
def startElementNS(self, name, qname, attrs): | |
self.stack.append(name[1]) | |
def endElementNS(self, name, qname): | |
self.stack.pop() | |
def characters(self, content): | |
if 2 <= len(self.stack): | |
parent = self.stack[-2] | |
if self.parent == parent: | |
tag = self.stack[-1] | |
self.dict[tag] = content | |
class Igd: | |
def __init(self): | |
self.type = "" | |
self.controlURL = "" | |
def find(self, multicast_if, time): | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
s.settimeout(time) | |
port = 20000 | |
try: | |
s.bind(("", port)) | |
except socket.error, msg: | |
return False | |
s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1) | |
s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(multicast_if)) | |
ssdp_addr = ('239.255.255.250', 1900) | |
ssdp_msg = \ | |
'M-SEARCH * HTTP/1.1\r\n' +\ | |
'MX: 3\r\n' +\ | |
'HOST: 239.255.255.250:1900\r\n' +\ | |
'MAN: \"ssdp:discover\"\r\n' +\ | |
'ST: urn:schemas-upnp-org:service:%s:1\r\n' +\ | |
'\r\n' | |
s.sendto(ssdp_msg%('WANIPConnection'), ssdp_addr) | |
s.sendto(ssdp_msg%('WANPPPConnection'), ssdp_addr) | |
try: | |
str, addr = s.recvfrom(4096) | |
except socket.timeout: | |
print 'timeout. no igd response.' | |
return False | |
e = '^LOCATION:\s*(http:[.$,;:&=?!*~@#()/\w]+\.xml)' | |
m = re.search(e, str, re.M | re.I) | |
if m: | |
loc = m.group(1) | |
else: | |
return False | |
e = '^ST:\s*(urn:schemas-upnp-org:service:(WANIPConnection|WANPPPConnection):1)' | |
m = re.search(e, str, re.M | re.I) | |
if m: | |
type = m.group(1) | |
else: | |
return False | |
s.close() | |
parser = sax.make_parser() | |
parser.setFeature(feature_namespaces, 0) | |
url = urlparse(loc) | |
h = FindControlURL(type, url[0] + '://' + url[1]) | |
parser.setContentHandler(h) | |
parser.parse(urllib2.urlopen(loc)) | |
self.controlURL = h.resultURL | |
self.type = type | |
return True | |
def getMappingEntry(self, ext, prot): | |
assert( "" != self.type ) | |
assert( "" != self.controlURL ) | |
act = SoapAction("GetSpecificPortMappingEntry", self.type, self.controlURL) | |
act.addArg("NewRemoteHost") | |
act.addArg("NewExternalPort", ext[1]) | |
act.addArg("NewProtocol", prot) | |
ret = act.send() | |
if ret: | |
parser = sax.make_parser() | |
parser.setFeature(feature_namespaces, 1) | |
h = PackElement('GetSpecificPortMappingEntryResponse') | |
parser.setContentHandler(h) | |
parser.parse(ret) | |
return h.dict['NewInternalClient'], h.dict['NewInternalPort'] | |
else: | |
return None | |
def getGenericMappingEntry(self, index): | |
act = SoapAction('GetGenericPortMappingEntry', self.type, self.controlURL) | |
act.addArg('NewPortMappingIndex', index) | |
ret = act.send() | |
if ret: | |
parser = sax.make_parser() | |
parser.setFeature(feature_namespaces, 1) | |
h = PackElement('GetGenericPortMappingEntryResponse') | |
parser.setContentHandler(h) | |
parser.parse(ret) | |
return h.dict | |
else: | |
return None | |
def getExternalIP(self): | |
act = SoapAction("GetExternalIPAddress", self.type, self.controlURL) | |
ret = act.send() | |
if ret: | |
parser = sax.make_parser() | |
parser.setFeature(feature_namespaces, 1) | |
h = PackElement('GetExternalIPAddressResponse') | |
parser.setContentHandler(h) | |
parser.parse(ret) | |
return h.dict.get('NewExternalIPAddress', None) | |
else: | |
return None | |
def addMapping(self, ext, int, prot, desc): | |
assert( "" != self.type ) | |
assert( "" != self.controlURL ) | |
act = SoapAction("AddPortMapping", self.type, self.controlURL) | |
act.addArg("NewRemoteHost", ext[0]) | |
act.addArg("NewExternalPort", ext[1]) | |
act.addArg("NewProtocol", prot) | |
act.addArg("NewInternalPort", int[1]) | |
act.addArg("NewInternalClient", int[0]) | |
act.addArg("NewEnabled", "1") | |
act.addArg("NewPortMappingDescription", desc) | |
act.addArg("NewLeaseDuration", "0") | |
return None != act.send() | |
def deleteMapping(self, ext, prot): | |
assert( "" != self.type ) | |
assert( "" != self.controlURL ) | |
act = SoapAction("DeletePortMapping", self.type, self.controlURL) | |
act.addArg("NewRemoteHost") | |
act.addArg("NewExternalPort", ext[1]) | |
act.addArg("NewProtocol", prot) | |
return None != act.send() | |
def rewriteMapping(self, ext, int, prot, desc): | |
assert( "" != self.type ) | |
assert( "" != self.controlURL ) | |
ret = self.getMappingEntry(ext, prot) | |
if None != ret: | |
if int == ret: | |
return 1 | |
else: | |
if self.deleteMapping(ext, prot): | |
if self.addMapping(ext, int, prot, desc): | |
return 2 | |
else: | |
if self.addMapping(ext, int, prot, desc): | |
return 2 | |
return 0 | |
if __name__ == '__main__': | |
def add_port_mapping(hostaddr, external_port, internal_port): | |
socket.setdefaulttimeout(10) | |
try: | |
igd = Igd() | |
if not(igd.find(hostaddr, 3)): | |
return False | |
except Exception, info: | |
return False | |
ret = igd.rewriteMapping(('', external_port), (hostaddr, internal_port), 'TCP', 'UPnP test') | |
if 2 == ret: | |
print 'added a mapping entry' | |
elif 0 == ret: | |
print 'adding a mapping entry failed' | |
return False | |
gc.collect() # for python bug 1208304 | |
return True | |
def delete_port_mapping(hostaddr, external_port): | |
socket.setdefaulttimeout(10) | |
igd = Igd() | |
if not(igd.find(hostaddr, 3)): | |
print 'deleting a mapping entry failed' | |
return False | |
if not(igd.deleteMapping(('', external_port), 'TCP')): | |
print 'deleting a mapping entry failed' | |
return False | |
gc.collect() # for python bug 1208304 | |
print 'deleted a mapping entry' | |
return True | |
add_port_mapping('192.168.0.2', 8081, 8081) | |
delete_port_mapping('192.168.0.2', 8081) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment