Last active
November 22, 2023 00:54
-
-
Save Lokno/3a131d0c9e67c097e539e604f73961d7 to your computer and use it in GitHub Desktop.
WebSocket server that interfaces with PyFirmata, intended for use with Pixel Composer.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import json
import logging
import platform
import sys

import pyfirmata
import websockets
# Refuse to start on anything other than Python 3.
major, minor, _ = platform.python_version_tuple()
if major != '3':
    logging.error('ERROR: Python 3 required to run.')
    sys.exit(-1)

# Compare the minor version as a number: as strings, '9' >= '11' is True and
# '100' >= '11' is False.
if int(minor) >= 11:
    # 3.11 fix for property name change in inspect for pyfirmata:
    # expose getfullargspec under the old getargspec name when it is missing.
    import inspect

    if not hasattr(inspect, 'getargspec'):
        inspect.getargspec = inspect.getfullargspec
class BoardControl:
    """Drive the pins of a pyfirmata board from flat key/value messages.

    A message is a dict holding the serial ``port`` plus, for every output,
    three entries named ``pin_<name>``, ``value_<name>`` and ``type_<name>``.
    ``<name>`` is an arbitrary label that ties the three entries together and
    may itself contain underscores.
    """

    # io_type -> (pin bank, pin mode, value converter, lowest, highest).
    # Bank and mode are the first and last fields of the pin spec handed to
    # ``board.get_pin`` ("<bank>:<pin>:<mode>").
    _PIN_TYPES = {
        'servo': ('d', 's', int, 0, 180),
        'digital': ('d', 'o', int, 0, 1),
        'analog': ('a', 'o', int, 0, 1),
        'digital_pwm': ('d', 'p', float, 0.0, 1.0),
        'analog_pwm': ('a', 'p', float, 0.0, 1.0),
    }

    def __init__(self):
        self.port = None   # port of the currently connected board, if any
        self.board = None  # pyfirmata board object, or None when disconnected
        self.io = {}       # (bank, pin number) -> pin object from get_pin

    def clamp(self, x, min_val, max_val):
        """Return ``x`` limited to the inclusive range [min_val, max_val]."""
        return max(min_val, min(x, max_val))

    def _connect(self, port):
        """Close the current board (if any) and open a new one on ``port``."""
        # Forget the old connection before touching hardware, so that a
        # failure below leaves the object cleanly disconnected (port None)
        # and the next update() with a real port retries the connection.
        old_board, self.board = self.board, None
        self.port = None
        self.io = {}
        if old_board is not None:
            old_board.exit()
        self.board = pyfirmata.Arduino(port)
        self.port = port

    @staticmethod
    def _group_by_name(data):
        """Regroup ``{'pin_x': 1, 'type_x': ...}`` as ``{'x': {'pin': 1, ...}}``.

        Raises:
            ValueError: if a key other than ``port`` has no underscore.
        """
        grouped = {}
        for key, value in data.items():
            if key == "port":
                continue
            # Split on the first underscore only, so names such as
            # "arm_servo" stay intact.
            attrib, sep, name = key.partition('_')
            if not sep:
                raise ValueError(
                    f"expected a '<attribute>_<name>' key, got {key!r}")
            grouped.setdefault(name, {})[attrib] = value
        return grouped

    def update(self, port, data):
        """Apply one message to the board, reconnecting if ``port`` changed.

        Args:
            port: serial port of the board; a value different from the
                current one closes the old board and opens a new one.
            data: the message dict described in the class docstring.

        Raises:
            KeyError: if an output lacks its pin, value or type entry.
            ValueError: if a key is malformed or a value cannot be converted.

        Entries whose type is not one of the known types are ignored.
        """
        if port != self.port:
            self._connect(port)

        for io_attrib in self._group_by_name(data).values():
            pin = int(io_attrib['pin'])
            value = io_attrib['value']
            io_type = io_attrib['type']

            pin_type = self._PIN_TYPES.get(io_type)
            if pin_type is None:
                continue
            bank, mode, convert, lowest, highest = pin_type

            # Analog pin N and digital pin N are requested with different
            # specs, so the cache key has to include the bank as well.
            slot = (bank, pin)
            if slot not in self.io:
                self.io[slot] = self.board.get_pin(f'{bank}:{pin}:{mode}')

            # Convert before clamping so numeric strings are accepted for
            # every type, not just the integer ones.
            self.io[slot].write(self.clamp(convert(value), lowest, highest))
# Websocket connections that are currently open.
connected_clients = set()
# Single board controller shared by every client connection.
servo_ctrl = BoardControl()
async def register(websocket):
    """Add ``websocket`` to the set of connected clients and log it."""
    connected_clients.add(websocket)
    print(f"Client connected: {websocket.remote_address}")
async def unregister(websocket):
    """Remove ``websocket`` from the set of connected clients and log it."""
    # discard() rather than remove(): unregistering a client that is not in
    # the set must not raise during connection cleanup.
    connected_clients.discard(websocket)
    print(f"Client disconnected: {websocket.remote_address}")
async def echo(websocket, path=None):
    """Handle one client connection: apply each JSON message to the board.

    Args:
        websocket: the client connection supplied by ``websockets.serve``.
        path: request path; unused. It is optional so the handler works
            whether the server calls it with one argument or with two.

    Every message must be a JSON object containing a ``port`` entry; it is
    handed to ``servo_ctrl.update``. Messages that are not valid JSON or have
    no ``port`` are reported and skipped instead of ending the connection.

    NOTE: ``servo_ctrl.update`` is a plain synchronous call, so nothing else
    runs on the event loop while it executes.
    """
    await register(websocket)
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError as e:
                # Covers json.JSONDecodeError and undecodable binary frames.
                print(f"Ignoring malformed message from {websocket.remote_address}: {e}")
                continue
            if not isinstance(data, dict) or 'port' not in data:
                print(f"Ignoring message without 'port' from {websocket.remote_address}")
                continue
            servo_ctrl.update(data['port'], data)
    except websockets.exceptions.ConnectionClosed as e:
        print(f"Connection closed with {websocket.remote_address}: {e.reason}")
    finally:
        await unregister(websocket)
async def main():
    """Serve ``echo`` on 127.0.0.1:22300 until the process is stopped."""
    async with websockets.serve(echo, "127.0.0.1", 22300):
        # A future that is never completed keeps the server running.
        await asyncio.Future()
# Start the server only when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment