Created
August 11, 2019 22:46
-
-
Save chrisconlon-klaviyo/6ab0996780e66d2640076857bc89124e to your computer and use it in GitHub Desktop.
A Python 3 script that can find and connect to an Arduino and a WebSocket server, and relay commands between them.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import asyncio | |
import colored | |
import serial | |
from serial.tools import list_ports | |
import sys | |
import uuid | |
import websockets | |
# Command-line interface. Parsing happens at import time, so `args` is
# available as a module-level object to everything defined below.
parser = argparse.ArgumentParser(description='Listen for finger game players and control hardware.')

# Address of the coordinating server; the value is read as args.server_address.
parser.add_argument(
    '--server',
    action='store',
    dest='server_address',
    default='localhost:9000',
    help='The URL and port of the coordinating server to connect to.',
)

# Boolean switch read as args.debug; off unless the flag is given.
parser.add_argument(
    '--debug',
    action='store_true',
    dest='debug',
    default=False,
    help='Whether to print debugging information or not.',
)

args = parser.parse_args()
# Speed, in baud, used when opening the serial connection to the Arduino.
# NOTE(review): presumably has to match the rate configured in the Arduino
# sketch -- confirm against the firmware.
BAUD_RATE = 9600

# The only messages from the server that are acted upon; anything else is
# ignored (and reported when --debug is set).
VALID_COMMANDS = [
    'BEGIN_TOUCHING_CUP',
    'STOPPED_TOUCHING_CUP',
    'HEALTH_CHECK',
]
def log(message, color):
    """Print *message* to stdout on a coloured background.

    *color* is a severity name: 'danger', 'good' or 'info'. Any other value
    falls back to background colour code 9.
    """
    palette = {'danger': 13, 'good': 40, 'info': 6}
    background = palette[color] if color in palette else 9
    styled = colored.stylize(message, colored.bg(background))
    print(styled)
def _is_arduino(port):
    """Return True when the port's description or manufacturer mentions 'arduino'."""
    # Either field may be unset for a given port, so skip empty values
    # before lower-casing.
    labels = (port.description, port.manufacturer)
    return any('arduino' in label.lower() for label in labels if label)


def find_device():
    """Locate an attached Arduino among the system's serial ports.

    Returns the first port whose description or manufacturer mentions
    'arduino' (case-insensitive), or None when there is no such port.
    When several ports match, a warning is logged and the first one is
    still returned.
    """
    arduino_devices = [port for port in list_ports.comports() if _is_arduino(port)]

    if not arduino_devices:
        log(' - no arduino like devices found.', 'danger')
        return None

    if len(arduino_devices) > 1:
        # `product` is None for ports that expose no USB product string;
        # joining None raises TypeError, so fall back to the device path.
        names = ','.join(str(port.product or port.device) for port in arduino_devices)
        log(' - more than one arduino found: {}'.format(names), 'danger')

    return arduino_devices[0]
async def client(identifier, arduino):
    """Register with the server and forward its commands to the Arduino.

    Opens a secure WebSocket to args.server_address, announces itself with
    'REGISTER:<identifier>', then loops forever over incoming messages:

    * messages not in VALID_COMMANDS are dropped (logged when --debug is set);
    * 'HEALTH_CHECK' is answered with 'HEALTHY:<identifier>';
    * every other valid command is logged and written to *arduino* as a
      newline-terminated UTF-8 line.

    The loop has no exit of its own; it ends only when an exception
    propagates out of it.
    """
    url = 'wss://{}'.format(args.server_address)
    async with websockets.connect(url) as websocket:
        await websocket.send('REGISTER:{}'.format(identifier))

        while True:
            command = await websocket.recv()

            if command not in VALID_COMMANDS:
                if args.debug:
                    log('Unknown command: {}'.format(command), 'danger')
                continue

            if command == 'HEALTH_CHECK':
                # Answered here; health checks are never forwarded to the board.
                await websocket.send('HEALTHY:{}'.format(identifier))
                continue

            log(command, 'good')
            line = '{}\n'.format(command)
            arduino.write(line.encode('utf-8'))
if __name__ == '__main__':
    # Script entry point: find the Arduino, open its serial port, then run
    # the WebSocket relay until the connection drops or the user interrupts.
    identifier = str(uuid.uuid4())

    port_info = find_device()
    if port_info is None:
        # find_device() has already logged why nothing was found.
        sys.exit(1)

    arduino = None
    try:
        arduino = serial.Serial(port_info.device, BAUD_RATE)
        log(' - arduino found and connected', 'info')
        log(' - connecting client to server as: {}'.format(identifier), 'info')
        asyncio.get_event_loop().run_until_complete(client(identifier, arduino))
    except websockets.exceptions.ConnectionClosed:
        log(' - connection closed by remote', 'danger')
    except serial.serialutil.SerialException:
        # SerialException derives from IOError (an alias of OSError), so this
        # handler must come before the generic OSError one; otherwise serial
        # failures are misreported as server connection errors.
        log(' - cannot connect to arduino, it is in use', 'danger')
    except OSError as error:
        log(' - cannot connect to remote server: {}'.format(error), 'danger')
    except KeyboardInterrupt:
        log(' - disconnecting', 'info')
    finally:
        # Release the serial port on every exit path so the board is not
        # left locked for the next run.
        if arduino is not None:
            arduino.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment