Created
April 6, 2018 15:42
-
-
Save niccokunzmann/c9c47b4d6b6f3762ca87aeacf17ba8f6 to your computer and use it in GitHub Desktop.
Send olsr name service announcements with Python.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module allows sending OLSR v1 messages to announce host names.
You can find more information here:
https://github.com/servalproject/olsr/blob/master/lib/nameservice/README_NAMESERVICE
""" | |
import struct | |
import socket | |
# UDP port the announcements are sent to when a destination is given as a
# bare IP string (698 is the port assigned to OLSR).
OLSR_PORT = 698
def get_olsr_name_announcement_message(originator_ip, hostname, host_ip, sequence_number):
    """Return an OLSR v1 packet which announces a given name on the network.

    - originator_ip is the IP from which this message is supposed to be sent,
      as an IPv4 string accepted by socket.inet_aton.
      Since this service might be running in a docker container, this should
      be the client IP in the community network where this service is reachable.
    - hostname is the name of the host we want to announce.
    - host_ip is the IP of the host we want to announce (IPv4 string).
      This should be the same as the originator_ip if the service wants to
      announce its own hostname.
    - sequence_number is the number of packets sent before.
      Only its lower 16 bits are transmitted.

    Returns the complete packet as bytes; its length is always a multiple of 4.

    Raises OSError if an address is rejected by socket.inet_aton and
    struct.error if the hostname is too long for the 16-bit length fields.
    """
    # assumptions
    # -----------
    # 1. Each packet only transports one message.
    #    Therefore, the message sequence number and the packet sequence number are identical.
    packet_sequence_number = message_sequence_number = sequence_number & 65535

    # All length fields count bytes on the wire, so measure the encoded name,
    # not the str: a non-ASCII hostname encodes to more bytes than characters.
    name = hostname.encode()
    padding = -len(name) % 4  # pad the name to a 4-byte boundary

    return b"".join((
        #                                               Wireshark field identifier
        struct.pack(">H", 40 + len(name) + padding),  # Packet Length
        struct.pack(">H", packet_sequence_number),    # Packet Sequence Number
        b"\x82",                                      # Message Type: Nameservice
        b"\xce",                                      # Validity Time: 1792 seconds
        struct.pack(">H", 36 + len(name) + padding),  # Message : length of the one contained message
        socket.inet_aton(originator_ip),              # Originator Address
        b"\xff",                                      # TTL : 255
        b"\x00",                                      # Hop Count : 0
        struct.pack(">H", message_sequence_number),   # Message Sequence Number
        b"\x00\x01",                                  # Version : 1
        b"\x00\x01",                                  # Count : 1
        # Nameservice Host embedded message
        b"\x00\x00",                                  # Message Type : Host (0)
        struct.pack(">H", len(name)),                 # Length (unpadded name, in bytes)
        socket.inet_aton(host_ip),                    # Address
        b"\x00" * 12,                                 # fill the address field up to 16 bytes
        name,                                         # Content
        b"\x00" * padding,
    ))
class OLSR_NameService:
    """This service announces the given names for the ip addresses."""

    def __init__(self, originator_ip, announce_to_ips):
        """Create a new OLSR v1 name service announcer with ips to announce it to.

        - originator_ip is placed in every packet as the originator address.
        - announce_to_ips is an iterable of destinations. Each one is either
          an IP string (sent to OLSR_PORT) or an address that is passed to
          socket.sendto unchanged, e.g. an (ip, port) tuple.
        """
        self.originator_ip = originator_ip
        self.announce_to_ips = announce_to_ips
        # Doubles as the sequence number of the next packet.
        self.packets_sent = 0
        # from https://stackoverflow.com/questions/12607516/python-udp-broadcast-not-sending#12607646
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Allow broadcast addresses as destinations.
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        # Set of (ip, hostname) pairs to announce.
        self.announcement_to_message_id = set()

    def add_entry(self, ip, hostname):
        """Add the ip and its hostname to the announcements."""
        self.announcement_to_message_id.add((ip, hostname))

    def remove_entry(self, ip, hostname):
        """Remove a specific announcement.

        Removing an entry that was never added is a no-op.
        """
        # set.pop() takes no arguments; discard() removes the pair if present.
        self.announcement_to_message_id.discard((ip, hostname))

    def announce(self):
        """Announce the entries.

        One packet is built per entry and sent to every destination.
        The sequence number advances once per entry, not once per destination.
        """
        for ip, hostname in self.announcement_to_message_id:
            packet = get_olsr_name_announcement_message(
                self.originator_ip, hostname, ip, self.packets_sent)
            self.packets_sent += 1
            for destination in self.announce_to_ips:
                # A bare IP string gets the default OLSR port.
                address = (destination, OLSR_PORT) if isinstance(destination, str) else destination
                self.socket.sendto(packet, address)

    def close(self):
        """Close the underlying UDP socket. The service cannot announce afterwards."""
        self.socket.close()
if __name__ == "__main__":
    # Manual test driver: announce one name to a fixed list of nodes each
    # time the user presses ENTER.
    service = OLSR_NameService("10.22.71.211", ["10.22.71.202", "10.22.254.111", "10.22.71.211"])
    service.add_entry("10.22.71.211", "test.quelltext.eu")
    while True:
        try:
            answer = input("Press ENTER to send, space and ENTER to quit: ")
        except EOFError:
            # stdin was closed (Ctrl-D or end of a pipe): treat it as quit.
            break
        if answer:
            # Any non-empty input quits.
            break
        service.announce()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment