Skip to content

Instantly share code, notes, and snippets.

@Vocaned
Last active July 14, 2025 12:08
Show Gist options
  • Save Vocaned/8789cfaa1a7bdfbed43f6cd10754351f to your computer and use it in GitHub Desktop.
Save Vocaned/8789cfaa1a7bdfbed43f6cd10754351f to your computer and use it in GitHub Desktop.
import socket
import struct
import json
class IPC(socket.socket):
def __init__(self, socket_path: str, client_id: str) -> None:
self.CLIENT_ID = client_id
super().__init__(socket.AF_UNIX, socket.SOCK_STREAM)
self.connect(socket_path)
def ipc_read(self) -> tuple[int, dict]:
header = self.recv(8)
op, length = struct.unpack('<II', header)
return (op, json.loads(self.recv(length)))
def ipc_write(self, op: int, payload: dict) -> None:
s = json.dumps(payload)
header = struct.pack('<II', op, len(s))
self.send(header + s.encode('utf-8'))
def ipc_connect(self) -> dict:
self.ipc_write(0, {'v': 1, 'client_id': self.CLIENT_ID})
_, data = self.ipc_read()
if not data or 'cmd' not in data or data['cmd'] != 'DISPATCH':
print(data)
raise SystemExit
return data
def ipc_close(self) -> tuple[int, dict]:
self.ipc_write(2, {'v': 1, 'client_id': self.CLIENT_ID})
op, data = self.ipc_read()
self.close()
return (op, data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment