Created
November 2, 2023 01:56
-
-
Save flaviut/c038ffb0aaff73defc9c93aa9de47cad to your computer and use it in GitHub Desktop.
A simple, experimental NTP client written in Python.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import struct | |
import time | |
from dataclasses import dataclass | |
from enum import IntEnum | |
from typing import Optional | |
# Host name of the NTP server pool the client connects to, and the port used
# for the datagram socket.
NTP_SERVER = "pool.ntp.org"
NTP_PORT = 123

# Offset in seconds between NTP timestamps and Unix timestamps: the value of
# 1970-01-01 expressed in the NTP epoch. Subtracted when decoding wire
# timestamps and added back when encoding them.
TIME1970 = 2_208_988_800
class LeapIndicator(IntEnum):
    """Two-bit leap indicator stored in the top two bits of the first packet byte."""

    NO_WARNING = 0b00       # no leap second announced
    LAST_MINUTE_61 = 0b01   # last minute of the day has 61 seconds
    LAST_MINUTE_59 = 0b10   # last minute of the day has 59 seconds
    ALARM_CONDITION = 0b11  # alarm condition (see RFC 5905 for its meaning)
class Mode(IntEnum):
    """Three-bit association mode stored in the low three bits of the first packet byte."""

    RESERVED = 0b000
    SYMMETRIC_ACTIVE = 0b001
    SYMMETRIC_PASSIVE = 0b010
    CLIENT = 0b011                # used for the requests this client sends
    SERVER = 0b100
    BROADCAST = 0b101
    RESERVED_NTP_CONTROL = 0b110
    RESERVED_PRIVATE = 0b111
def unpack_pop(fmt, data_iter):
    """Consume ``struct.calcsize(fmt)`` bytes from *data_iter* and unpack them.

    Args:
        fmt: A ``struct`` format string.
        data_iter: An iterator yielding integers in ``range(256)``, such as
            ``iter(some_bytes)``. It is advanced past the bytes that were
            consumed, so successive calls walk a buffer field by field.

    Returns:
        The tuple produced by ``struct.unpack`` for *fmt*.

    Raises:
        struct.error: If *data_iter* is exhausted before enough bytes were
            read (instead of leaking a bare ``StopIteration``).
    """
    size = struct.calcsize(fmt)
    # zip() pulls from range() first, so once `size` bytes have been taken it
    # stops without consuming an extra byte from data_iter. Building the bytes
    # object in one go also avoids the quadratic cost of repeated `+=`.
    chunk = bytes(byte for _, byte in zip(range(size), data_iter))
    return struct.unpack(fmt, chunk)
@dataclass
class NtpPacket:
    """The fixed 48-byte NTP packet header.

    Timestamps are held as floating-point seconds relative to the Unix epoch
    (the wire values are shifted by ``TIME1970``); ``root_delay`` and
    ``root_dispersion`` are held as floating-point seconds converted from the
    16.16 fixed-point wire format.
    """

    leap: LeapIndicator
    version: int  # 3-bit protocol version; could be an enum if needed
    mode: Mode
    stratum: int = 0
    poll: int = 0  # signed 8-bit exponent on the wire
    precision: int = 0  # signed 8-bit exponent on the wire (normally negative)
    root_delay: float = 0.0
    root_dispersion: float = 0.0
    reference_id: int = 0
    reference_timestamp: float = 0.0
    originate_timestamp: float = 0.0
    receive_timestamp: float = 0.0
    transmit_timestamp: float = 0.0
    kiss_of_death: Optional[str] = None

    # Wire layout, network byte order: flags, stratum, poll (signed),
    # precision (signed), root delay, root dispersion, reference id, then four
    # timestamps of two 32-bit words each (whole seconds, fraction).
    # Deliberately un-annotated so the dataclass does not treat it as a field.
    _HEADER = struct.Struct('!BBbb3I8I')

    @staticmethod
    def decode(data: bytes) -> 'NtpPacket':
        """Parse the first 48 bytes of *data* into an ``NtpPacket``.

        Bytes after the header (extension fields, MAC) are ignored.

        When ``stratum`` is 0 the Reference ID field carries a four-character
        kiss code; it is exposed as ``kiss_of_death`` (``None`` if the field
        is all zero bytes). All other header fields are still decoded.

        Raises:
            ValueError: If *data* is shorter than the 48-byte header.
        """
        header = NtpPacket._HEADER
        if len(data) < header.size:
            raise ValueError(
                f"NTP packet too short: got {len(data)} bytes, "
                f"need at least {header.size}"
            )

        (flags, stratum, poll, precision, root_delay, root_dispersion,
         reference_id, *raw_timestamps) = header.unpack_from(data)

        # Timestamps are 32.32 fixed point: whole seconds in the first word,
        # binary fraction in the second.
        names = ('reference_timestamp', 'originate_timestamp',
                 'receive_timestamp', 'transmit_timestamp')
        timestamps = {}
        for name, seconds, fraction in zip(names, raw_timestamps[0::2],
                                           raw_timestamps[1::2]):
            timestamps[name] = seconds + (fraction / 2.0 ** 32.0) - TIME1970

        kiss_of_death = None
        if stratum == 0:
            # The kiss code lives in the Reference ID field (offset 12), not
            # in the bytes immediately following the stratum byte.
            code = reference_id.to_bytes(4, 'big').rstrip(b'\x00')
            if code:
                # Lenient decoding so a malformed code cannot raise.
                kiss_of_death = code.decode('ascii', errors='replace')

        return NtpPacket(
            leap=LeapIndicator((flags >> 6) & 0x03),
            version=(flags >> 3) & 0x07,
            mode=Mode(flags & 0x07),
            stratum=stratum,
            poll=poll,
            precision=precision,
            root_delay=root_delay / 2 ** 16,
            root_dispersion=root_dispersion / 2 ** 16,
            reference_id=reference_id,
            kiss_of_death=kiss_of_death,
            **timestamps,
        )

    def encode(self) -> bytes:
        """Serialize this packet into its 48-byte wire representation.

        ``kiss_of_death`` is not written separately; the Reference ID field is
        taken from ``reference_id``.

        Raises:
            struct.error: If a field does not fit its wire width, e.g. ``poll``
                or ``precision`` outside -128..127, or a timestamp that falls
                outside the 32-bit seconds range once shifted by ``TIME1970``.
        """
        flags = (int(self.leap) << 6) | (self.version << 3) | int(self.mode)

        words = []
        for ts in (self.reference_timestamp, self.originate_timestamp,
                   self.receive_timestamp, self.transmit_timestamp):
            shifted = ts + TIME1970
            whole = int(shifted)
            words.append(whole)
            words.append(int((shifted - whole) * 2 ** 32))

        return self._HEADER.pack(
            flags,
            self.stratum,
            self.poll,
            self.precision,
            int(self.root_delay * 2 ** 16),
            int(self.root_dispersion * 2 ** 16),
            self.reference_id,
            *words,
        )
def send_ntp_request(client):
    """Send one client-mode NTP request over *client*.

    Args:
        client: A connected datagram socket (any object with a
            ``send(bytes)`` method works).

    Returns:
        A ``(client, t1)`` tuple, where ``t1`` is the local ``time.time()``
        reading taken just before the request was built (T1 in the NTP
        offset/delay formulas).
    """
    # T1: local clock reading at the moment the request leaves.
    originate_timestamp = time.time()
    request = NtpPacket(
        leap=LeapIndicator.NO_WARNING,
        version=3,
        mode=Mode.CLIENT,
        # A client request carries its send time in the Transmit Timestamp
        # field; the server echoes that value back as the reply's Originate
        # Timestamp. The request's own originate field stays unset.
        transmit_timestamp=originate_timestamp,
    )
    client.send(request.encode())
    return client, originate_timestamp
def receive_ntp_response(client, t1):
    """Read one NTP reply from *client* and report delay, offset and server time.

    Args:
        client: The socket the request was sent on.
        t1: Local ``time.time()`` reading taken when the request was sent.

    Returns:
        ``(round_trip_delay, local_clock_offset, corrected_time)`` in seconds
        when the reply is usable, otherwise ``(None, None, None)``.

    Raises:
        socket.timeout: Propagated from ``recvfrom`` if the socket has a
            timeout set and no reply arrives in time.
    """
    data, _address = client.recvfrom(1024)
    # T4: local clock reading as soon as the reply is in hand.
    t4 = time.time()

    # Replies may be longer than 48 bytes (extension fields, MAC); only a
    # truncated header is unusable.
    if len(data) < 48:
        print(f"Unexpected packet size: {len(data)}")
        return None, None, None

    parsed = NtpPacket.decode(data)
    print(parsed)

    if parsed.stratum == 0:
        # Stratum 0 marks a kiss-of-death / unspecified reply: its timestamps
        # are not a valid time sample, so do not compute an offset from them.
        print(f"Server sent no usable time (kiss of death: {parsed.kiss_of_death})")
        return None, None, None

    t2 = parsed.receive_timestamp   # T2: server received the request
    t3 = parsed.transmit_timestamp  # T3: server sent the reply

    # Round trip delay excludes the time the server spent holding the packet;
    # the offset averages the apparent skew on the outbound and return legs.
    round_trip_delay = (t4 - t1) - (t3 - t2)
    local_clock_offset = ((t2 - t1) + (t3 - t4)) / 2

    # Local receive time corrected by the estimated offset.
    corrected_time = t4 + local_clock_offset

    print(f"Server's Time: {time.ctime(corrected_time)}")
    print(f"Round Trip Delay: {round_trip_delay} seconds")
    print(f"Local Clock Offset: {local_clock_offset} seconds")
    return round_trip_delay, local_clock_offset, corrected_time
def main():
    """Query the NTP server: 5 connections, up to 5 request/reply exchanges each.

    Each connection uses a fresh UDP socket with a 5-second timeout. A failed
    exchange (timeout or other socket error) is reported and the rest of that
    connection is skipped; the socket is always closed.
    """
    for _ in range(5):
        # The context manager closes the socket even if an exchange raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
            client.settimeout(5)
            client.connect((NTP_SERVER, NTP_PORT))
            for _ in range(5):
                try:
                    _, t1 = send_ntp_request(client)
                    receive_ntp_response(client, t1)
                except OSError as exc:  # includes socket.timeout
                    print(f"NTP exchange failed: {exc}")
                    # Abandon this socket: a late reply to the failed request
                    # could otherwise be paired with the next request's T1.
                    break


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment