Last active
September 25, 2019 09:46
-
-
Save zyga/2210f334115645a0014e1ecce22a944c to your computer and use it in GitHub Desktop.
raspi-tool for sending/receiving images from running Raspberry Pi devices
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import abc | |
| import argparse | |
| import contextlib | |
| import io | |
| import os | |
| import platform | |
| import socket | |
| import subprocess | |
| import sys | |
| import zlib | |
| from typing import Any, Callable, Optional, Sequence, Tuple, Union, List | |
| class ArticulatedValueError(ValueError, argparse.ArgumentTypeError): | |
| """ArticulatedTypeError is a ValueError that contains an error message | |
| displayed by argparse.""" | |
| def parse_host_and_maybe_port(host_port: str) -> Tuple[str, Optional[int]]: | |
| """ | |
| parse_host_and_maybe_port splits host:port into a typed tuple. | |
| The tuple always contains an address but the port number may be None. A | |
| ValueError is raised if the port is provided but does not represent a | |
| valid port number. | |
| """ | |
| parts = host_port.split(":", 1) | |
| host = parts[0] | |
| port = None # type: Optional[int] | |
| if len(parts) == 2: | |
| try: | |
| port = int(parts[1]) | |
| except ValueError: | |
| raise ArticulatedValueError("port is not a number") | |
| if 0 > port > 65535: | |
| raise ArticulatedValueError("port not in range 0..65535") | |
| return (host, port) | |
| def parse_host_and_port(host_port: str) -> Tuple[str, int]: | |
| """ | |
| parse_host_and_port splits host:port into a typed tuple. | |
| A ValueError is raised if the format is invalid. | |
| """ | |
| host, maybe_port = parse_host_and_maybe_port(host_port) | |
| if maybe_port is None: | |
| # See note in parse_host_and_maybe_port | |
| raise ArticulatedValueError("port number must be explicitly provided") | |
| return (host, maybe_port) | |
| class Device(int): | |
| """ | |
| Device is a device number with major and minor components. | |
| Note that this class does not attempt to mimic peculiar | |
| encoding used by the Linux kernel. | |
| """ | |
| @classmethod | |
| def pack(cls, major: int, minor: int) -> "Device": | |
| return cls((major << 16) | (minor & (1 << 16) - 1)) | |
| def __str__(self) -> str: | |
| return "{}:{}".format(self.major, self.minor) | |
| def __repr__(self) -> str: | |
| return "Device.pack({}, {})".format(self.major, self.minor) | |
| @property | |
| def major(self) -> int: | |
| """major is the higher 16 bits of the device number.""" | |
| return self >> 16 | |
| @property | |
| def minor(self) -> int: | |
| """minor is the lower 16 bits of the device number.""" | |
| return self & ((1 << 16) - 1) | |
| class OptionalFields(List[str]): | |
| def __str__(self) -> str: | |
| """__str__ returns the special formatting of optional fields.""" | |
| if len(self): | |
| return " ".join(self) + " -" | |
| else: | |
| return "-" | |
| class MountInfoEntry(object): | |
| """Single entry in /proc/pid/mointinfo, see proc(5)""" | |
| mount_id: int | |
| parent_id: int | |
| dev: Device | |
| root_dir: str | |
| mount_point: str | |
| mount_opts: str | |
| opt_fields: OptionalFields | |
| fs_type: str | |
| mount_source: str | |
| sb_opts: str | |
| def __init__(self) -> None: | |
| self.mount_id = 0 | |
| self.parent_id = 0 | |
| self.dev = Device.pack(0, 0) | |
| self.root_dir = "" | |
| self.mount_point = "" | |
| self.mount_opts = "" | |
| self.opt_fields = OptionalFields() | |
| self.fs_type = "" | |
| self.mount_source = "" | |
| self.sb_opts = "" | |
| def __eq__(self, other: object) -> "Union[NotImplemented, bool]": | |
| if not isinstance(other, MountInfoEntry): | |
| return NotImplemented | |
| return ( | |
| self.mount_id == other.mount_id | |
| and self.parent_id == other.parent_id | |
| and self.dev == other.dev | |
| and self.root_dir == other.root_dir | |
| and self.mount_point == other.mount_point | |
| and self.mount_opts == other.mount_opts | |
| and self.opt_fields == other.opt_fields | |
| and self.fs_type == other.fs_type | |
| and self.mount_source == other.mount_source | |
| and self.sb_opts == other.sb_opts | |
| ) | |
| @classmethod | |
| def parse(cls, line: str) -> "MountInfoEntry": | |
| it = iter(line.split()) | |
| self = cls() | |
| self.mount_id = int(next(it)) | |
| self.parent_id = int(next(it)) | |
| dev_maj, dev_min = map(int, next(it).split(":")) | |
| self.dev = Device((dev_maj << 16) | dev_min) | |
| self.root_dir = next(it) | |
| self.mount_point = next(it) | |
| self.mount_opts = next(it) | |
| self.opt_fields = OptionalFields() | |
| for opt_field in it: | |
| if opt_field == "-": | |
| break | |
| self.opt_fields.append(opt_field) | |
| self.fs_type = next(it) | |
| self.mount_source = next(it) | |
| self.sb_opts = next(it) | |
| try: | |
| next(it) | |
| except StopIteration: | |
| pass | |
| else: | |
| raise ValueError("leftovers after parsing {!r}".format(line)) | |
| return self | |
| def __str__(self) -> str: | |
| return ( | |
| "{0.mount_id} {0.parent_id} {0.dev} {0.root_dir}" | |
| " {0.mount_point} {0.mount_opts} {0.opt_fields} {0.fs_type}" | |
| " {0.mount_source} {0.sb_opts}" | |
| ).format(self) | |
| def __repr__(self) -> str: | |
| return "MountInfoEntry.parse({!r})".format(str(self)) | |
| @property | |
| def dev_maj(self) -> int: | |
| return self.dev.major | |
| @property | |
| def dev_min(self) -> int: | |
| return self.dev.minor | |
| class ReMountReadOnly: | |
| """ReMountReadOnly is a context manager remounting filesystem to read-only.""" | |
| def __init__( | |
| self, path: str, on_remount: Optional[Callable[[str, bool], None]] = None | |
| ): | |
| self.path = path | |
| self.altered = False | |
| self.on_remount = on_remount | |
| def __enter__(self) -> "Optional[ReMountReadOnly]": | |
| retcode = subprocess.call(["mount", "-o", "remount,ro", self.path]) | |
| if retcode == 0: | |
| self.altered = True | |
| if self.on_remount is not None: | |
| self.on_remount(self.path, True) | |
| return self | |
| return None | |
| def __exit__(self, *exc_details: Any) -> None: | |
| if not self.altered: | |
| return | |
| retcode = subprocess.call(["mount", "-o", "remount,rw", self.path]) | |
| if retcode == 0 and self.on_remount is not None: | |
| self.on_remount(self.path, False) | |
| class Command(abc.ABC): | |
| """Command is the interface for command line commands.""" | |
| class Args(argparse.Namespace): | |
| """Args describes the parsed arguments.""" | |
| @abc.abstractproperty | |
| def name(self) -> str: | |
| """name of the command""" | |
| @abc.abstractproperty | |
| def description(self) -> str: | |
| """description of the command""" | |
| @abc.abstractmethod | |
| def configure_parser(self, parser: argparse.ArgumentParser) -> None: | |
| """configure_parser configures the parser specific to the command.""" | |
| @abc.abstractmethod | |
| def invoke(self, args: Args) -> None: | |
| """invoke executes the command with the given arguments.""" | |
| def say(self, msg: str) -> None: | |
| """say prints stuff as the command""" | |
| print("raspi-tool", msg) | |
| class SendImgCmd(Command): | |
| name = "send-image" | |
| description = "send an image from this device to a remote machine" | |
| class Args(Command.Args): | |
| device: str | |
| addr: Tuple[str, int] | |
| def configure_parser(self, parser: argparse.ArgumentParser) -> None: | |
| parser.add_argument( | |
| "--device", | |
| metavar="DEVICE", | |
| default="/dev/mmcblk0", | |
| help="File representing SD card", | |
| ) | |
| parser.add_argument( | |
| "addr", | |
| metavar="HOST:PORT", | |
| type=parse_host_and_port, | |
| help="Destination (e.g. running raspberry-pi-tool recv-image)", | |
| ) | |
| def _on_remounted(self, path: str, is_ro: bool) -> None: | |
| self.say("{} re-mounted {}".format(path, "ro" if is_ro else "rw")) | |
| def invoke(self, args: Args) -> None: | |
| with contextlib.ExitStack() as stack: | |
| try: | |
| sink = socket.create_connection(args.addr) | |
| except IOError as exc: | |
| raise SystemExit("cannot connect to {}: {}".format(args.addr, exc)) | |
| else: | |
| stack.enter_context(sink) | |
| with open("/proc/self/mountinfo") as stream: | |
| mountinfo = [MountInfoEntry.parse(line) for line in stream] | |
| for entry in mountinfo: | |
| if entry.mount_source.startswith(args.device): | |
| stack.enter_context( | |
| ReMountReadOnly(entry.mount_point, self._on_remounted) | |
| ) | |
| try: | |
| source = io.FileIO(args.device) | |
| except IOError as exc: | |
| raise SystemExit("cannot open {}: {}".format(args.device, exc)) | |
| else: | |
| stack.enter_context(source) | |
| self.say("sending {} to {}".format(args.device, args.addr)) | |
| packer = zlib.compressobj(level=9) | |
| buf = bytearray(4096) | |
| while True: | |
| n = source.readinto(buf) | |
| if n == 0: | |
| break | |
| zbuf = packer.compress(buf[:n]) | |
| sink.sendall(zbuf) | |
| del zbuf | |
| zbuf = packer.flush(zlib.Z_FINISH) | |
| sink.sendall(zbuf) | |
| del zbuf | |
| self.say("sent!") | |
| class RecvImgCmd(Command): | |
| name = "recv-image" | |
| description = "receive an image a remote image and save it locally" | |
| class Args(Command.Args): | |
| img: str | |
| addr: Optional[Tuple[str, int]] | |
| def configure_parser(self, parser: argparse.ArgumentParser) -> None: | |
| parser.add_argument( | |
| "--addr", | |
| metavar="HOST:PORT", | |
| type=parse_host_and_port, | |
| help="Destination (e.g. running raspberry-pi-tool recv-image)", | |
| ) | |
| parser.add_argument( | |
| "--image", | |
| dest="img", | |
| metavar="IMAGE", | |
| help="File representing SD card image", | |
| default="pi.img", | |
| ) | |
| def invoke(self, args: Args) -> None: | |
| with contextlib.ExitStack() as stack: | |
| # Open the file we want to write to. | |
| try: | |
| sink = stack.enter_context(io.FileIO(args.img, "w")) | |
| except IOError as exc: | |
| raise SystemExit("cannot open {}: {}".format(args.img, exc)) | |
| # Open a socket and find a port to bind to. | |
| sock = stack.enter_context( | |
| socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| ) | |
| host = "0.0.0.0" | |
| port = 12345 | |
| if args.addr is not None: | |
| host, port = args.addr | |
| search = 1 | |
| else: | |
| search = 10 | |
| for try_port in range(port, port + search): | |
| try: | |
| sock.bind((host, try_port)) | |
| except OSError as exc: | |
| self.say("cannot bind to {}:{}: {}".format(host, try_port, exc)) | |
| else: | |
| break | |
| else: | |
| raise SystemExit("cannot bind to port, giving up") | |
| # Listen for incoming connections. | |
| sock.listen() | |
| self.say("listening on {}:{}".format(host, port)) | |
| hostname = socket.gethostname() | |
| hostaddr = socket.gethostbyname(hostname) | |
| if hostaddr.startswith("127."): | |
| self.say("cannot determine useful address of this machine") | |
| else: | |
| self.say("on the raspberry pi issue the following command") | |
| self.say("$ sudo ./raspi-tool.py send-image {}:{}".format(hostaddr, port)) | |
| self.say("waiting for connection...") | |
| # Wait for client connection. | |
| try: | |
| source, remote_addr = sock.accept() | |
| except Exception as exc: | |
| raise SystemExit("cannot accept connection: {}".format(exc)) | |
| self.say("saving image from {} to {}".format(remote_addr, args.img)) | |
| packer = zlib.decompressobj() | |
| buf = bytearray(4096) | |
| while True: | |
| n = source.recv_into(buf, 0) | |
| if n == 0: | |
| break | |
| zbuf = packer.decompress(buf[:n]) | |
| sink.write(zbuf) | |
| del zbuf | |
| zbuf = packer.flush(zlib.Z_FINISH) | |
| sink.write(zbuf) | |
| del zbuf | |
| self.say("saved!") | |
| def _make_parser(commands: Sequence[Command]) -> argparse.ArgumentParser: | |
| """_make_parser creates an argument parser with given subcommands.""" | |
| parser = argparse.ArgumentParser( | |
| description="Utility for working with Raspberry Pi devices." | |
| ) | |
| parser.add_argument("--version", action="version", version="0.1") | |
| sub = parser.add_subparsers() | |
| for cmd in commands: | |
| cmd_parser = sub.add_parser(cmd.name, help=cmd.description) | |
| cmd.configure_parser(cmd_parser) | |
| cmd_parser.set_defaults(invoke=cmd.invoke) | |
| return parser | |
| def main() -> None: | |
| parser = _make_parser([SendImgCmd(), RecvImgCmd()]) | |
| # On windows, when invoked without arguments, default to receiving an image. | |
| if platform.win32_ver()[0] != "" and len(sys.argv) == 1: | |
| args = parser.parse_args(["recv-image"]) | |
| else: | |
| args = parser.parse_args() | |
| if not hasattr(args, "invoke"): | |
| parser.error("select command to execute") | |
| args.invoke(args) | |
| if __name__ == "__main__": | |
| try: | |
| main() | |
| except KeyboardInterrupt: | |
| pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment