Last active
May 15, 2025 05:35
-
-
Save SoftPoison/2cd16c41149012964bcbd15f16cc7562 to your computer and use it in GitHub Desktop.
A Python script for interacting with FOX(S) servers. Authentication is not yet implemented (NYI).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from dataclasses import dataclass | |
| from collections import deque | |
| import socket | |
| import ssl | |
| import math | |
| from datetime import datetime | |
| def write_encoded(s: str|None, bs: bytearray): | |
| if s == None: | |
| bs.extend(b'#null;') | |
| return | |
| for c in s: | |
| oc = ord(c) | |
| if oc < ord(' ') or oc > 127: | |
| bs.extend(b"#{0:02x};".format(oc)) | |
| else: | |
| bs.append(oc) | |
| class SocketBufferedReader: | |
| _sock: socket.socket|ssl.SSLSocket | |
| _buffer: deque[int] | |
| def __init__(self, sock: socket.socket|ssl.SSLSocket): | |
| self._sock = sock | |
| self._buffer = deque() | |
| def _recv_more(self): | |
| bs = self._sock.recv() | |
| assert len(bs) != 0 | |
| self._buffer.extend(bs) | |
| def peek_byte_int(self) -> int: | |
| if len(self._buffer) == 0: | |
| self._recv_more() | |
| return self._buffer[0] | |
| def read_byte_int(self) -> int: | |
| if len(self._buffer) == 0: | |
| self._recv_more() | |
| return self._buffer.popleft() | |
| def read_byte(self) -> bytes: | |
| return bytes([self.read_byte_int()]) | |
| def peek_byte(self) -> bytes: | |
| return bytes([self.peek_byte_int()]) | |
| def read_bytes(self, num: int) -> bytes: | |
| while len(self._buffer) < num: | |
| self._recv_more() | |
| bs = bytearray() | |
| for _ in range(num): | |
| bs.append(self._buffer.popleft()) | |
| return bytes(bs) | |
| def peek_bytes(self, num: int) -> bytes: | |
| while len(self._buffer) < num: | |
| self._recv_more() | |
| bs = bytearray() | |
| for i in range(num): | |
| bs.append(self._buffer[i]) | |
| return bytes(bs) | |
| def read_line(self) -> bytes: | |
| b = 0 | |
| bs = bytearray() | |
| while b != 0x0a: | |
| if len(self._buffer) == 0: | |
| self._recv_more() | |
| b = self._buffer.popleft() | |
| bs.append(b) | |
| return bytes(bs) | |
| def read_int(self) -> int: | |
| bs = bytearray() | |
| while True: | |
| b = self.peek_byte() | |
| if b == b'-': | |
| if len(bs) > 0: | |
| raise ValueError() | |
| self.read_byte_int() | |
| bs.extend(b) | |
| continue | |
| if b < b'0' or b > b'9': | |
| break | |
| self.read_byte_int() | |
| bs.extend(b) | |
| return int(bs) | |
| def read_float(self) -> float: | |
| raise NotImplementedError() | |
| def read_str(self) -> str|None: | |
| if self.peek_bytes(6) == b'#null;': | |
| self.read_bytes(6) | |
| return None | |
| bs = bytearray() | |
| while True: | |
| b = self.peek_byte_int() | |
| if b < ord(' '): | |
| return bs.decode() | |
| if b == ord('#'): | |
| self.read_byte_int() | |
| c = bytes.fromhex(self.read_bytes(2).decode()) | |
| assert self.read_byte_int() == ord(';') | |
| bs.extend(c) | |
| else: | |
| self.read_byte_int() | |
| bs.append(b) | |
| def read_name_str(self) -> str: | |
| bs = bytearray() | |
| while True: | |
| b = self.peek_byte_int() | |
| if b == ord('#'): | |
| self.read_byte_int() | |
| c = bytes.fromhex(self.read_bytes(2).decode()) | |
| assert self.read_byte_int() == ord(';') | |
| bs.extend(c) | |
| elif (b >= ord('a') and b <= ord('z')) or (b >= ord('A') and b <= ord('Z')) or (b >= ord('0') and b <= ord('9')) or b == ord('-') or b == ord('.') or b == ord('_') or b == ord('$'): | |
| self.read_byte_int() | |
| bs.append(b) | |
| else: | |
| return bs.decode() | |
| def read_hex_int(self) -> int: | |
| bs = bytearray() | |
| while True: | |
| b = self.peek_byte_int() | |
| if b >= ord('a') and b <= ord('f'): | |
| self.read_byte_int() | |
| bs.append(b) | |
| else: | |
| return int(bs, 16) | |
class FoxTuple:
    """Base class for a named, typed value inside a FOX message.

    Subclasses set the one-character type code ``tp`` and implement
    ``encode`` plus a ``decode(name, r)`` factory.
    """
    name: str
    tp: str
    value: object

    def __init__(self, name: str, value):
        self.name, self.value = name, value

    def encode(self) -> bytearray:
        """Return the wire form of ``value``; concrete subclasses override."""
        raise NotImplementedError()

    @staticmethod
    def decode(name: str, tp: str, r: SocketBufferedReader) -> 'FoxTuple':
        """Decode one value of type code ``tp`` from ``r``.

        Raises:
            ValueError: ``tp`` matches none of the known tuple classes.
        """
        # Resolved at call time because the subclasses are defined further
        # down in the module.
        known_kinds = (
            FoxBlob,
            FoxFloat,
            FoxInteger,
            FoxObject,
            FoxString,
            FoxTime,
            FoxBoolean,
            FoxMessage,
        )
        for kind in known_kinds:
            if tp == kind.tp:
                return kind.decode(name, r)
        raise ValueError(tp)
class FoxBlob(FoxTuple):
    """Binary value (type code ``b``), written as ``<len>[<raw bytes>]``."""
    value: bytes
    tp = 'b'

    def encode(self) -> bytearray:
        """Return ``<decimal length>[<raw bytes>]``."""
        bs = bytearray(str(len(self.value)).encode())
        bs += b'['
        bs += self.value
        bs += b']'
        return bs

    @staticmethod
    def decode(name: str, r: SocketBufferedReader) -> 'FoxBlob':
        """Read ``<len>[<raw bytes>]`` from ``r``.

        Raises:
            ValueError: the length is negative or a bracket is missing.
        """
        size = r.read_int()
        if size < 0:
            raise ValueError(f"negative blob size: {size}")
        # The delimiter reads must not live inside `assert`: under -O the
        # whole statement (including the read) is dropped and the stream
        # position goes out of sync.
        if r.read_byte() != b'[':
            raise ValueError("malformed blob: expected '['")
        value = r.read_bytes(size)
        if r.read_byte() != b']':
            raise ValueError("malformed blob: expected ']'")
        return FoxBlob(name, value)
class FoxFloat(FoxTuple):
    """Floating-point value (type code ``f``).

    NaN and the infinities are written as ``nan``, ``+inf`` and ``-inf``;
    every other value uses ``str(float)``.
    """
    value: float
    tp = 'f'

    def encode(self) -> bytearray:
        """Return the textual wire form of ``value``."""
        if math.isnan(self.value):
            return bytearray(b"nan")
        if math.isinf(self.value):
            return bytearray(b"+inf" if self.value > 0 else b"-inf")
        return bytearray(str(self.value).encode())

    # Declared static like FoxBlob.decode so it also works when reached
    # through an instance, not only through the class.
    @staticmethod
    def decode(name: str, r: SocketBufferedReader) -> 'FoxFloat':
        """Read a float via ``r.read_str()``.

        Raises:
            ValueError: the value is the null token or not a valid float.
        """
        s = r.read_str()
        if s is None:
            raise ValueError("float value must not be null")
        if s == "nan":
            return FoxFloat(name, math.nan)
        if s == "+inf":
            return FoxFloat(name, math.inf)
        if s == "-inf":
            return FoxFloat(name, -math.inf)
        return FoxFloat(name, float(s))
class FoxInteger(FoxTuple):
    """Integer value (type code ``i``), written in decimal."""
    value: int
    tp = 'i'

    def encode(self) -> bytearray:
        """Return ``value`` as ASCII decimal digits."""
        return bytearray(str(self.value).encode())

    # Declared static like FoxBlob.decode so it also works when reached
    # through an instance, not only through the class.
    @staticmethod
    def decode(name: str, r: SocketBufferedReader) -> 'FoxInteger':
        """Read a decimal integer from ``r``."""
        return FoxInteger(name, r.read_int())
@dataclass
class FoxObjectInner:
    """Payload carried by a ``FoxObject``: an encoding name plus raw bytes."""
    # Name token written before the payload.
    encoding: str
    # Raw payload bytes written between '[' and ']'.
    data: bytes
class FoxObject(FoxTuple):
    """Encoded-object value (type code ``o``).

    Wire form: ``<encoding> <len>[<raw bytes>]``.
    """
    value: FoxObjectInner
    tp = 'o'

    def encode(self) -> bytearray:
        """Return ``<encoding> <decimal length>[<raw bytes>]``."""
        bs = bytearray()
        write_encoded(self.value.encoding, bs)
        bs += b' '
        bs += str(len(self.value.data)).encode()
        bs += b'['
        bs += self.value.data
        bs += b']'
        return bs

    @staticmethod
    def decode(name: str, r: SocketBufferedReader) -> 'FoxObject':
        """Read ``<encoding> <len>[<raw bytes>]`` from ``r``.

        Raises:
            ValueError: a delimiter is missing or the length is negative.
        """
        encoding = r.read_name_str()
        # The original compared the bound method `r.read_byte` (never called)
        # against b' ', which is always unequal. Call it, and keep the read
        # outside `assert` so it is not stripped under -O.
        if r.read_byte() != b' ':
            raise ValueError("malformed object: expected ' ' after encoding")
        size = r.read_int()
        if size < 0:
            raise ValueError(f"negative object size: {size}")
        if r.read_byte() != b'[':
            raise ValueError("malformed object: expected '['")
        value = r.read_bytes(size)
        if r.read_byte() != b']':
            raise ValueError("malformed object: expected ']'")
        return FoxObject(name, FoxObjectInner(encoding, value))
class FoxString(FoxTuple):
    """String value (type code ``s``); ``None`` is a permitted value."""
    value: str|None
    tp = 's'

    def encode(self) -> bytearray:
        """Return the escaped wire form produced by ``write_encoded``."""
        bs = bytearray()
        write_encoded(self.value, bs)
        return bs

    # Declared static like FoxBlob.decode so it also works when reached
    # through an instance, not only through the class.
    @staticmethod
    def decode(name: str, r: SocketBufferedReader) -> 'FoxString':
        """Read an escaped string (or the null token) from ``r``."""
        return FoxString(name, r.read_str())
class FoxTime(FoxTuple):
    """Timestamp value (type code ``t``).

    Wire form: whole milliseconds since the Unix epoch as lowercase hex
    without a ``0x`` prefix.
    """
    value: datetime
    tp = 't'

    def encode(self) -> bytearray:
        """Return the timestamp in milliseconds as bare hex digits."""
        millis = int(self.value.timestamp() * 1000)
        # format(..., 'x') emits no '0x' prefix, so unlike hex(n)[2:] it does
        # not mangle negative values ('-0x1f'[2:] == 'x1f').
        return bytearray(format(millis, 'x').encode())

    # Declared static like FoxBlob.decode so it also works when reached
    # through an instance, not only through the class.
    @staticmethod
    def decode(name: str, r: SocketBufferedReader) -> 'FoxTime':
        """Read hex milliseconds from ``r``.

        The result comes from ``datetime.fromtimestamp`` with no ``tz``
        argument, i.e. a naive datetime in local time.
        """
        return FoxTime(name, datetime.fromtimestamp(r.read_hex_int() / 1000))
class FoxBoolean(FoxTuple):
    """Boolean value (type code ``z``), written as ``t`` or ``f``."""
    value: bool
    tp = 'z'

    def encode(self) -> bytearray:
        """Return ``b't'`` for a truthy value, ``b'f'`` otherwise."""
        return bytearray(b't' if self.value else b'f')

    # Declared static like FoxBlob.decode so it also works when reached
    # through an instance, not only through the class.
    @staticmethod
    def decode(name: str, r: SocketBufferedReader) -> 'FoxBoolean':
        """Read a single ``t``/``f`` byte from ``r``.

        Raises:
            ValueError: the byte is neither ``t`` nor ``f``.
        """
        v = r.read_byte()
        if v == b't':
            return FoxBoolean(name, True)
        if v == b'f':
            return FoxBoolean(name, False)
        raise ValueError(f"invalid boolean byte: {v!r}")
class FoxMessage(FoxTuple):
    """Ordered collection of named tuples (type code ``m``).

    Wire form::

        {
        <name>=<tp>:<value>
        ...
        }
    """
    value: list[FoxTuple]
    tp = 'm'

    @staticmethod
    def from_dict(name: str = "", d: dict[str, object]|None = None) -> 'FoxMessage':
        """Build a message from a plain dict, choosing tuple types by value type.

        Mapping: ``bytes`` -> blob, ``float`` -> float, ``bool`` -> boolean,
        ``int`` -> integer, ``dict`` -> nested message (recursively),
        ``FoxObjectInner`` -> object, ``str`` -> string, ``datetime`` -> time.

        Args:
            name: Name of the resulting message tuple.
            d: Source mapping; ``None`` (the default) yields an empty message.

        Raises:
            ValueError: a value has a type with no corresponding tuple class.
        """
        self = FoxMessage(name, [])
        if d is None:
            return self
        for k, v in d.items():
            if isinstance(v, bytes):
                self.value.append(FoxBlob(k, v))
            elif isinstance(v, float):
                self.value.append(FoxFloat(k, v))
            elif isinstance(v, bool):
                # bool is a subclass of int, so it has to be tested first or
                # this branch is unreachable.
                self.value.append(FoxBoolean(k, v))
            elif isinstance(v, int):
                self.value.append(FoxInteger(k, v))
            elif isinstance(v, dict):
                # Convert recursively; FoxMessage.value must be a list of
                # tuples, not the raw dict.
                self.value.append(FoxMessage.from_dict(k, v))
            elif isinstance(v, FoxObjectInner):
                self.value.append(FoxObject(k, v))
            elif isinstance(v, str):
                self.value.append(FoxString(k, v))
            elif isinstance(v, datetime):
                self.value.append(FoxTime(k, v))
            else:
                raise ValueError(f"Unsupported type for key \"{k}\": {type(v)}")
        return self

    def to_dict(self) -> dict[str, object]:
        """Flatten the message into ``{name: python value}``.

        Nested messages become nested dicts; objects are reduced to their raw
        payload bytes; every other tuple contributes its ``value`` as is.
        """
        out = {}
        for v in self.value:
            if isinstance(v, FoxObject):
                # FoxObjectInner exposes `encoding` and `data`; it has no
                # `value` attribute.
                # NOTE(review): unwrapping to the payload bytes is the
                # assumed intent of the original `.value.value` - confirm.
                out[v.name] = v.value.data
            elif isinstance(v, FoxMessage):
                out[v.name] = v.to_dict()
            else:
                out[v.name] = v.value
        return out

    def encode(self) -> bytearray:
        """Return the brace-delimited, one-tuple-per-line wire form."""
        bs = bytearray(b'{\n')
        for v in self.value:
            bs += v.name.encode()
            bs += b'='
            bs += v.tp.encode()
            bs += b':'
            bs += v.encode()
            bs += b'\n'
        bs += b'}'
        return bs

    @staticmethod
    def decode(name: str, r: SocketBufferedReader) -> 'FoxMessage':
        """Read a brace-delimited message from ``r``.

        Raises:
            ValueError: a structural delimiter is missing, or a tuple has an
                unknown type code.
        """
        # Delimiter reads are kept out of `assert` so they still consume
        # their bytes when Python runs with -O.
        if r.read_bytes(2) != b'{\n':
            raise ValueError("malformed message: expected '{' and newline")
        value = []
        while r.peek_byte() != b'}':
            n = r.read_name_str()
            r.read_byte_int()  # '=' separator; consumed but not validated
            tp = r.read_byte().decode()
            if r.read_byte() != b':':
                raise ValueError("malformed message: expected ':' after type code")
            v = FoxTuple.decode(n, tp, r)
            if r.read_byte() != b'\n':
                raise ValueError("malformed message: expected newline after value")
            value.append(v)
        r.read_byte_int()  # closing '}'
        return FoxMessage(name, value)
@dataclass
class FoxFrame:
    """One FOX protocol frame: a header line, a message body and ``;;``.

    Wire form::

        fox <type> <seq> <reply> <channel> <command>
        <message>;;
    """
    # One-byte frame type codes accepted by decode().
    SYNC = b's'
    ASYNC = b'a'
    REPLY = b'r'
    ERROR = b'e'
    NULL = b'n'
    KEEPALIVE = b'k'
    CLOSE = b'c'
    VALID_TYPES = [SYNC, ASYNC, REPLY, ERROR, NULL, KEEPALIVE, CLOSE]

    frame_type: bytes
    sequence_number: int
    reply_number: int
    channel: str
    command: str
    message: FoxMessage

    def encode(self) -> bytearray:
        """Return the complete wire form of this frame."""
        header = b' '.join((
            b'fox',
            self.frame_type,
            str(self.sequence_number).encode(),
            str(self.reply_number).encode(),
            self.channel.encode(),
            self.command.encode(),
        ))
        b = bytearray(header)
        b += b'\n'
        b += self.message.encode()
        b += b';;\n'
        return b

    @staticmethod
    def _expect(r: SocketBufferedReader, expected: bytes):
        """Consume ``len(expected)`` bytes from ``r`` and require a match.

        Raises:
            ValueError: the bytes read differ from ``expected``.
        """
        got = r.read_bytes(len(expected))
        if got != expected:
            raise ValueError(
                f"malformed FOX frame: expected {expected!r}, got {got!r}"
            )

    @staticmethod
    def decode(r: SocketBufferedReader) -> 'FoxFrame':
        """Read one complete frame from ``r``.

        Raises:
            ValueError: the frame is structurally invalid (bad magic, unknown
                frame type, empty channel/command, missing delimiter).
        """
        # Every delimiter is read outside of `assert`: with -O an assert and
        # the read inside it are removed, leaving the stream out of sync.
        expect = FoxFrame._expect
        expect(r, b'fox ')
        frame_type = r.read_byte()
        if frame_type not in FoxFrame.VALID_TYPES:
            raise ValueError(f"unknown FOX frame type: {frame_type!r}")
        expect(r, b' ')
        sequence_number = r.read_int()
        expect(r, b' ')
        reply_number = r.read_int()
        expect(r, b' ')
        channel = r.read_name_str()
        if channel == '':
            raise ValueError("malformed FOX frame: empty channel")
        expect(r, b' ')
        command = r.read_name_str()
        if command == '':
            raise ValueError("malformed FOX frame: empty command")
        expect(r, b'\n')
        message = FoxMessage.decode('', r)
        expect(r, b';;\n')
        return FoxFrame(frame_type, sequence_number, reply_number, channel, command, message)
class FoxClient:
    """Sends and receives ``FoxFrame`` objects over a connected socket.

    With ``debug`` enabled, every frame and the dict form of its message are
    printed to stdout.
    """
    sock: socket.socket|ssl.SSLSocket
    debug: bool
    recv_buffer: SocketBufferedReader

    def __init__(self, sock: socket.socket|ssl.SSLSocket, debug: bool = False):
        self.sock = sock
        self.debug = debug
        self.recv_buffer = SocketBufferedReader(sock)

    def _trace(self, arrow: str, frame: FoxFrame):
        """Print ``frame`` and its message contents, prefixed by ``arrow``."""
        print(arrow, frame)
        print('\t\t', frame.message.to_dict())
        print()

    def send(self, frame: FoxFrame):
        """Encode ``frame`` and write all of it to the socket."""
        payload = frame.encode()
        if self.debug:
            self._trace('>>>\t', frame)
        self.sock.sendall(payload)

    def recv(self) -> FoxFrame:
        """Block until one complete frame has been read, and return it."""
        incoming = FoxFrame.decode(self.recv_buffer)
        if self.debug:
            self._trace('<<<\t', incoming)
        return incoming
if __name__ == '__main__':
    # Demo: connect to a fox:// or foxs:// endpoint, send a "hello" frame and
    # print the next two frames the server sends back.
    import sys
    from urllib.parse import urlparse

    if len(sys.argv) != 2:
        print("Usage: foxclient.py fox[s]://ip[:port]")
        sys.exit(1)

    uri = urlparse(sys.argv[1])
    scheme = uri.scheme.lower()
    # Validate explicitly; an `assert` would be skipped under -O.
    if scheme not in ("fox", "foxs") or uri.hostname is None:
        print("Usage: foxclient.py fox[s]://ip[:port]")
        sys.exit(1)

    secure = scheme == "foxs"
    if uri.port is not None:
        port = uri.port
    elif secure:
        port = 4911
    else:
        port = 1911

    sock = socket.create_connection((uri.hostname, port), timeout=5)
    try:
        if secure:
            # NOTE(review): certificate and hostname verification are
            # switched off here, so the TLS peer is not authenticated.
            # Kept as in the original - confirm this is intended.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            sock = context.wrap_socket(sock)

        client = FoxClient(sock, debug=True)
        message = FoxMessage.from_dict(d={
            "fox.version": "1.0.2",
            "id": 1,
        })
        frame = FoxFrame(FoxFrame.ASYNC, 1, -1, "fox", "hello", message)
        client.send(frame)
        _hello_reply = client.recv()
        _challenge = client.recv()
    finally:
        # Close the socket even when the handshake or decoding raises.
        sock.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment