Created
February 2, 2024 06:09
-
-
Save VinDuv/008cffa620bf09db68683eae17a5eaff to your computer and use it in GitHub Desktop.
ACTS time sync emulator for Atomic Clock 2.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Simulates a modem that will connect to an ACTS time sync server.
"""

# Protocol references:
# https://www.nist.gov/pml/time-and-frequency-division/time-distribution/
# automated-computer-time-service-acts
# https://pdfs.semanticscholar.org/65b9/d62c0c0002781d22e00ff6456c8b1baa4420.pdf

from datetime import datetime, timedelta
import argparse
import re
import socket
import selectors
import time
import typing

# Timestamp of the first emulated time code line when the caller does not
# supply one.
DEFAULT_DATE = datetime(1988, 3, 2, 21, 39, 15)

# Default value of the label field sent at the end of each time code line.
LABEL = 'UTC(NIST)'

# Day zero of the Modified Julian Date; the MJD field of a time code line is
# the number of whole days elapsed since this date.
MJD_EPOCH = datetime(1858, 11, 17)
def run():
    """
    Program entry point.

    Builds the command-line parser (one sub-command per registered I/O
    class), creates the selected I/O handler and runs the modem emulation on
    it until interrupted. The handler is always closed on the way out.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    ModemIOBase.register_subparsers(parser)
    args = parser.parse_args()

    io_handler = ModemIOBase.create(args)
    try:
        emulator = ModemEmulator(io_handler)
        emulator.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; just end the current output line
        # (some progress messages are printed without a trailing newline).
        print("")
    finally:
        io_handler.close()
class ModemEmulator:
    """
    Emulates a modem connected to a time sync server.

    The emulator acknowledges the AT initialisation commands it expects with
    ``OK``, pretends to dial, then plays back a short ACTS session (banner,
    column header and eight time code lines) before waiting for the hang-up
    command. An ``AT Z`` received at any point restarts the whole dialogue.
    """

    class Reset(Exception):
        """
        Exception thrown when an AT Z is received. Used to reset the program
        state to the beginning.
        """

    class ACTS(typing.NamedTuple):
        """
        Data contained in an ACTS time sync line.
        """
        tstamp: datetime  # UTC timestamp
        dst: int  # Number of days before/after US DST (roughly)
        leap: bool  # Indicates if a leap second occurs this month
        dut1: float  # UTC to UT1 conversion factor, between -.8 and +.8
        # Current time code advance (to compensate line delay), in ms.
        # Fractional values are sent, so this is a float and not an int.
        adv: float
        label: str  # Label indicating the timezone and time server name
        otm: str  # Character sent when the timestamp occurs

    def __init__(self, io_handler, label=LABEL, tstamp=DEFAULT_DATE):
        """
        io_handler: object providing setup(), read(timeout), write(data).
        label: text placed in the label field of each time code line.
        tstamp: timestamp of the first time code line of a session.
        """
        self._io = io_handler
        self._line_buf = b''  # Bytes received but not yet returned as lines
        self._label = label
        self._tstamp = tstamp

    def run(self):
        """
        Sets the I/O handler up and runs the emulation.

        Never returns normally; each pass through the loop is one complete
        dialogue with the client.
        """
        self._io.setup()

        print("Waiting for initial AT Z...")
        self._wait_reset()

        while True:
            try:
                self._run()
                self._wait_reset()
            except self.Reset:
                # The AT Z that interrupted the dialogue still has to be
                # acknowledged before starting over.
                print("Got a reset command, restarting.")
                self._send_line('OK')

    def _run(self):
        """
        Runs the emulation. Raises Reset if an AT Z is received.
        """
        # Modem initialisation: every expected command is simply accepted.
        init_commands = (
            r'AT &F',
            r'AT V1 E1 Q0 X4 &C1 &D2',
            r'AT &Q0',
            r'AT S95=44',
            r'AT M. B.',
        )
        for command in init_commands:
            self._expect(command)
            self._send_line('OK')

        number, = self._expect(r'ATDT (.*)')
        print(f"Dialed phone number: {number}")

        # Simulated dialing / connection delay.
        time.sleep(2)
        self._send_line('CONNECT1200')
        time.sleep(.5)

        # Server banner.
        self._send_line('?=Help')
        self._send_line('National Institute of Standards and Technology')
        self._send_line('Telephone Time Service')
        time.sleep(1)

        # Column header.
        # NOTE(review): runs of spaces in these two literals may have been
        # collapsed when this file was extracted -- confirm against the
        # original header layout.
        self._send_line(' D L D')
        self._send_line('MJD YR MO DA H M S ST S UT1 msADV OTM')

        cur_tstamp = self._tstamp
        incr = timedelta(seconds=1)

        # Four lines with the initial advance and '*' as on-time marker, then
        # four lines with a different advance and '#' as on-time marker.
        for adv, otm in ((45, '*'), (37.6, '#')):
            for _ in range(4):
                acts = self.ACTS(cur_tstamp, dst=83, leap=False, dut1=.3,
                    adv=adv, label=self._label, otm=otm)
                self._send_acts(acts)
                cur_tstamp += incr
                time.sleep(1)

        # Escape sequence followed by the hang-up command.
        self._expect(r'\+\+\+ATH0')

    def _expect(self, regex, timeout=None):
        """
        Expects a specific value (as a regex) from the modem.
        Returns the regular expression groups.
        If the match fails, ignore the line (unless it's AT Z, in which case
        Reset is raised)
        """
        while True:
            line = self._get_line(timeout=timeout, echo=False)
            match = re.match(regex, line)
            if match:
                print(f"=> {line}")
                return match.groups()

            if line.replace(' ', '') == 'ATZ':
                print(f"=> {line} (reset!)")
                raise self.Reset()

            print(f"=> {line} (unexpected)")

    def _send_acts(self, acts):
        """
        Send an ACTS sync line.
        """
        tstamp, dst, leap, dut1, adv, label, otm = acts

        mjd = (tstamp - MJD_EPOCH).days
        date_str = tstamp.strftime('%y-%m-%d %H:%M:%S')
        # DUT1 is sent as a sign, a dot and a single digit of tenths.
        dut1 = f"{'+' if dut1 >= 0 else '-'}.{round(abs(dut1 * 10)):1d}"

        self._send_line(f'{mjd:05d} {date_str} {dst:02d} {leap:d} {dut1} '
            f'{adv:05.1f} {label} {otm}')

    def _send_line(self, line):
        """
        Sends a line, followed by \\r\\n.
        """
        print(f"<= {line}")
        self._io.write(line.encode('ascii') + b'\r\n')

    def _get_line(self, timeout=None, echo=True):
        """
        Receives a full line from the I/O, with a timeout.

        A line ends at a carriage return. A line feed following it is
        optional: it is discarded whether it arrives together with the CR,
        in a later read, or not at all. (Always skipping two bytes after the
        CR would eat the first byte of the next command when the peer sends
        CR only, and would leave a stray LF in front of the next line when
        CR and LF arrive in separate reads.)

        The timeout is passed to each read() of the I/O handler; whatever
        that call raises on expiry propagates to the caller.
        """
        while True:
            cr_pos = self._line_buf.find(b'\r')
            if cr_pos >= 0:
                raw_line = self._line_buf[:cr_pos]
                # Consume exactly the CR; an LF, if any, is stripped from the
                # front of the following line below.
                self._line_buf = self._line_buf[cr_pos + 1:]
                line = raw_line.lstrip(b'\n').decode('ascii', 'replace')
                if echo:
                    print(f"=> {line}")
                return line

            self._line_buf += self._io.read(timeout)

    def _wait_reset(self):
        """
        Wait for ATZ to be sent to the modem.

        Every other line is read and dropped. The ATZ is acknowledged with
        OK before returning.
        """
        while True:
            line = self._get_line().replace(' ', '')
            if line == 'ATZ':
                self._send_line('OK')
                break

    def __repr__(self):
        return f"ModemEmulator({self._io!r})"
class ModemIOBase:
    """
    Base class for modem I/O.

    Concrete subclasses declare a command-line name and a title as class
    keyword arguments (``class Foo(ModemIOBase, name='foo', title="Foo")``)
    and are then offered as sub-commands by register_subparsers().
    """

    # Maps the command-line name to (title, class). Shared by all subclasses.
    _classes = {}

    def __init_subclass__(cls, name=None, title=None, **kwargs):
        """
        Called for each subclass of ModemIOBase; registers the classes in a
        dictionary so they can be registered in the command-line parser.

        A subclass with no name (or name=None) is considered abstract and is
        not registered. Raises ValueError if the name is already taken.
        """
        super().__init_subclass__(**kwargs)

        if name is None:
            # Abstract subclass
            return

        # An explicit check rather than an assert: asserts are removed under
        # "python -O", which would let a later class silently replace an
        # already registered one.
        if name in cls._classes:
            raise ValueError(f"I/O class name {name!r} is already registered")

        cls._classes[name] = (title, cls)

    def setup(self):
        """
        Prepares the instance. When this method returns, the read() and write()
        method can be called.
        """
        # The base implementation does nothing

    def read(self, timeout=None):
        """
        Receive data from the program writing to the COM port and returns it.
        Raises TimeoutError if the timeout expires with no data received.
        """
        raise NotImplementedError("Must be implemented in subclasses")

    def write(self, data):
        """
        Send data to the program reading from the COM port.
        """
        raise NotImplementedError("Must be implemented in subclasses")

    def close(self):
        """
        Releases the resources used by the I/O.
        """
        # The base implementation does nothing

    @classmethod
    def register_subparsers(cls, parser):
        """
        Registers the I/O subclasses as sub-parsers. This allows each subclass
        to have its own command-line options.
        """
        subparsers = parser.add_subparsers(
            metavar='port_type', required=True)

        for name, (title, subclass) in cls._classes.items():
            subparser = subparsers.add_parser(name, help=title)
            subclass.register_options(subparser)
            # Remember which class was selected so create() can find it.
            subparser.set_defaults(cls=subclass)

    @staticmethod
    def create(args):
        """
        Create an instance of the class from the parsed command-line arguments.
        """
        return args.cls.create_from_args(args)

    @classmethod
    def create_from_args(cls, args):
        """
        Create an instance of the sub-class from the parsed command-line
        arguments.
        """
        raise NotImplementedError("Must be implemented in subclasses")

    @classmethod
    def register_options(cls, subparser):
        """
        Registers the command-line options used by the I/O subclass.
        """
        # The base implementation does not register any options
class ModemIOTCPBase(ModemIOBase, name=None, title=None):
    """
    Base class for TCP modem I/O.

    Subclasses only provide _get_connection(), which returns a connected
    socket; reading, writing and reconnection are handled here.
    """

    def __init__(self, host, port):
        self._read_buf = b''
        self._host = host
        self._port = port
        self._socket = None
        self._selector = None

    def setup(self):
        # Drop any previous connection first, so setup() can also be used to
        # reconnect.
        self.close()

        conn = self._get_connection()
        conn.setblocking(False)

        # The selector is what implements the read timeout.
        selector = selectors.DefaultSelector()
        selector.register(conn, selectors.EVENT_READ)

        self._socket = conn
        self._selector = selector

    def read(self, timeout=None):
        if not self._selector.select(timeout):
            raise TimeoutError(f"No data received after {timeout} seconds")

        data = self._socket.recv(4096)
        if not data:
            # An empty read means the peer closed the connection: establish a
            # new one and let the caller retry (it gets an empty result).
            print("Connection closed by remote")
            self.setup()

        return data

    def write(self, data):
        self._socket.sendall(data)

    def close(self):
        # Clear the attributes first so the instance is in a consistent
        # "closed" state even if one of the close() calls below raises.
        sock, selector = self._socket, self._selector
        self._socket = None
        self._selector = None

        try:
            if sock is not None:
                sock.close()
        finally:
            # Release the selector even when closing the socket failed.
            if selector is not None:
                selector.close()

    @classmethod
    def create_from_args(cls, args):
        host, port = args.address
        return cls(host, port)

    @classmethod
    def address(cls, value):
        """
        Parses an address string into a (host, port) tuple.

        Accepted forms are "<host>:<port>", "[<ipv6>]:<port>" and "<port>"
        (the host then defaults to 127.0.0.1). Raises
        argparse.ArgumentTypeError if the port is not an integer between 1
        and 65535.
        """
        host, _, raw_port = value.rpartition(':')

        # Bracketed IPv6 literal: the socket functions want the bare address.
        if host.startswith('[') and host.endswith(']'):
            host = host[1:-1]

        if not host:
            host = '127.0.0.1'

        try:
            port = int(raw_port)
        except ValueError:
            port = -1  # Reported by the range check below

        if port <= 0 or port >= 65536:
            raise argparse.ArgumentTypeError(f"Invalid port {raw_port}")

        return (host, port)

    def _get_connection(self):
        """
        Connect to the remote end, and return the connected socket.
        """
        raise NotImplementedError("Must be implemented in subclasses")
class ModemIOTCPClient(ModemIOTCPBase, name='tcp', title="TCP Client"):
    """
    Modem I/O that connects to a TCP server. Useful with qemu's -serial tcp
    option.
    """

    def _get_connection(self):
        """
        Connect to the configured host and port and return the socket.

        A refused connection is retried every two seconds until it succeeds;
        any other error propagates to the caller.
        """
        print(f"Connecting to {self._host}:{self._port}... ", end="",
            flush=True)

        while True:
            try:
                conn = socket.create_connection((self._host, self._port))
            except ConnectionRefusedError:
                # The server is not listening yet; try again shortly.
                time.sleep(2)
            else:
                print("Connected.")
                return conn

    @classmethod
    def register_options(cls, subparser):
        subparser.add_argument('address', type=cls.address,
            help="Connect address (<host>:<port> or <port> for localhost)")
class ModemIOTCPServer(ModemIOTCPBase, name='tcps', title="TCP Server"):
    """
    Modem I/O that waits for connections on a TCP socket. Useful with qemu's
    -serial tcp option.
    """

    def _get_connection(self):
        """
        Listen on the configured host and port, accept one connection and
        return its socket.

        The listening socket only lives for the duration of this call, so a
        single client is served at a time.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
            # Allow re-binding the same address right after a previous run.
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
            listener.bind((self._host, self._port))
            listener.listen(1)

            print(f"Waiting for connection on {self._host}:{self._port}... ",
                end="", flush=True)
            conn, addr = listener.accept()
            print(f"Got connection from {addr[0]}:{addr[1]}.")

            return conn

    @classmethod
    def register_options(cls, subparser):
        subparser.add_argument('address', type=cls.address,
            help="Listen address (<host>:<port> or <port>)")
# Only run the emulator when executed as a script, not when imported.
if __name__ == '__main__':
    run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment