Created
January 1, 2021 15:49
-
-
Save kobus-v-schoor/86d7b5ab6f52fafa1bd56846c2829ead to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import socket | |
import socketserver | |
import json | |
import logging | |
from ipaddress import ip_address, IPv4Network | |
# parse command-line arguments
def parse_args():
    """Parse the process's command-line arguments.

    Returns:
        argparse.Namespace with the attributes ``port``, ``listen``,
        ``broadcast``, ``hostname``, ``verbose`` and ``hosts_file``.
    """
    def port_type(value):
        # argparse reports a ValueError raised by int() as an invalid value
        number = int(value)
        if number < 1 or number > 65535:
            raise argparse.ArgumentTypeError('network port must be between'
                                             ' 1 and 65535')
        return number

    def ip_type(value):
        # round-trip through ip_address() so malformed addresses are rejected
        return str(ip_address(value))

    # one row per option: (flags, keyword arguments for add_argument)
    options = (
        (('--port',), {'type': port_type, 'default': 9405}),
        (('--listen',), {'type': ip_type, 'default': '0.0.0.0'}),
        (('--broadcast',), {'type': ip_type, 'default': '255.255.255.255'}),
        (('--hostname',), {'default': socket.gethostname()}),
        (('--verbose', '-v'), {'action': 'count', 'default': 0}),
        (('--hosts-file',), {'default': '/etc/hosts'}),
    )

    parser = argparse.ArgumentParser()
    for flags, settings in options:
        parser.add_argument(*flags, **settings)
    return parser.parse_args()
# determines this host's default ip address
def get_ip():
    """Return the local IPv4 address used for outgoing traffic.

    A UDP socket is "connected" to an arbitrary address and the local end
    of that socket is read back.

    Returns:
        The local address as a dotted-quad string, or ``'127.0.0.1'`` when
        the lookup fails with an ``OSError`` (e.g. no usable route).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        try:
            # doesn't have to be reachable
            s.connect(('10.255.255.255', 1))
            return s.getsockname()[0]
        except OSError:
            # only socket failures fall back to loopback; KeyboardInterrupt
            # and SystemExit are deliberately left to propagate
            return '127.0.0.1'
# constructs the message to send to other hosts
def get_msg(reply=False):
    """Serialise this host's announcement as a JSON string.

    Args:
        reply: value stored under the ``'reply'`` key of the message.

    Returns:
        A JSON object string with the keys ``hostname`` (taken from the
        global ``args``), ``ip`` (from ``get_ip()``) and ``reply``.
    """
    announcement = dict(
        hostname=args.hostname,
        ip=get_ip(),
        reply=reply,
    )
    return json.dumps(announcement)
def send_message(ip, port, message, broadcast=False):
    """Send ``message`` as a single UDP datagram to ``ip``:``port``.

    Args:
        ip: destination IPv4 address.
        port: destination UDP port.
        message: text to send; it is encoded with ``str.encode()``.
        broadcast: when true, SO_BROADCAST is enabled on the socket before
            sending.
    """
    destination = (ip, port)

    # a fresh UDP socket is used per message and closed on exit
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp:
        logging.debug(f'sending "{message}" to {ip}:{port}')

        # enable broadcast messages if needed
        if broadcast:
            udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

        # send the packet on its way
        udp.sendto(message.encode(), destination)
def _is_valid_hostname(hostname):
    # a usable hostname is a non-empty string of alphanumerics and '-'
    return (isinstance(hostname, str)
            and bool(hostname)
            and all(h.isalnum() or h == '-' for h in hostname))


def _is_valid_ip(ip):
    # only textual addresses are accepted; ip_address() would also take ints
    if not isinstance(ip, str):
        return False
    try:
        ip_address(ip)
    except ValueError:
        return False
    return True


def _merge_hosts_lines(lines, hostname, ip):
    # return the hosts-file lines with exactly one "ip hostname" entry
    entry = f'{ip} {hostname}'
    merged = []
    added = False
    for line in lines:
        # compare whole fields, ignoring anything after a '#' comment marker
        fields = line.split('#', 1)[0].split()
        if not fields:
            # blank or comment-only line: keep untouched
            merged.append(line)
            continue
        address, names = fields[0], fields[1:]
        if address == ip:
            # entry for this exact ip: replace it once, drop duplicates
            if not added:
                merged.append(entry)
                added = True
            continue
        if hostname in names:
            # another address still maps to this hostname: stale entry
            continue
        # just a line we're not interested in, keep it
        merged.append(line)
    if not added:
        merged.append(entry)
    return merged


def add_host(hostname, ip):
    """Record ``hostname`` -> ``ip`` in the hosts file named by the global
    ``args.hosts_file``.

    The existing line for ``ip`` is replaced, lines that map ``hostname``
    to a different address are removed, and the entry is appended if no
    line for ``ip`` existed. Addresses and names are compared as whole
    whitespace-separated fields, so ``10.0.0.1`` does not match
    ``10.0.0.10`` and ``host`` does not match ``myhost``.

    NOTE(review): ServerHandler.handle passes values taken from received
    datagrams straight into this function, so any sender can replace or
    remove entries (including ones such as ``localhost``) - confirm this
    is acceptable for the deployment.

    Args:
        hostname: name to register; must be a non-empty string made of
            alphanumeric characters and ``-``.
        ip: address to register; must be a string accepted by
            ``ipaddress.ip_address``.

    Returns:
        True if the hosts file was rewritten; False if the entry was
        rejected (own hostname, invalid hostname or invalid address).

    Raises:
        OSError: if the hosts file cannot be read or written.
    """
    # check that we're not adding this host's hostname
    if hostname == args.hostname:
        logging.debug('not adding own hostname')
        return False

    # check that hostname is a valid unix hostname; the empty string must be
    # rejected explicitly because all() over nothing is True
    if not _is_valid_hostname(hostname):
        logging.warning(f'not adding invalid hostname "{hostname}"')
        return False

    # check that ip address is valid
    if not _is_valid_ip(ip):
        logging.warning(f'not adding invalid ip address {ip}')
        return False

    logging.info(f'adding {hostname} = {ip} to hosts file')

    # read the old hosts file
    with open(args.hosts_file, 'r') as f:
        old = f.read().strip()

    # an empty file has no lines (rather than one empty line)
    old_lines = old.split('\n') if old else []
    newlines = _merge_hosts_lines(old_lines, hostname, ip)

    # write the new hosts file
    with open(args.hosts_file, 'w') as f:
        f.write('\n'.join(newlines) + '\n')

    return True
class ServerHandler(socketserver.BaseRequestHandler):
    """Request handler that processes one received UDP announcement."""

    def handle(self):
        """Decode the datagram, register the sender and optionally reply.

        The datagram is expected to be a JSON object with ``hostname`` and
        ``ip`` keys and an optional ``reply`` flag. Anything that cannot be
        decoded into that shape is logged at debug level and dropped.
        """
        # for a UDP server the request is a (datagram, socket) pair; only
        # the datagram is used here
        raw, _ = self.request

        try:
            text = raw.decode()
        except UnicodeDecodeError:
            logging.debug('received message was corrupt')
            return

        logging.debug(f'received "{text}" from {self.client_address}')

        try:
            message = json.loads(text)
            hostname, ip = message['hostname'], message['ip']
        except (json.JSONDecodeError, KeyError, TypeError):
            # TypeError covers valid JSON that is not an object (list,
            # string, number, null), which cannot be indexed by key
            logging.debug('received message was corrupt')
            return

        # if we added a new host and it asked for a reply, send them our ip
        # as well; the answer goes to the sender's address on args.port
        if add_host(hostname, ip) and message.get('reply', False):
            ip, port = self.client_address[0], args.port
            logging.debug(f'replying to {ip}')
            send_message(ip, port, get_msg(reply=False))
if __name__ == '__main__':
    # parse cmd args; the result stays in the module-level name `args`
    # because get_msg, add_host and ServerHandler read it from there
    args = parse_args()

    # set logging level: WARNING by default, 10 lower for every -v given,
    # but never below DEBUG
    log_level = max(logging.DEBUG, logging.WARNING - 10 * args.verbose)
    logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)
    logging.debug(f'called with arguments {vars(args)}')

    listen_address = (args.listen, args.port)

    # start server; it is created before the announcement goes out,
    # presumably so that replies to the broadcast are not missed
    with socketserver.UDPServer(listen_address, ServerHandler) as server:
        # broadcast that we exist along with our hostname and ip
        announcement = get_msg(reply=True)
        send_message(args.broadcast, args.port, announcement, broadcast=True)

        # start handling requests on the server
        server.serve_forever()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment