Skip to content

Instantly share code, notes, and snippets.

@Vocaned
Last active October 13, 2025 00:38
Show Gist options
  • Select an option

  • Save Vocaned/8789cfaa1a7bdfbed43f6cd10754351f to your computer and use it in GitHub Desktop.

Select an option

Save Vocaned/8789cfaa1a7bdfbed43f6cd10754351f to your computer and use it in GitHub Desktop.
import socket
import struct
import json
class IPC(socket.socket):
    """Unix-domain stream socket that exchanges length-prefixed JSON frames.

    Each frame is an 8-byte header -- two little-endian unsigned 32-bit
    integers holding the opcode and the payload length in bytes -- followed
    by a JSON payload of exactly that length.

    NOTE(review): the opcodes, ``client_id`` handshake and ``DISPATCH``
    reply look like Discord's local RPC transport -- confirm against the
    peer this is meant to talk to.
    """

    # Pre-compiled frame header: (opcode, payload length in bytes).
    _HEADER = struct.Struct('<II')

    def __init__(self, socket_path: str, client_id: str) -> None:
        """Open the socket and connect it to ``socket_path``.

        Args:
            socket_path: Filesystem path of the Unix socket to connect to.
            client_id: Identifier sent in the handshake and close frames.

        Raises:
            OSError: If the connection cannot be established; the socket is
                closed before the error propagates.
        """
        self.CLIENT_ID = client_id
        super().__init__(socket.AF_UNIX, socket.SOCK_STREAM)
        # Applies to connect() and to every later send/recv on this socket.
        self.settimeout(5)
        try:
            self.connect(socket_path)
        except OSError:
            # Do not leave a half-initialised descriptor open on failure.
            self.close()
            raise

    def _recv_exact(self, size: int) -> bytes:
        """Read ``size`` bytes, looping over short reads.

        A stream socket may hand back fewer bytes than requested, so keep
        reading until the full amount has arrived. If the peer closes the
        connection first, the bytes received so far are returned (possibly
        fewer than ``size``) and the caller's parsing step reports the error.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.recv(remaining)
            if not chunk:
                # Empty read means the peer closed the connection.
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def ipc_read(self) -> tuple[int, dict]:
        """Read one frame and return ``(opcode, decoded_payload)``.

        Raises:
            RuntimeError: If the payload carries ``code == 1000``.
            struct.error: If the connection ends before a full header arrives.
            json.JSONDecodeError: If the payload is truncated or not JSON.
        """
        header = self._recv_exact(self._HEADER.size)
        op, length = self._HEADER.unpack(header)
        d = json.loads(self._recv_exact(length))
        if 'code' in d and d['code'] == 1000:
            # Unknown error (typically user logout), assume connection is closed
            raise RuntimeError(str(d))
        return (op, d)

    def ipc_write(self, op: int, payload: dict) -> None:
        """Serialise ``payload`` as JSON and send it as one frame.

        Args:
            op: Opcode placed in the frame header.
            payload: JSON-serialisable message body.
        """
        body = json.dumps(payload).encode('utf-8')
        # The header must carry the byte length of what is actually sent.
        header = self._HEADER.pack(op, len(body))
        # sendall() retries until the whole frame is written; send() may not.
        self.sendall(header + body)

    def ipc_connect(self) -> dict:
        """Send the handshake frame (opcode 0) and return the peer's reply.

        Raises:
            ValueError: If the reply is empty or its ``cmd`` is not
                ``'DISPATCH'``.
        """
        self.ipc_write(0, {'v': 1, 'client_id': self.CLIENT_ID})
        _, data = self.ipc_read()
        if not data or 'cmd' not in data or data['cmd'] != 'DISPATCH':
            raise ValueError(f'Encountered unexpected data when logging in, no clue what to do: {data}')
        return data

    def ipc_close(self) -> tuple[int, dict]:
        """Send the close frame (opcode 2), read the reply, close the socket.

        The socket is closed even when sending or reading fails, so an error
        during shutdown never leaks the descriptor.

        Returns:
            The ``(opcode, payload)`` of the peer's reply to the close frame.
        """
        try:
            self.ipc_write(2, {'v': 1, 'client_id': self.CLIENT_ID})
            return self.ipc_read()
        finally:
            self.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment