Created
November 10, 2022 00:54
-
-
Save Neradoc/64795d4c9b7b5ecd12217b773fdbad92 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import click | |
import discotool | |
import os | |
import shutil | |
import subprocess | |
current_boards = [] | |
def read_board_id(board): | |
if not board.mountpoint: | |
return None | |
if "BOOT" in board.mountpoint: | |
info = os.path.join(board.mountpoint, "INFO_UF2.TXT") | |
with open(info, "r") as fp: | |
while line := fp.readline(): | |
if line.startswith("Board-ID:"): | |
return line[9:].strip() | |
else: | |
info = os.path.join(board.mountpoint, "boot_out.txt") | |
with open(info, "r") as fp: | |
while line := fp.readline(): | |
if line.startswith("Board ID:"): | |
return line[9:].strip() | |
async def wait_for_drive(drive_names, new=False): | |
while True: | |
for board in discotool.get_identified_devices(): | |
# if we want a new board, skip boards with known location | |
if new and board.location in current_boards: | |
continue | |
# find the first drive that matches | |
if board.mountpoint: | |
for drive in drive_names: | |
if drive in board.mountpoint: | |
return board | |
await asyncio.sleep(1) | |
async def wait_for_location(location): | |
while True: | |
for board in discotool.get_identified_devices(): | |
if board["usb_location"] == location | |
return board | |
await asyncio.sleep(1) | |
async def copy_files_to(source_paths, drive_names=None, location=None): | |
print(f" Waiting for {repr(drive_names)} or {location}") | |
if not location: | |
board = await wait_for_drive(drive_names) | |
else: | |
board = await wait_for_location(location) | |
location = board["usb_location"] | |
board_id = read_board_id(board) | |
if board.mountpoint: | |
for source_path in source_paths: | |
source_name = os.path.basename(source_path) | |
print(f" Copying {source_name} to {board.mountpoint}") | |
target_path = os.path.join(board.mountpoint, source_name) | |
if os.path.isdir(source_path): | |
shutil.copytree(source_path, target_path) | |
else: | |
shutil.copyfile(source_path, target_path) | |
def eject(board): | |
volume_name = board.volume_name | |
command = ["osascript", "-e", f'tell application "Finder" to eject "{volume_name}"'] | |
subprocess.call(command) | |
async def update_one_board(board): | |
print("{board.location},{board.name} BOOT") | |
await copy_files_to(uf2, boot_names) | |
print("{board.location},{board.name} CIRCUITPY") | |
await copy_files_to(files, location=location) | |
print("{board.location},{board.name} EJECTING") | |
eject(board) | |
# forget the board | |
current_boards.remove(board.location) | |
async def wait_find_new_board(uf2, boot_names, files, circuitpy_names): | |
board = await wait_for_drive(boot_names) | |
current_boards.append(board) | |
return board | |
@click.group() | |
async def main(): | |
while True: | |
print("Insert next board and activate bootloader") | |
# wait for a new board to connect: | |
board = await wait_for_new_board( | |
"something.uf2", | |
["CLUEBOOT", "CPLAYBOOT", "BOOT"], | |
["source/code.py", "source/assets_dir", "source/lib"], | |
["CIRCUITPY"], | |
) | |
asyncio.create_task(update_one_board(board)) | |
if __name__ == "__main__": # pragma: no cover | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment