Created
August 24, 2014 17:03
-
-
Save HuangFJ/0b5e0dd716d4ce4971a1 to your computer and use it in GitHub Desktop.
NAT Traversal via UPnP Port Mapping
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NAT Traversal via UPnP Port Mapping | |
# Written by Nikos Fotoulis <nikofot at gmx.com> | |
# This code is public domain. | |
# | |
# Tested on Thomson TG585v7 modem router.
# UPnP is hairy. May not work with other routers | |
# Feedback is welcome. | |
# | |
# To add a multicast address on Mac OS, please refer to
# http://blogs.agilefaqs.com/2009/11/08/enabling-multicast-on-your-macos-unix/ | |
import re, thread, socket, traceback as tb, random | |
from time import sleep | |
from urlparse import urlparse | |
from urllib import urlopen | |
import urllib2 | |
# Verbosity switches read by DiscoverUPnP: VERBOSE prints discovery progress,
# VVERBOSE additionally prints a "+" marker while datagrams are received.
VERBOSE = VVERBOSE = False

# Discovery state.  UPNPS stays None until DiscoverUPnP() has run.  The
# DEFAULT* names are filled in only when exactly one gateway is found; they
# are initialised here so that reading them earlier yields None instead of
# raising NameError.
DEFAULT_ADDR = UPNPS = None
DEFAULTGW = DEFAULTIFACE = None

# regexes
# rWANIP: finds the WAN(IP|PPP)Connection service type in an SSDP "ST:" header.
rWANIP = re.compile(r"ST:[^\n]*(WAN(IP|PPP)Connection:\d+)", re.I).search
# rLOCATION: captures the remainder of the "LOCATION:" header line.
rLOCATION = re.compile(r"LoCaTiON:([^\n]+)", re.I).search
def rTAG(t):
    """Compile a pattern capturing the text between <t> and </t>.

    The match is case-insensitive, non-greedy, and may span several lines.
    """
    flags = re.I | re.DOTALL
    pattern = "<%s>(.+?)</%s>" % (t, t)
    return re.compile(pattern, flags)
# rSERVICE returns the inner text of every <service> element in a document.
rSERVICE = rTAG("service").findall


def _make_extractor(tag_name):
    """Return a function giving the stripped text of the first <tag_name>
    element found in its argument, or None when the element is absent."""
    search = rTAG(tag_name).search

    def extract(txt):
        found = search(txt)
        if found:
            return found.groups()[0].strip()
        return None

    return extract


# Publish one extractor per tag as a module-level function named r<TAG>,
# with the SOAP "New" prefix dropped (e.g. rCONTROLURL, rEXTERNALPORT).
for _tag in ["controlURL", "URLBase", "NewExternalIPAddress", "NewLeaseDuration", "NewProtocol",
             "NewInternalClient", "NewExternalPort", "NewInternalPort"]:
    _short = _tag[3:] if _tag.startswith("New") else _tag
    globals()["r" + _short.upper()] = _make_extractor(_tag)
# multicast and discover UPnP gateways
def DiscoverUPnP():
    """Discover UPnP gateways offering a WAN(IP|PPP)Connection service.

    Rebuilds the module-level UPNPS dictionary and returns it.  Keys are the
    gateway hosts taken from each device-description URL; values are tuples
    (external_ip, service, control_url, local_ip), where local_ip is the
    local address of a TCP connection made to the gateway.

    When exactly one gateway is found, DEFAULT_ADDR, DEFAULTGW and
    DEFAULTIFACE are set from it.  Returns None (leaving UPNPS empty) when
    the SSDP search could not be sent.
    """
    global UPNPS, DEFAULTGW, DEFAULTIFACE, DEFAULT_ADDR
    UPNPS = {}
    responders = _ssdp_search()
    if responders is None:
        return
    for service, location in responders.values():
        probed = _probe_gateway(service, location)
        if probed is not None:
            server, info = probed
            UPNPS[server] = info
    # set defaults
    if len(UPNPS) == 1:
        gw, info = next(iter(UPNPS.items()))
        DEFAULT_ADDR, DEFAULTGW, DEFAULTIFACE = info[0], gw, info[3]
    elif len(UPNPS) > 1:
        # Finding nothing is not "multiple": callers test UPNPS for emptiness.
        print("Multiple UPnP gateways!")
    return UPNPS


def _ssdp_search():
    """Multicast an SSDP M-SEARCH and collect the WAN connection answers.

    Returns a dict mapping each responder address to (service, location),
    or None if the search datagram could not be sent.
    """
    found = {}
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        # sock.setsockopt (socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        request = "M-SEARCH * HTTP/1.1\r\nHOST: 239.255.255.250:1900\r\nMAN: ssdp:discover\r\nMX: 10\r\nST: ssdp:all\r\n\r\n"
        try:
            sock.sendto(request, ("239.255.255.250", 1900))
        except socket.error:
            print("UPnP gateways unreachable")
            return None
        # Wait up to 30s for the first answer; the wait is halved after each
        # datagram (floor 0.01s) so the loop ends soon after answers stop.
        timeout = 30
        while True:
            sock.settimeout(timeout)
            try:
                data, addr = sock.recvfrom(4096)
            except socket.error:  # socket.timeout is a subclass
                break
            timeout = max(timeout * 0.5, 0.01)
            wan = rWANIP(data)
            if wan:
                service = wan.groups()[0]
                loc = rLOCATION(data)
                if loc:
                    location = loc.groups()[0].strip()
                    if VERBOSE:
                        print("server: %s supports %s at %s" % (addr, service, location))
                    found[addr] = service, location
            if VVERBOSE:
                print("+")
    finally:
        sock.close()
    return found


def _probe_gateway(service, location):
    """Fetch one device description and resolve its control URL.

    Returns (server, (external_ip, service, control_url, local_ip)), or None
    when the description is unreachable, has no control URL for the service,
    or the gateway does not report an external IP.
    """
    netloc = urlparse(location).netloc
    if ":" in netloc:
        server, _, port = netloc.partition(":")
    else:
        server, port = netloc, "80"
    try:
        page = urlopen(location)
        try:
            data = page.read()
        finally:
            page.close()
    except IOError:
        # One unreachable responder must not abort the whole discovery.
        print("Couldn't fetch device description: %s" % location)
        return None
    URLBase = rURLBASE(data) or "http://%s:%s" % (server, port)
    controlURL = None
    for description in rSERVICE(data):
        if service in description:
            controlURL = rCONTROLURL(description)
            break
    if not controlURL:
        print("No controlURL found for server: %s" % server)
        return None
    control = URLBase + controlURL
    addr = GetExternalIP(service, control)
    if not addr:
        return None
    # Connect to the gateway only to learn which local address reaches it.
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.connect((server, int(port)))
        thishost = probe.getsockname()[0]
    finally:
        probe.close()
    if VERBOSE:
        print("for server: %s controlURL: %s" % (server, controlURL))
    return server, (addr, service, control, thishost)
# generic request POST data | |
# Argument order declared by the WANIPConnection port-mapping actions.  UPnP
# expects SOAP arguments in the order the service declares them, but keyword
# arguments arrive in dictionary order.
_SOAP_ARG_ORDER = (
    "NewRemoteHost", "NewExternalPort", "NewProtocol", "NewInternalPort",
    "NewInternalClient", "NewEnabled", "NewPortMappingDescription",
    "NewLeaseDuration",
)


def envelope(request, service, **kw):
    """Build the SOAP envelope (POST body) for a UPnP action.

    request -- action name, e.g. "AddPortMapping"
    service -- service type suffix, e.g. "WANIPConnection:1"
    kw      -- action arguments, each rendered as <name>value</name>

    Known port-mapping arguments are emitted in the service-declared order;
    any other argument follows, keeping its relative order.  Values are
    XML-escaped so characters such as "&" or "<" cannot break the document.
    """
    from xml.sax.saxutils import escape

    rank = dict((name, i) for i, name in enumerate(_SOAP_ARG_ORDER))
    # sorted() is stable, so unknown names (all ranked last) keep their order.
    ordered = sorted(kw.items(), key=lambda item: rank.get(item[0], len(rank)))
    arguments = "\n".join(
        ["<%s>%s</%s>" % (k, escape("%s" % (v,)), k) for k, v in ordered])
    head = """<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:%s xmlns:u="urn:schemas-upnp-org:service:%s">
""" % (request, service)
    tail = """ </u:%s>
</s:Body>
</s:Envelope>""" % request
    return head + arguments + tail
def Request(service, URL, request, **kw):
    """POST one SOAP action to a UPnP control URL.

    service -- service type suffix used in the SOAPACTION header and envelope
    URL     -- control URL of the service
    request -- action name
    kw      -- action arguments, passed through to envelope()

    Returns the response body as read from the server, or None if the
    request failed for any reason (connection error, HTTP error status, ...).
    """
    req = urllib2.Request(URL)
    req.add_header("content-type", 'text/xml; charset="utf-8"')
    req.add_header("SOAPACTION", '"urn:schemas-upnp-org:service:%s#%s"' % (service, request))
    req.add_data(envelope(request, service, **kw))
    try:
        response = urllib2.build_opener().open(req)
        try:
            return response.read()
        finally:
            response.close()
    except Exception:
        # Best effort by design: callers treat None as "action failed".
        # Unlike a bare except, this lets KeyboardInterrupt and SystemExit
        # propagate.
        return None
def GetExternalIP(service, URL):
    """Ask the gateway for its external IP address.

    Returns the address extracted from the GetExternalIPAddress reply, or a
    false value (after printing a warning) when the request failed or the
    reply carried no address.
    """
    reply = Request(service, URL, "GetExternalIPAddress")
    if reply:
        external = rEXTERNALIPADDRESS(reply)
    else:
        external = reply
    if not external:
        print("Couldn't get external IP address!")
    return external
# # The 3 basic actions of UPnP : list entries, add a mapping, delete a mapping | |
## Notes (tested on Thomson TG585v7): | |
## - Sometimes AddMapping returns a fail code (500) but the
## mapping *is* done and that can be seen by listing the entries (?!) | |
## So, the only way to be sure is to: list entries, add mapping, list entries | |
## and see the difference. | |
## - Returned LeaseDuration seems to be in deci-seconds | |
def getEntries(service, URL):
    """Yield the raw GetGenericPortMappingEntry replies, starting at index 0
    and stopping at the first index whose request returns nothing."""
    index = 0
    reply = Request(service, URL, "GetGenericPortMappingEntry", NewPortMappingIndex=index)
    while reply:
        yield reply
        index += 1
        reply = Request(service, URL, "GetGenericPortMappingEntry", NewPortMappingIndex=index)
def listMappings(gw=None):
    """List the TCP port mappings that point at this host.

    gw -- key into UPNPS selecting the gateway; DEFAULTGW when omitted.

    Returns a list of (external_port, internal_port, lease) tuples, where
    lease is the reported NewLeaseDuration divided by 10.  Entries that are
    not TCP mappings for this host's interface address are reported on
    stdout and skipped, as are matching entries whose port or lease fields
    are missing or not numeric.
    """
    _, service, URL, iface = UPNPS[gw or DEFAULTGW]
    mappings = []
    for entry in getEntries(service, URL):
        if rPROTOCOL(entry) == "TCP" and rINTERNALCLIENT(entry) == iface:
            try:
                mappings.append((int(rEXTERNALPORT(entry)),
                                 int(rINTERNALPORT(entry)),
                                 int(rLEASEDURATION(entry)) / 10.0))
            except (TypeError, ValueError):
                # A missing tag yields None and a garbled one a non-number;
                # skip the entry instead of aborting the whole listing.
                print("strange entry response! %s" % (entry,))
        else:
            print("strange entry response! %s" % (entry,))
    return mappings
def addMapping(local_port, public_port, ttl, gw=None):
    """Map external TCP port public_port to local_port on this host.

    ttl is passed to the gateway as NewLeaseDuration; gw selects the gateway
    (DEFAULTGW when omitted).  Returns True when the mapping is in place and
    None when the external port already maps to a different local port or
    the mapping could not be confirmed.
    """
    _, service, URL, iface = UPNPS[gw or DEFAULTGW]

    # test if port already mapped. Result of AddMapping is unreliable
    taken = any(ext == public_port and internal != local_port
                for ext, internal, _lease in listMappings(gw))
    if taken:
        return None

    reply = Request(service, URL, "AddPortMapping",
                    NewEnabled="1", NewRemoteHost="", NewLeaseDuration=ttl,
                    NewInternalPort=local_port, NewExternalPort=public_port,
                    NewProtocol="TCP", NewInternalClient=iface,
                    NewPortMappingDescription="IndependNet")
    if reply:
        return True

    # test if mapped. Result of AddMapping is unreliable
    confirmed = any(ext == public_port and internal == local_port
                    for ext, internal, _lease in listMappings(gw))
    return True if confirmed else None
def delMapping(public_port, gw=None):
    """Remove the TCP mapping for public_port, or every mapping returned by
    listMappings() when public_port is the string "all".

    gw selects the gateway (DEFAULTGW when omitted).
    """
    _, service, URL, _ = UPNPS[gw or DEFAULTGW]
    if public_port == "all":
        targets = [ext for ext, _internal, _lease in listMappings(gw)]
    else:
        targets = [public_port]
    for ext in targets:
        Request(service, URL, "DeletePortMapping",
                NewRemoteHost="", NewExternalPort=ext, NewProtocol="TCP")
## | |
## Socket compatible interface for accepting connections on an external port. | |
## Does a mapping keepalive every 60 sec to make sure the mapping is not kept
## indefinitely if our application crashes and doesn't manage to remove it.
## | |
# Lease requested for each mapping; the keepalive re-adds it before expiry.
LEASE_DURATION = 60


class UPnPError(Exception):
    """Raised when no gateway is available or a port mapping cannot be made."""


def Accept(port):
    """Listen on a local socket reachable through external port `port`.

    port -- external port to map; a false value picks a random port in
            2000..60000.

    Runs discovery first if it has not been done yet.  Returns an Acceptor
    wrapping the listening socket.  Raises UPnPError when no gateway was
    found or the port mapping failed.
    """
    if not port:
        port = random.randint(2000, 60000)
    if UPNPS is None:
        DiscoverUPnP()
    if not UPNPS:
        raise UPnPError("No UPnP gateway found. Can't listen ouside the modem")
    s = socket.socket()
    try:
        # Port 0 lets the OS choose the local port that gets mapped.
        s.bind((DEFAULTIFACE, 0))
        inport = s.getsockname()[1]
        if not addMapping(inport, port, LEASE_DURATION):
            raise UPnPError("Port Mapping to external port %i Failed" % port)
        s.listen(2)
    except Exception:
        # Do not leak the socket when binding or mapping fails.
        s.close()
        raise
    return Acceptor(s, port, inport)
class Acceptor(object):
    """Listening socket paired with a UPnP port mapping that is kept alive.

    Iterating over the object yields the results of sock.accept() for as
    long as the acceptor is active.  A background thread renews the mapping.
    That thread holds a reference to the instance, so call close() to shut
    down; __del__ alone is not run while the thread is alive.
    """

    def __init__(self, sock, eport, iport):
        self.sock, self.eport, self.iport = sock, eport, iport
        self.port = eport
        self.active = True
        self._closed = False
        thread.start_new_thread(self.keepalive, ())

    def __iter__(self):
        while self.active:
            yield self.sock.accept()

    def keepalive(self):
        """Re-add the mapping shortly before its lease runs out.

        Deactivates the acceptor when the mapping is absent from the gateway
        and cannot be re-added.
        """
        while True:
            # ttl stays None unless *our* mapping is listed; a loop variable
            # would keep the lease of the last unrelated entry instead.
            ttl = None
            for eport, iport, lease in listMappings():
                if eport == self.eport and iport == self.iport:
                    ttl = lease
                    break
            ##print "Lease up for:", ttl
            if ttl is None:
                st = 0.1
            else:
                st = max(ttl - 0.1, 0.1)
            sleep(st)
            if not self.active:
                break
            if not addMapping(self.iport, self.eport, LEASE_DURATION):
                if ttl is None:
                    self.active = False
                    print("Failed to Keepalive the lease")
                    break

    def close(self):
        """Stop accepting, close the socket and delete the mapping.

        Safe to call more than once.
        """
        # A missing flag means __init__ never completed: nothing to release.
        if getattr(self, "_closed", True):
            return
        self._closed = True
        self.active = False
        self.sock.close()
        delMapping(self.eport)

    def __del__(self):
        try:
            self.close()
        except Exception:
            # Errors raised from __del__ cannot be handled by anyone.
            pass
## main. UPnP manager & testing | |
# Help text printed for --help, unknown commands and malformed arguments.
USAGE = """UPnP NAT Traversal (port mapping) test
Usage: python upnp.py [-gw gw] {list|bind|del} <arguments>
  upnp list
      list mappings
  upnp bind internal-port external-port time-to-live
      map public port to local port for some time
  upnp del external-port|"all"
      remove a port mapping
  upnp
      discover gateways and external IP addresses
Common options:
  -gw : select UPnP gateway (if more than one -- NOT IMPLEMENTED)
"""

if __name__ == "__main__":
    import sys

    args = sys.argv[1:]
    if "--help" in args:
        print(USAGE)
        sys.exit()
    VERBOSE = True
    print("Discovering UPnP gateways...")
    DiscoverUPnP()
    for gw, v in UPNPS.items():
        ip, service, URL, iface = v
        print("External IP: %s" % ip)
        print("\tgateway: %s" % gw)
        print("\tservice: %s" % service)
        print("\tcontrol URL: %s" % URL)
        print("\tinterface: %s" % iface)
    if not UPNPS:
        sys.exit("No UPnP gateway found")
    if not args:
        sys.exit()
    cmd = args.pop(0)
    gw = None  # gateway selection (-gw) is not implemented
    if cmd == "list":
        print("Port Mappings:")
        for ep, ip, ttl in listMappings(gw):
            print("\t%i <- %i (ttl=%i)" % (ip, ep, ttl))
    elif cmd == "bind":
        try:
            # Wrong argument count and non-numeric values both raise
            # ValueError here.
            iport, eport, ttl = map(int, args)
        except ValueError:
            sys.exit(USAGE)
        if addMapping(iport, eport, ttl, gw):
            print("OK")
        else:
            print("Failed. Port already used, or implementation error")
    elif cmd == "del":
        if len(args) != 1:
            sys.exit(USAGE)
        delMapping(args[0], gw)
    else:
        print(USAGE)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment