Skip to content

Instantly share code, notes, and snippets.

@byyam
Created October 24, 2019 14:58
Show Gist options
  • Save byyam/7f58e9775defc64b9f1048a7ae36654d to your computer and use it in GitHub Desktop.
Save byyam/7f58e9775defc64b9f1048a7ae36654d to your computer and use it in GitHub Desktop.
STUN demo
[server]
ip = 10.0.2.15
port = 11000
[client]
seq = 123456
#!/usr/bin/python
import ConfigParser
class read_config(object):
def __init__(self, config_file="config.ini"):
self.config = ConfigParser.ConfigParser()
self.config.read(config_file)
self.server_ip = self.config.get('server', 'ip')
self.server_port = int(self.config.get('server', 'port'))
self.client_seq = int(self.config.get('client', 'seq'))
def __del__(self):
pass
def main():
obj = read_config("config.ini")
if __name__=="__main__":
main()
#!/usr/bin/python
import socket
import json
import read_config
class stun_client(read_config.read_config):
def __init__(self):
read_config.read_config.__init__(self)
domain = socket.getfqdn()
host = socket.gethostbyname(domain)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((host, 0))
def __del__(self):
self.sock.close()
def send_info(self):
ip, port = self.sock.getsockname()
print "addr: %s:%d" % (ip, port)
data = {'desc':'sendaddr'}
data['in_ip'] = ip
data['in_port'] = port
data['seq'] = self.client_seq
json_str = json.dumps(data)
self.sock.sendto(json_str, (self.server_ip, self.server_port))
print 'sendto: %s:%d' % (self.server_ip, self.server_port)
print data
def handle_notify(self, peer_info):
data = {'desc':'ping'}
data['seq'] = self.client_seq
json_str = json.dumps(data)
self.sock.sendto(json_str, (peer_info['out_ip'], peer_info['out_port']))
def handle_ping(self, ping):
if ping['seq'] == self.client_seq:
print 'recv peer ping!!!'
def handle_pkg(self):
while True:
json_str, (from_ip, from_port) = self.sock.recvfrom(1024)
data = json.loads(json_str)
print 'recvfrom: %s' % (data)
if data['desc'] == 'sendnotify':
self.handle_notify(data)
if data['desc'] == 'ping':
self.handle_ping(data)
def main():
obj = stun_client()
obj.send_info()
obj.handle_pkg()
if __name__ == "__main__":
main()
#!/usr/bin/python
import json
import socket
import read_config
class stun_server(read_config.read_config):
def __init__(self):
read_config.read_config.__init__(self)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((self.server_ip, self.server_port))
print 'bind: %s:%d' % (self.server_ip, self.server_port)
self.peerinfo = {'0':'0'}
def __del__(self):
self.sock.close()
def handle_sendaddr(self, data):
if data['seq'] in self.peerinfo.keys():
peer_info = self.peerinfo[data['seq']]
self.send_notify(data, peer_info)
self.send_notify(peer_info, data)
print 'send notify'
else:
self.peerinfo[data['seq']] = data
print 'store peerinfo'
def send_notify(self, self_info, peer_info):
data = {'desc':'sendnotify'}
data['out_ip'] = peer_info['out_ip']
data['out_port'] = peer_info['out_port']
json_str = json.dumps(data)
self.sock.sendto(json_str, (self_info['out_ip'], self_info['out_port']))
def handle_pkg(self):
while True:
json_str, (from_ip, from_port) = self.sock.recvfrom(1024)
data = json.loads(json_str)
data['out_ip'] = from_ip
data['out_port'] = from_port
print 'recvfrom: %s' % (data)
if data['desc'] == 'sendaddr':
self.handle_sendaddr(data)
def main():
obj = stun_server()
obj.handle_pkg()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment