Last active
July 9, 2018 04:28
-
-
Save scientificRat/21e953ad13f05e98a27a2a8d98553a44 to your computer and use it in GitHub Desktop.
NAT traversal (text communication)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket as sc | |
import argparse | |
import os, sys | |
class NatTraversalTool(object):
    """UDP hole-punching client that pairs with a named peer through a server.

    Wire protocol (text, one message per datagram, ``COMMAND:INFO``):

    * to the server:   ``CONN:<local_name>,<remote_name>``
    * from the server: ``DIG:<peer_ip>,<peer_port>``
    * to/from the peer: ``CONN:<sender>,<receiver>`` and ``CONN_ACK:<sender>``
    * to the server:   ``SUCCESS:<local_name>`` once the peer has answered
    """

    def __init__(self, server_address, local_name, remote_name, local_port=8903):
        """Create the UDP socket and bind it to ``local_port`` on all interfaces.

        :param server_address: ``(host, port)`` tuple of the rendezvous server.
            NOTE(review): replies are matched against this tuple verbatim, so a
            host *name* here will presumably never match the numeric address
            reported by ``recvfrom`` -- pass an IP address.
        :param local_name: name this client registers under.
        :param remote_name: name of the peer to connect to.
        :param local_port: local UDP port to bind (defaults to the previously
            hard-coded 8903).
        """
        self.server_address = server_address
        self.local_name = local_name
        self.remote_name = remote_name
        self.socket = sc.socket(type=sc.SOCK_DGRAM)
        self.socket.bind(('', local_port))
        self.remote_address = None

    @staticmethod
    def _parse_message(buffer):
        """Split a raw datagram into ``(command, info)`` at the first ':'.

        Never raises: undecodable bytes are replaced and a datagram without a
        ':' yields an empty ``info``.
        """
        command, _, info = buffer.decode(errors="replace").partition(':')
        return command, info

    @staticmethod
    def _parse_dig_info(info):
        """Turn the ``<ip>,<port>`` payload of a DIG command into ``(ip, port)``.

        :raises ValueError: if the payload is not exactly two comma-separated
            fields or the port is not an integer.
        """
        fields = info.split(',')
        if len(fields) != 2:
            raise ValueError('receive wrong command')
        host, port = fields
        return host, int(port)

    def traversal(self):
        """Register with the server, punch a hole to the peer and connect.

        Blocks until the server answers with a DIG command. Datagrams from
        other senders and unknown server commands are ignored (previously an
        unknown command fell through to ``connect(None)`` and raised
        ``TypeError``).

        :returns: ``(socket, remote_address)`` with the socket connected to
            the peer.
        :raises ValueError: if the server sends a malformed DIG payload.
        """
        send_str = "CONN:%s,%s" % (self.local_name, self.remote_name)
        self.socket.sendto(send_str.encode(), self.server_address)
        while True:
            buffer, address = self.socket.recvfrom(1024)
            if address != self.server_address:
                continue
            command, info = self._parse_message(buffer)
            if command != 'DIG':
                print("unknown command received")
                continue
            self.remote_address = self._parse_dig_info(info)
            print("start dig", self.remote_address)
            self._start_dig()
            break
        self.socket.connect(self.remote_address)
        return self.socket, self.remote_address

    def _start_dig(self):
        """Exchange CONN / CONN_ACK with the peer, then report SUCCESS.

        A forked child listens for the peer's messages while the parent sends
        two CONN datagrams, waits for the child to finish and finally sends
        ``SUCCESS:<local_name>`` to the server.
        """
        # Flush first so the child does not inherit (and later re-emit)
        # output that is still sitting in the parent's buffer.
        sys.stdout.flush()
        pid = os.fork()
        if pid == 0:
            # child process: listen for the peer's handshake messages
            print("fork")
            while True:
                buffer, address = self.socket.recvfrom(1024)
                print(address)
                if address != self.remote_address:
                    continue
                command, info = self._parse_message(buffer)
                if command == 'CONN':
                    print("CONN received!")
                    # Only acknowledge a CONN that goes from our peer to us.
                    if info.split(',') == [self.remote_name, self.local_name]:
                        send_str = "CONN_ACK:%s" % self.local_name
                        self.socket.sendto(send_str.encode(), self.remote_address)
                elif command == 'CONN_ACK' and info == self.remote_name:
                    send_str = "CONN_ACK:%s" % self.local_name
                    self.socket.sendto(send_str.encode(), self.remote_address)
                    # Leave the forked child without unwinding the parent's
                    # call stack; flush manually because os._exit() skips it.
                    sys.stdout.flush()
                    os._exit(0)
        else:
            for _ in range(2):
                send_str = "CONN:%s,%s" % (self.local_name, self.remote_name)
                self.socket.sendto(send_str.encode(), self.remote_address)
                print("CONN SEND")
            # Wait for the listener forked above, not for any child.
            os.waitpid(pid, 0)
            send_str = "SUCCESS:%s" % self.local_name
            self.socket.sendto(send_str.encode(), self.server_address)
def start_communication(socket: sc.socket):
    """Run a line-based text chat over an already connected socket.

    A forked child prints every received datagram to stderr while the parent
    sends each line read from stdin. Entering ``q`` or reaching end of input
    ends the session; the socket is then closed and the child is terminated
    and reaped.

    :param socket: socket already connected to the peer.
    """
    import signal  # local import: only this function needs it

    pid = os.fork()
    if pid == 0:
        # child process: receive until the parent terminates us. recv() never
        # returns None, so there is no in-band "end of stream" to test for.
        while True:
            data = socket.recv(1024)
            print(data.decode(errors="replace"), file=sys.stderr, flush=True)

    try:
        while True:
            try:
                line = input()
            except EOFError:
                # stdin closed (Ctrl-D or exhausted pipe): treat like "q".
                break
            if line == "q":
                break
            socket.send(line.encode())
    finally:
        socket.close()
        # The child holds its own copy of the socket and would otherwise
        # keep running after we return.
        try:
            os.kill(pid, signal.SIGTERM)
            os.waitpid(pid, 0)
        except (ProcessLookupError, ChildProcessError):
            # Child is already gone; nothing left to clean up.
            pass
def main():
    """Parse the command line, traverse the NAT and start the text chat.

    Positional arguments are the local and remote client names; ``--server``
    / ``-s`` takes the rendezvous server as ``IP:PORT``.

    :raises ValueError: if the server option is not of the form ``IP:PORT``.
    """
    cli = argparse.ArgumentParser(description='NAT Traversal')
    cli.add_argument(
        '--server', '-s',
        metavar='IP:PORT',
        default="115.28.68.253:8088",
        type=str,
    )
    cli.add_argument('local_name', metavar='<local_name>', type=str)
    cli.add_argument('remote_name', metavar='<remote_name>', type=str)
    options = cli.parse_args()

    pieces = options.server.split(':')
    if len(pieces) != 2:
        raise ValueError('address format should be IP:PORT')
    host, port_text = pieces
    endpoint = (host, int(port_text))

    tool = NatTraversalTool(endpoint, options.local_name, options.remote_name)
    peer_socket, _peer_address = tool.traversal()

    # the hole is punched; hand the connected socket to the chat loop
    print("traversal success, connection established")
    start_communication(peer_socket)
# Run the client only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket as sc | |
def handle_datagram(waiting_clients, buffer, address):
    """Apply one client datagram to the waiting table and return the replies.

    Kept free of socket I/O so the protocol logic can be exercised on its own.

    :param waiting_clients: dict mapping a waiting client's name to
        ``(name_it_wants, its_address)``; updated in place.
    :param buffer: raw datagram bytes, expected as ``COMMAND:INFO``.
    :param address: ``(host, port)`` the datagram came from.
    :returns: list of ``(payload_bytes, destination_address)`` to send.
        Malformed, unknown or non-matching requests yield an empty list
        instead of stopping the server.
    """
    command, _, info = buffer.decode(errors="replace").partition(':')

    if command == 'CONN':
        names = info.split(',')
        if len(names) != 2:
            return []
        client_name, connecting_name = names
        peer = waiting_clients.get(connecting_name)
        if peer is None:
            # The wanted peer has not shown up yet: remember this client.
            waiting_clients[client_name] = (connecting_name, address)
            return []
        wanted_name, peer_address = peer
        if wanted_name != client_name:
            # The peer is waiting for somebody else; ignore this request.
            return []
        # Tell each side where the other one is so both start punching.
        return [
            (("DIG:%s,%d" % peer_address).encode(), address),
            (("DIG:%s,%d" % address).encode(), peer_address),
        ]

    if command == 'SUCCESS':
        # Only the registered client itself may clear its waiting entry. The
        # second peer of a pair was never registered, so its SUCCESS is
        # simply a no-op.
        entry = waiting_clients.get(info)
        if entry is not None and entry[1] == address:
            del waiting_clients[info]

    return []


def main():
    """Run the rendezvous server on UDP port 8088 until interrupted.

    Clients announce themselves with ``CONN:<name>,<peer_name>``. When both
    sides of a pair have announced, each receives ``DIG:<ip>,<port>`` with the
    other's address. ``SUCCESS:<name>`` removes a client's waiting entry.
    """
    waiting_clients = {}
    # The context manager closes the socket if the loop is left by an
    # exception such as KeyboardInterrupt.
    with sc.socket(type=sc.SOCK_DGRAM) as server:
        server.bind(("0.0.0.0", 8088))
        while True:
            buffer, address = server.recvfrom(1024)
            for payload, destination in handle_datagram(waiting_clients, buffer, address):
                server.sendto(payload, destination)
# Run the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment