Skip to content

Instantly share code, notes, and snippets.

@Neradoc
Created November 10, 2022 00:54
Show Gist options
  • Save Neradoc/64795d4c9b7b5ecd12217b773fdbad92 to your computer and use it in GitHub Desktop.
Save Neradoc/64795d4c9b7b5ecd12217b773fdbad92 to your computer and use it in GitHub Desktop.
import asyncio
import click
import discotool
import os
import shutil
import subprocess
current_boards = []
def read_board_id(board):
if not board.mountpoint:
return None
if "BOOT" in board.mountpoint:
info = os.path.join(board.mountpoint, "INFO_UF2.TXT")
with open(info, "r") as fp:
while line := fp.readline():
if line.startswith("Board-ID:"):
return line[9:].strip()
else:
info = os.path.join(board.mountpoint, "boot_out.txt")
with open(info, "r") as fp:
while line := fp.readline():
if line.startswith("Board ID:"):
return line[9:].strip()
async def wait_for_drive(drive_names, new=False):
while True:
for board in discotool.get_identified_devices():
# if we want a new board, skip boards with known location
if new and board.location in current_boards:
continue
# find the first drive that matches
if board.mountpoint:
for drive in drive_names:
if drive in board.mountpoint:
return board
await asyncio.sleep(1)
async def wait_for_location(location):
while True:
for board in discotool.get_identified_devices():
if board["usb_location"] == location
return board
await asyncio.sleep(1)
async def copy_files_to(source_paths, drive_names=None, location=None):
print(f" Waiting for {repr(drive_names)} or {location}")
if not location:
board = await wait_for_drive(drive_names)
else:
board = await wait_for_location(location)
location = board["usb_location"]
board_id = read_board_id(board)
if board.mountpoint:
for source_path in source_paths:
source_name = os.path.basename(source_path)
print(f" Copying {source_name} to {board.mountpoint}")
target_path = os.path.join(board.mountpoint, source_name)
if os.path.isdir(source_path):
shutil.copytree(source_path, target_path)
else:
shutil.copyfile(source_path, target_path)
def eject(board):
volume_name = board.volume_name
command = ["osascript", "-e", f'tell application "Finder" to eject "{volume_name}"']
subprocess.call(command)
async def update_one_board(board):
print("{board.location},{board.name} BOOT")
await copy_files_to(uf2, boot_names)
print("{board.location},{board.name} CIRCUITPY")
await copy_files_to(files, location=location)
print("{board.location},{board.name} EJECTING")
eject(board)
# forget the board
current_boards.remove(board.location)
async def wait_find_new_board(uf2, boot_names, files, circuitpy_names):
board = await wait_for_drive(boot_names)
current_boards.append(board)
return board
@click.group()
async def main():
while True:
print("Insert next board and activate bootloader")
# wait for a new board to connect:
board = await wait_for_new_board(
"something.uf2",
["CLUEBOOT", "CPLAYBOOT", "BOOT"],
["source/code.py", "source/assets_dir", "source/lib"],
["CIRCUITPY"],
)
asyncio.create_task(update_one_board(board))
if __name__ == "__main__": # pragma: no cover
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment