Skip to content

Instantly share code, notes, and snippets.

@kobus-v-schoor
Created January 1, 2021 15:49
Show Gist options
  • Save kobus-v-schoor/86d7b5ab6f52fafa1bd56846c2829ead to your computer and use it in GitHub Desktop.
Save kobus-v-schoor/86d7b5ab6f52fafa1bd56846c2829ead to your computer and use it in GitHub Desktop.
import argparse
import socket
import socketserver
import json
import logging
from ipaddress import ip_address, IPv4Network
# parse command-line arguments
def parse_args(argv=None):
    """Parse the command-line options of the host announcer.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read the process arguments, which is
            what the original zero-argument call did.

    Returns:
        An ``argparse.Namespace`` with the attributes ``port``, ``listen``,
        ``broadcast``, ``hostname``, ``verbose`` and ``hosts_file``.

    Invalid values make argparse print a usage error and raise
    ``SystemExit``.
    """
    parser = argparse.ArgumentParser()

    def port_type(a):
        # int() raises ValueError on non-numeric input; argparse turns that
        # into a usage error on its own
        a = int(a)
        if not 1 <= a <= 65535:
            raise argparse.ArgumentTypeError('network port must be between'
                                             ' 1 and 65535')
        return a

    def ip_type(a):
        addr = ip_address(a)
        # the sockets this script opens are AF_INET, so an IPv6 address
        # would only fail later at bind/send time - reject it up front
        if addr.version != 4:
            raise argparse.ArgumentTypeError('only IPv4 addresses are'
                                             ' supported')
        return str(addr)

    parser.add_argument('--port', type=port_type, default=9405,
                        help='UDP port to listen on and to send to')
    parser.add_argument('--listen', type=ip_type, default='0.0.0.0',
                        help='address the server binds to')
    parser.add_argument('--broadcast', type=ip_type,
                        default='255.255.255.255',
                        help='address the initial announcement is sent to')
    parser.add_argument('--hostname', default=socket.gethostname(),
                        help='hostname to announce for this machine')
    parser.add_argument('--verbose', '-v', action='count', default=0,
                        help='increase log verbosity (may be repeated)')
    parser.add_argument('--hosts-file', default='/etc/hosts',
                        help='hosts file that learned entries are written to')

    return parser.parse_args(argv)
# determines this host's default ip address
def get_ip():
    """Return the IPv4 address of this host's default outgoing interface.

    A UDP socket is connected to a fixed private address and the local
    address chosen for that socket is read back. If that fails with an
    ``OSError`` the loopback address ``'127.0.0.1'`` is returned instead.

    Returns:
        The address as a dotted-quad string.
    """
    # the context manager closes the socket on every path
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        try:
            # doesn't have to be reachable
            s.connect(('10.255.255.255', 1))
            return s.getsockname()[0]
        except OSError:
            # only socket-level failures trigger the fallback; things like
            # KeyboardInterrupt are no longer swallowed
            return '127.0.0.1'
# constructs the message to send to other hosts
def get_msg(reply=False):
    """Build the JSON announcement describing this host.

    The hostname comes from the module-level ``args`` namespace and the
    address from ``get_ip()``.

    Args:
        reply: Value stored under the ``'reply'`` key of the message.

    Returns:
        A JSON string with the keys ``hostname``, ``ip`` and ``reply``.
    """
    payload = dict(hostname=args.hostname, ip=get_ip(), reply=reply)
    return json.dumps(payload)
def send_message(ip, port, message, broadcast=False):
    """Send ``message`` as a single UDP datagram to ``ip``:``port``.

    Args:
        ip: Destination address.
        port: Destination UDP port.
        message: Text to send; it is encoded with ``str.encode()`` first.
        broadcast: When true, ``SO_BROADCAST`` is set on the socket before
            sending.
    """
    # a fresh socket per message, closed again when the block exits
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp:
        logging.debug(f'sending "{message}" to {ip}:{port}')

        # broadcasting has to be switched on explicitly
        if broadcast:
            udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

        payload = message.encode()
        udp.sendto(payload, (ip, port))
def _clean_ip(ip):
    """Return ``ip`` in canonical text form, or ``None`` if it is unusable."""
    # the value ends up verbatim in the hosts file, so anything that is not
    # a plain string, or that contains whitespace (which could smuggle in
    # extra entries or lines), is refused before parsing
    if not isinstance(ip, str) or any(c.isspace() for c in ip):
        return None
    try:
        return str(ip_address(ip))
    except ValueError:
        return None


def _hosts_fields(line):
    """Split a hosts-file line into ``(address, names)``, ignoring comments.

    Blank and comment-only lines yield ``(None, [])``.
    """
    fields = line.split('#', 1)[0].split()
    if not fields:
        return None, []
    return fields[0], fields[1:]


def add_host(hostname, ip, hosts_file=None, own_hostname=None):
    """Record ``hostname`` -> ``ip`` in the hosts file.

    Existing entries for the same address are replaced by a single
    ``'<ip> <hostname>'`` line, lines that map ``hostname`` to another
    address are dropped as stale, and every other line is kept unchanged.
    Addresses and names are compared as whole whitespace-separated fields,
    so ``10.0.0.1`` does not match ``10.0.0.10`` and ``pc`` does not match
    ``laptop-pc``.

    Args:
        hostname: Name to add; must be a non-empty string made of
            alphanumeric characters and ``'-'``.
        ip: Address to add, as a string accepted by ``ip_address``.
        hosts_file: Path of the hosts file. Defaults to ``args.hosts_file``.
        own_hostname: This machine's hostname, which is never added.
            Defaults to ``args.hostname``.

    Returns:
        ``True`` if the hosts file was rewritten with the entry, ``False``
        if the entry was rejected (own hostname, invalid hostname or
        invalid address).

    Raises:
        OSError: If the hosts file cannot be read or written.
    """
    if hosts_file is None:
        hosts_file = args.hosts_file
    if own_hostname is None:
        own_hostname = args.hostname

    # check that we're not adding this host's hostname
    if hostname == own_hostname:
        logging.debug('not adding own hostname')
        return False

    # check that hostname is a valid unix hostname; all() is vacuously true
    # for an empty string, so emptiness is tested separately
    if (not isinstance(hostname, str) or not hostname
            or not all(h.isalnum() or h == '-' for h in hostname)):
        logging.warning(f'not adding invalid hostname "{hostname}"')
        return False

    # check that ip address is valid
    clean_ip = _clean_ip(ip)
    if clean_ip is None:
        logging.warning(f'not adding invalid ip address {ip}')
        return False
    ip = clean_ip

    logging.info(f'adding {hostname} = {ip} to hosts file')

    # read the old hosts file
    with open(hosts_file, 'r') as f:
        old = f.read().strip()

    entry = f'{ip} {hostname}'
    added = False
    newlines = []

    for line in old.splitlines():
        address, names = _hosts_fields(line)

        # an entry for this exact address: replace the first one in place
        # and drop any further duplicates
        if address == ip:
            if not added:
                newlines.append(entry)
                added = True
            continue

        # the hostname is mapped to some other address: stale entry
        if hostname in names:
            continue

        # just a line we're not interested in, keep it as it was
        newlines.append(line)

    # no existing entry for the address was found, so append a new one
    if not added:
        newlines.append(entry)

    # write the new hosts file
    with open(hosts_file, 'w') as f:
        f.write('\n'.join(newlines) + '\n')

    return True
class ServerHandler(socketserver.BaseRequestHandler):
    """Request handler that processes one received announcement datagram."""

    def handle(self):
        """Decode the datagram, record the sender and reply if asked to.

        The payload is expected to be a JSON object with string values
        under ``'hostname'`` and ``'ip'`` and an optional ``'reply'`` flag.
        Anything else is logged as corrupt and ignored, so a malformed
        packet never raises out of the handler.
        """
        # the request is unpacked as a (payload, socket) pair; only the
        # payload is used here
        raw = self.request[0]

        try:
            text = raw.decode()
        except UnicodeDecodeError:
            logging.debug('received message was corrupt')
            return

        logging.debug(f'received "{text}" from {self.client_address}')

        try:
            data = json.loads(text)
        except (json.JSONDecodeError, RecursionError):
            logging.debug('received message was corrupt')
            return

        # valid JSON is not necessarily an object with the expected fields
        if not isinstance(data, dict):
            logging.debug('received message was corrupt')
            return
        hostname, ip = data.get('hostname'), data.get('ip')
        if not isinstance(hostname, str) or not isinstance(ip, str):
            logging.debug('received message was corrupt')
            return

        # if the host was added and it asked for a reply, send it our own
        # details; the answer carries reply=False so it is not answered again
        if add_host(hostname, ip) and data.get('reply', False):
            ip, port = self.client_address[0], args.port
            logging.debug(f'replying to {ip}')
            send_message(ip, port, get_msg(reply=False))
if __name__ == '__main__':
    # the parsed options live in the module-level ``args`` that the helper
    # functions read
    args = parse_args()

    # WARNING by default; every -v lowers the threshold by one level
    # (INFO, then DEBUG) and it never drops below DEBUG
    log_level = max(logging.DEBUG, logging.WARNING - 10 * args.verbose)
    logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)
    logging.debug(f'called with arguments {vars(args)}')

    host, port = args.listen, args.port

    # the server socket is bound before the announcement goes out
    with socketserver.UDPServer((host, port), ServerHandler) as server:
        # tell the other hosts that we exist and ask them to reply
        announcement = get_msg(reply=True)
        send_message(args.broadcast, port, announcement, broadcast=True)

        # handle incoming announcements until the process is stopped
        server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment