Skip to content

Instantly share code, notes, and snippets.

@ali1234
Created February 27, 2021 21:06
Show Gist options
  • Save ali1234/8789a49b86d0d9bc8f93985cd65469a7 to your computer and use it in GitHub Desktop.
Save ali1234/8789a49b86d0d9bc8f93985cd65469a7 to your computer and use it in GitHub Desktop.
32blit multiplayer bridge (doesn't work)
import click
import pyudev
import socket
import serial
import serial.threaded
class Worker(serial.threaded.Protocol):
    """Serial-side protocol object that relays received bytes to a socket.

    The instance stores the socket it was given. Every chunk handed to
    ``data_received`` is logged with ``print`` and, when a socket is set,
    written to it with ``sendall``.
    """

    def __init__(self, socket):
        # The parameter name shadows the ``socket`` module inside this method
        # only; it is kept because callers may pass it by keyword.
        self.socket = socket

    def __call__(self):
        # Calling the instance yields the instance itself, so it can be handed
        # over wherever a zero-argument factory is expected (SerialBridge
        # passes it to serial.threaded.ReaderThread).
        return self

    def data_received(self, data):
        """Log ``data`` and forward it to the socket, if there is one."""
        peer = self.socket
        if peer is None:
            # No socket to forward to: log the chunk and drop it.
            print('S->_', data)
            return
        print('S->I', data)
        peer.sendall(data)
class SerialBridge:
    """Relay bytes between a serial port and a TCP socket, in both directions.

    Intended to be used as a context manager. On entry it first tries to
    connect to ``localhost:port`` as a client; if that fails it listens on
    ``port`` and blocks until one peer connects. It then opens the serial
    port and starts a ``serial.threaded.ReaderThread`` that forwards serial
    data to the socket through a ``Worker``. ``communicate()`` forwards the
    opposite direction (socket -> serial) on the calling thread. On exit the
    reader thread, the serial port and the socket are all shut down.

    Attributes:
        serial: the pyserial port object (created unopened).
        port: TCP port used for both the client and the server role.
        socket: the connected peer socket, or ``None`` while not connected.
    """

    def __init__(self, tty, port=0x32B1):
        """Prepare a bridge for serial device ``tty``; nothing is opened yet.

        Args:
            tty: device name / URL accepted by ``serial.serial_for_url``.
            port: TCP port to connect to or listen on (default 0x32B1, i.e.
                12977).
        """
        # do_not_open=True defers opening the port until __enter__.
        self.serial = serial.serial_for_url(tty, do_not_open=True)
        self.port = port
        self.socket = None
        self._worker = None
        self._reader = None

    def start_server(self):
        """Listen on ``self.port``, accept one peer and store it in ``self.socket``.

        Blocks until a peer connects.

        Raises:
            OSError: if binding, listening, accepting or configuring fails.
        """
        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # NOTE(review): this binds every interface although start_client
            # only ever dials localhost -- confirm remote peers are intended.
            srv.bind(('', int(self.port)))
            print("Listening")
            srv.listen(1)
            client_socket, _addr = srv.accept()
        finally:
            # Only a single peer is ever served, so the listening socket is
            # released as soon as accept() returns or fails.
            srv.close()

        try:
            # Enable keepalive first, then tune whichever knobs this platform
            # exposes; a missing TCP_KEEP* constant must not prevent
            # SO_KEEPALIVE itself from being switched on.
            client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            tuning = (('TCP_KEEPIDLE', 1), ('TCP_KEEPINTVL', 1), ('TCP_KEEPCNT', 3))
            for name, value in tuning:
                option = getattr(socket, name, None)
                if option is not None:
                    client_socket.setsockopt(socket.IPPROTO_TCP, option, value)
            client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except BaseException:
            client_socket.close()
            raise
        print("Accept")
        self.socket = client_socket

    def start_client(self):
        """Connect to ``localhost:self.port`` and store the socket in ``self.socket``.

        Raises:
            OSError: if the connection cannot be established.
        """
        client_socket = socket.socket()
        try:
            client_socket.connect(('localhost', int(self.port)))
            client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except BaseException:
            # Do not leave a half-initialised socket behind on failure.
            client_socket.close()
            raise
        print("Connecting")
        self.socket = client_socket

    def __enter__(self):
        """Connect the socket, open the serial port and start the reader thread."""
        try:
            self.start_client()
        except socket.error:
            # Nobody is listening yet, so take the server role instead.
            self.start_server()
        try:
            self.serial.open()
            worker = Worker(self.socket)
            reader = serial.threaded.ReaderThread(self.serial, worker)
            reader.start()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so release the
            # socket (and the port, if it got opened) here.
            self._close()
            raise
        # Recorded only after a successful start so that _close() never
        # tries to stop a thread that was not started.
        self._worker = worker
        self._reader = reader
        return self

    def communicate(self):
        """Forward socket data to the serial port until the peer closes.

        Returns when ``recv`` yields an empty result (orderly shutdown by the
        peer). Socket and serial errors propagate to the caller.
        """
        while True:
            data = self.socket.recv(1024)
            if not data:
                break
            print('I->S', data)
            self.serial.write(data)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut the bridge down; exceptions from the ``with`` body are not suppressed."""
        self._close()

    def _close(self):
        """Stop the reader thread and close the serial port and the socket."""
        worker, self._worker = self._worker, None
        reader, self._reader = self._reader, None
        sock, self.socket = self.socket, None
        try:
            if worker is not None:
                # Stop the worker from forwarding serial data to a socket
                # that is about to be closed.
                worker.socket = None
            if reader is not None:
                # ReaderThread.close() stops the thread and then closes the
                # serial port it was given.
                reader.close()
            else:
                self.serial.close()
        finally:
            if sock is not None:
                sock.close()
def get_next_blit():
    """Return the udev tty device of a 32Blit, waiting for one if necessary.

    tty devices that already exist are checked first; the first one whose
    ``ID_MODEL`` property is ``32Blit_CDC`` is returned. Otherwise the
    function blocks on udev events until an ``add`` event for such a device
    arrives and returns that device.
    """
    wanted_model = '32Blit_CDC'
    context = pyudev.Context()

    # Subscribe to hotplug events *before* enumerating. If the monitor were
    # only created after the enumeration, a device plugged in between the two
    # steps would show up in neither and this function would wait forever.
    monitor = pyudev.Monitor.from_netlink(context)
    monitor.filter_by('tty')
    monitor.start()

    for device in context.list_devices(subsystem='tty'):
        if device.get('ID_MODEL', None) == wanted_model:
            return device

    print("Waiting for 32Blit to connect on USB.")
    # poll() is called without a timeout, so the None sentinel only ends the
    # loop if poll() ever returns None.
    for device in iter(monitor.poll, None):
        is_add = device.get('ACTION', None) == 'add'
        if is_add and device.get('ID_MODEL', None) == wanted_model:
            return device
@click.command()
def main():
    # Deliberately documented with comments rather than a docstring: click
    # would show a docstring as the command's --help text.
    #
    # Endless loop: obtain a 32Blit tty device, bridge it until the session
    # ends, then start over with the next device.
    while True:
        blit = get_next_blit()
        tty_path = blit['DEVNAME']
        print(tty_path)
        try:
            with SerialBridge(tty_path) as bridge:
                print("Ready")
                bridge.communicate()
        except OSError:  # socket.error is an alias of OSError on Python 3
            print("Disconnected")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment