Skip to content

Instantly share code, notes, and snippets.

@SoftPoison
Last active May 15, 2025 05:35
Show Gist options
  • Select an option

  • Save SoftPoison/2cd16c41149012964bcbd15f16cc7562 to your computer and use it in GitHub Desktop.

Select an option

Save SoftPoison/2cd16c41149012964bcbd15f16cc7562 to your computer and use it in GitHub Desktop.
a Python script for interacting with FOX(S) servers. auth NYI
from dataclasses import dataclass
from collections import deque
import socket
import ssl
import math
from datetime import datetime
def write_encoded(s: str|None, bs: bytearray):
if s == None:
bs.extend(b'#null;')
return
for c in s:
oc = ord(c)
if oc < ord(' ') or oc > 127:
bs.extend(b"#{0:02x};".format(oc))
else:
bs.append(oc)
class SocketBufferedReader:
_sock: socket.socket|ssl.SSLSocket
_buffer: deque[int]
def __init__(self, sock: socket.socket|ssl.SSLSocket):
self._sock = sock
self._buffer = deque()
def _recv_more(self):
bs = self._sock.recv()
assert len(bs) != 0
self._buffer.extend(bs)
def peek_byte_int(self) -> int:
if len(self._buffer) == 0:
self._recv_more()
return self._buffer[0]
def read_byte_int(self) -> int:
if len(self._buffer) == 0:
self._recv_more()
return self._buffer.popleft()
def read_byte(self) -> bytes:
return bytes([self.read_byte_int()])
def peek_byte(self) -> bytes:
return bytes([self.peek_byte_int()])
def read_bytes(self, num: int) -> bytes:
while len(self._buffer) < num:
self._recv_more()
bs = bytearray()
for _ in range(num):
bs.append(self._buffer.popleft())
return bytes(bs)
def peek_bytes(self, num: int) -> bytes:
while len(self._buffer) < num:
self._recv_more()
bs = bytearray()
for i in range(num):
bs.append(self._buffer[i])
return bytes(bs)
def read_line(self) -> bytes:
b = 0
bs = bytearray()
while b != 0x0a:
if len(self._buffer) == 0:
self._recv_more()
b = self._buffer.popleft()
bs.append(b)
return bytes(bs)
def read_int(self) -> int:
bs = bytearray()
while True:
b = self.peek_byte()
if b == b'-':
if len(bs) > 0:
raise ValueError()
self.read_byte_int()
bs.extend(b)
continue
if b < b'0' or b > b'9':
break
self.read_byte_int()
bs.extend(b)
return int(bs)
def read_float(self) -> float:
raise NotImplementedError()
def read_str(self) -> str|None:
if self.peek_bytes(6) == b'#null;':
self.read_bytes(6)
return None
bs = bytearray()
while True:
b = self.peek_byte_int()
if b < ord(' '):
return bs.decode()
if b == ord('#'):
self.read_byte_int()
c = bytes.fromhex(self.read_bytes(2).decode())
assert self.read_byte_int() == ord(';')
bs.extend(c)
else:
self.read_byte_int()
bs.append(b)
def read_name_str(self) -> str:
bs = bytearray()
while True:
b = self.peek_byte_int()
if b == ord('#'):
self.read_byte_int()
c = bytes.fromhex(self.read_bytes(2).decode())
assert self.read_byte_int() == ord(';')
bs.extend(c)
elif (b >= ord('a') and b <= ord('z')) or (b >= ord('A') and b <= ord('Z')) or (b >= ord('0') and b <= ord('9')) or b == ord('-') or b == ord('.') or b == ord('_') or b == ord('$'):
self.read_byte_int()
bs.append(b)
else:
return bs.decode()
def read_hex_int(self) -> int:
bs = bytearray()
while True:
b = self.peek_byte_int()
if b >= ord('a') and b <= ord('f'):
self.read_byte_int()
bs.append(b)
else:
return int(bs, 16)
class FoxTuple:
name: str
tp: str
value: object
def __init__(self, name: str, value):
self.name = name
self.value = value
def encode(self) -> bytearray:
raise NotImplementedError()
@staticmethod
def decode(name: str, tp: str, r: SocketBufferedReader) -> 'FoxTuple':
if tp == FoxBlob.tp:
return FoxBlob.decode(name, r)
if tp == FoxFloat.tp:
return FoxFloat.decode(name, r)
if tp == FoxInteger.tp:
return FoxInteger.decode(name, r)
if tp == FoxObject.tp:
return FoxObject.decode(name, r)
if tp == FoxString.tp:
return FoxString.decode(name, r)
if tp == FoxTime.tp:
return FoxTime.decode(name, r)
if tp == FoxBoolean.tp:
return FoxBoolean.decode(name, r)
if tp == FoxMessage.tp:
return FoxMessage.decode(name, r)
raise ValueError(tp)
class FoxBlob(FoxTuple):
value: bytes
tp = 'b'
def encode(self) -> bytearray:
bs = bytearray()
bs.extend(str(len(self.value)).encode())
bs.extend(b'[')
bs.extend(self.value)
bs.extend(b']')
return bs
@staticmethod
def decode(name: str, r: SocketBufferedReader) -> 'FoxBlob':
size = r.read_int()
assert r.read_byte() == b'['
value = r.read_bytes(size)
assert r.read_byte() == b']'
return FoxBlob(name, value)
class FoxFloat(FoxTuple):
value: float
tp = 'f'
def encode(self) -> bytearray:
if math.isnan(self.value):
return bytearray("nan".encode())
if math.isinf(self.value) and self.value > 0:
return bytearray("+inf".encode())
if math.isinf(self.value) and self.value < 0:
return bytearray("-inf".encode())
return bytearray(str(self.value).encode())
def decode(name: str, r: SocketBufferedReader) -> 'FoxFloat':
s = r.read_str()
assert s != None
if s == "nan":
return FoxFloat(name, math.nan)
if s == "+inf":
return FoxFloat(name, math.inf)
if s == "-inf":
return FoxFloat(name, -math.inf)
return FoxFloat(name, float(s))
class FoxInteger(FoxTuple):
value: int
tp = 'i'
def encode(self) -> bytearray:
return bytearray(str(self.value).encode())
def decode(name: str, r: SocketBufferedReader) -> 'FoxInteger':
return FoxInteger(name, r.read_int())
@dataclass
class FoxObjectInner:
encoding: str
data: bytes
class FoxObject(FoxTuple):
value: FoxObjectInner
tp = 'o'
def encode(self) -> bytearray:
bs = bytearray()
write_encoded(self.value.encoding, bs)
bs.extend(b' ')
bs.extend(str(len(self.value.data)).encode())
bs.extend(b'[')
bs.extend(self.value.data)
bs.extend(b']')
return bs
def decode(name: str, r: SocketBufferedReader) -> 'FoxObject':
encoding = r.read_name_str()
assert r.read_byte == b' '
size = r.read_int()
assert r.read_byte() == b'['
value = r.read_bytes(size)
assert r.read_byte() == b']'
return FoxObject(name, FoxObjectInner(encoding, value))
class FoxString(FoxTuple):
value: str|None
tp = 's'
def encode(self) -> bytearray:
bs = bytearray()
write_encoded(self.value, bs)
return bs
def decode(name: str, r: SocketBufferedReader) -> 'FoxString':
return FoxString(name, r.read_str())
class FoxTime(FoxTuple):
value: datetime
tp = 't'
def encode(self) -> bytearray:
return bytearray(hex(int(self.value.timestamp() * 1000))[2:].encode())
def decode(name: str, r: SocketBufferedReader) -> 'FoxTime':
return FoxTime(name, datetime.fromtimestamp(r.read_hex_int() / 1000))
class FoxBoolean(FoxTuple):
value: bool
tp = 'z'
def encode(self) -> bytearray:
return bytearray(b't' if self.value else b'f')
def decode(name: str, r: SocketBufferedReader) -> 'FoxBoolean':
v = r.read_byte()
if v == b't':
return FoxBoolean(name, True)
if v == b'f':
return FoxBoolean(name, False)
raise ValueError()
class FoxMessage(FoxTuple):
value: list[FoxTuple]
tp = 'm'
@staticmethod
def from_dict(name: str = "", d: dict[str, object] = {}) -> 'FoxMessage':
self = FoxMessage(name, [])
for k, v in d.items():
if isinstance(v, bytes):
self.value.append(FoxBlob(k, v))
elif isinstance(v, float):
self.value.append(FoxFloat(k, v))
elif isinstance(v, int):
self.value.append(FoxInteger(k, v))
elif isinstance(v, dict):
self.value.append(FoxMessage(k, v))
elif isinstance(v, FoxObjectInner):
self.value.append(FoxObject(k, v))
elif isinstance(v, str):
self.value.append(FoxString(k, v))
elif isinstance(v, datetime):
self.value.append(FoxTime(k, v))
elif isinstance(v, bool):
self.value.append(FoxBoolean)
else:
raise ValueError(f"Unsupported type for key \"{k}\": {type(v)}")
return self
def to_dict(self) -> dict[str, object]:
out = {}
for v in self.value:
if isinstance(v, FoxObject):
out[v.name] = v.value.value
elif isinstance(v, FoxMessage):
out[v.name] = v.to_dict()
else:
out[v.name] = v.value
return out
def encode(self) -> bytearray:
bs = bytearray()
bs.extend(b'{\n')
for v in self.value:
bs.extend(v.name.encode())
bs.extend(b'=')
bs.extend(v.tp.encode())
bs.extend(b':')
bs.extend(v.encode())
bs.extend(b'\n')
bs.extend(b'}')
return bs
def decode(name: str, r: SocketBufferedReader) -> 'FoxMessage':
value = []
assert r.read_bytes(2) == b'{\n'
while r.peek_byte() != b'}':
n = r.read_name_str()
r.read_byte_int() # assert == b'=' unnecessary
tp = r.read_byte().decode()
assert r.read_byte() == b':'
v = FoxTuple.decode(n, tp, r)
assert r.read_byte() == b'\n'
value.append(v)
r.read_byte_int() # b'}'
return FoxMessage(name, value)
@dataclass
class FoxFrame:
SYNC = b's'
ASYNC = b'a'
REPLY = b'r'
ERROR = b'e'
NULL = b'n'
KEEPALIVE = b'k'
CLOSE = b'c'
VALID_TYPES = [SYNC, ASYNC, REPLY, ERROR, NULL, KEEPALIVE, CLOSE]
frame_type: bytes
sequence_number: int
reply_number: int
channel: str
command: str
message: FoxMessage
def encode(self) -> bytearray:
b = bytearray()
b.extend(b'fox ')
b.extend(self.frame_type)
b.extend(b' ')
b.extend(str(self.sequence_number).encode())
b.extend(b' ')
b.extend(str(self.reply_number).encode())
b.extend(b' ')
b.extend(self.channel.encode())
b.extend(b' ')
b.extend(self.command.encode())
b.extend(b'\n')
b.extend(self.message.encode())
b.extend(b';;\n')
return b
@staticmethod
def decode(r: SocketBufferedReader) -> 'FoxFrame':
assert r.read_bytes(4) == b'fox '
frame_type = r.read_byte()
assert frame_type in FoxFrame.VALID_TYPES
assert r.read_byte() == b' '
sequence_number = r.read_int()
assert r.read_byte() == b' '
reply_number = r.read_int()
assert r.read_byte() == b' '
channel = r.read_name_str()
assert channel != ''
assert r.read_byte() == b' '
command = r.read_name_str()
assert command != ''
assert r.read_byte() == b'\n'
message = FoxMessage.decode('', r)
assert r.read_bytes(3) == b';;\n'
return FoxFrame(frame_type, sequence_number, reply_number, channel, command, message)
class FoxClient:
sock: socket.socket|ssl.SSLSocket
debug: bool
recv_buffer: SocketBufferedReader
def __init__(self, sock: socket.socket|ssl.SSLSocket, debug: bool = False):
self.sock = sock
self.debug = debug
self.recv_buffer = SocketBufferedReader(sock)
def send(self, frame: FoxFrame):
bs = frame.encode()
if self.debug:
print('>>>\t', frame)
print('\t\t', frame.message.to_dict())
print()
self.sock.sendall(bs)
def recv(self) -> FoxFrame:
frame = FoxFrame.decode(self.recv_buffer)
if self.debug:
print('<<<\t', frame)
print('\t\t', frame.message.to_dict())
print()
return frame
if __name__ == '__main__':
import sys
from urllib.parse import urlparse
if len(sys.argv) != 2:
print("Usage: foxclient.py fox[s]://ip[:port]")
exit(1)
uri = urlparse(sys.argv[1])
scheme = uri.scheme.lower()
assert scheme == "fox" or scheme == "foxs"
secure = scheme == "foxs"
if uri.port is not None:
port = uri.port
elif secure:
port = 4911
else:
port = 1911
sock = socket.create_connection((uri.hostname, port), timeout=5)
if secure:
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = context.wrap_socket(sock)
client = FoxClient(sock, debug=True)
message = FoxMessage.from_dict(d={
"fox.version": "1.0.2",
"id": 1,
})
frame = FoxFrame(FoxFrame.ASYNC, 1, -1, "fox", "hello", message)
client.send(frame)
_hello_reply = client.recv()
_challenge = client.recv()
sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment