Created
September 12, 2024 02:29
-
-
Save matmunn/f508c521fd5ee7031ea20bf2ef79210a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import asyncio | |
from typing import Optional | |
import asyncclick as click | |
from ftms_walkingpad.pad import Controller, Scanner | |
from bleak.exc import BleakError | |
@click.group()
@click.option("--device", help="Device to connect to.")
@click.pass_context
def cli(ctx, device):
    # Root command group: stash the --device value on the click context so
    # that every subcommand can read it back from ctx.obj["device"].
    ctx.ensure_object(dict)
    ctx.obj.update(device=device)


@cli.command()
@click.option("--timeout", type=float, default=3.0)
async def list_devices(timeout):
    # Scan for `timeout` seconds and print one "<item 1>: <item 0>" line per
    # result, or a notice when the scan came back empty.
    found = await Scanner().scan(timeout=timeout)
    if found:
        for entry in found:
            click.echo(f"{entry[1]}: {entry[0]}")
    else:
        click.echo("No devices found.")


async def start_device(device: str, speed: Optional[float]) -> bool:
    """Connect to the pad, start the belt and optionally set a speed.

    Args:
        device: Identifier handed to ``Controller.run`` to select the pad.
        speed: Speed to apply once the belt has been started. The speed
            change is skipped when this is ``None`` or ``0``.

    Returns:
        ``True`` when the belt-start command was sent, ``False`` when a
        ``BleakError`` prevented it (an error message is echoed in that case).
    """
    try:
        controller = Controller()
        await controller.run(device)
        try:
            await controller.start_belt()
        finally:
            # Release the connection even if the start command itself failed.
            await controller.disconnect()
    except BleakError:
        click.echo("Device not found or couldn't connect.")
        return False

    if speed:
        # NOTE(review): presumably this pause lets the belt spin up before the
        # speed change is sent over a fresh connection - confirm.
        await asyncio.sleep(3)
        await set_device_speed(device, speed)
    return True


@cli.command()
@click.pass_context
@click.option("--speed", type=float, required=False)
async def start(ctx, speed):
    """Start the belt, optionally setting a speed once it is running."""
    device = ctx.obj.get("device")
    if not device:
        click.echo("Please specify device to connect to.")
        return

    # Only report success when the start command actually reached the device;
    # start_device has already echoed the error otherwise.
    if await start_device(device, speed):
        click.echo("Belt started")


async def stop_device(device) -> bool:
    """Connect to the pad and stop the belt.

    Args:
        device: Identifier handed to ``Controller.run`` to select the pad.

    Returns:
        ``True`` when the belt-stop command was sent, ``False`` when a
        ``BleakError`` prevented it (an error message is echoed in that case).
    """
    try:
        controller = Controller()
        await controller.run(device)
        try:
            await controller.stop_belt()
        finally:
            # Release the connection even if the stop command itself failed.
            await controller.disconnect()
    except BleakError:
        click.echo("Device not found or couldn't connect.")
        return False
    return True


@cli.command()
@click.pass_context
async def stop(ctx):
    """Stop the belt."""
    device = ctx.obj.get("device")
    if not device:
        click.echo("Please specify device to connect to.")
        return

    # Only report success when the stop command actually reached the device;
    # stop_device has already echoed the error otherwise.
    if await stop_device(device):
        click.echo("Belt stopped")


async def set_device_speed(device, speed) -> bool:
    """Connect to the pad and change the belt speed.

    Args:
        device: Identifier handed to ``Controller.run`` to select the pad.
        speed: Value forwarded unchanged to ``Controller.change_speed``.

    Returns:
        ``True`` when the speed command was sent, ``False`` when a
        ``BleakError`` prevented it (an error message is echoed in that case).
    """
    try:
        controller = Controller()
        await controller.run(device)
        try:
            await controller.change_speed(speed)
        finally:
            # Release the connection even if the speed command itself failed.
            await controller.disconnect()
    except BleakError:
        click.echo("Device not found or couldn't connect.")
        return False
    return True


@cli.command()
@click.argument("speed", type=float, required=True)
@click.pass_context
async def set_speed(ctx, speed):
    """Set the belt speed to SPEED (reported in km/h)."""
    device = ctx.obj.get("device")
    if not device:
        click.echo("Please specify device to connect to.")
        return

    # Only report success when the speed command actually reached the device;
    # set_device_speed has already echoed the error otherwise.
    if await set_device_speed(device, speed):
        click.echo(f"Speed set to {speed}km/h")


# Script entry point: invoke the click group when the file is executed
# directly rather than imported.
if __name__ == "__main__":
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment