Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import abc
- import argparse
- import contextlib
- import io
- import os
- import platform
- import socket
- import subprocess
- import sys
- import zlib
- from typing import Any, Callable, Optional, Sequence, Tuple, Union, List
class ArticulatedValueError(ValueError, argparse.ArgumentTypeError):
    """ArticulatedValueError is a ValueError that contains an error message
    displayed by argparse.

    It derives from both ValueError and argparse.ArgumentTypeError, so it can
    be caught as either of them.
    """
def parse_host_and_maybe_port(host_port: str) -> Tuple[str, Optional[int]]:
    """
    parse_host_and_maybe_port splits host:port into a typed tuple.

    The tuple always contains an address but the port number may be None,
    which happens when host_port contains no colon. Only the first colon
    separates the host from the port.

    An ArticulatedValueError (which is a ValueError) is raised if the port is
    provided but is not an integer in the range 0..65535.
    """
    host, colon, port_text = host_port.partition(":")
    if not colon:
        return (host, None)
    try:
        port = int(port_text)
    except ValueError:
        # The int() failure adds nothing to the message shown to the user.
        raise ArticulatedValueError("port is not a number") from None
    # Written as a negated range test on purpose: the chained comparison
    # "0 > port > 65535" can never be true and would let every value through.
    if not 0 <= port <= 65535:
        raise ArticulatedValueError("port not in range 0..65535")
    return (host, port)
def parse_host_and_port(host_port: str) -> Tuple[str, int]:
    """
    parse_host_and_port splits host:port into a typed tuple.

    Unlike parse_host_and_maybe_port, which does the actual splitting, the
    port is mandatory here. An ArticulatedValueError (a ValueError) is raised
    when the port is absent; errors raised by parse_host_and_maybe_port
    propagate unchanged.
    """
    parsed = parse_host_and_maybe_port(host_port)
    if parsed[1] is not None:
        return (parsed[0], parsed[1])
    raise ArticulatedValueError("port number must be explicitly provided")
class Device(int):
    """
    Device is an integer device number split into major and minor parts.

    The minor part occupies the low 16 bits and the major part everything
    above them. This layout is local to this class; it does not attempt to
    mimic the peculiar encoding used by the Linux kernel.
    """

    _MINOR_BITS = 16
    _MINOR_MASK = (1 << 16) - 1

    @classmethod
    def pack(cls, major: int, minor: int) -> "Device":
        """pack combines major and minor into a Device, truncating minor to
        its low 16 bits."""
        upper = major << cls._MINOR_BITS
        lower = minor & cls._MINOR_MASK
        return cls(upper | lower)

    @property
    def major(self) -> int:
        """major is the part of the device number above the low 16 bits."""
        return self >> self._MINOR_BITS

    @property
    def minor(self) -> int:
        """minor is the low 16 bits of the device number."""
        return self & self._MINOR_MASK

    def __str__(self) -> str:
        return f"{self.major}:{self.minor}"

    def __repr__(self) -> str:
        return f"Device.pack({self.major}, {self.minor})"
class OptionalFields(List[str]):
    """OptionalFields is a list of strings with a dash-terminated rendering."""

    def __str__(self) -> str:
        """__str__ returns the fields separated by spaces and followed by the
        terminating "-"; an empty list is rendered as just "-"."""
        return " ".join(self + ["-"])
class MountInfoEntry(object):
    """Single entry in /proc/pid/mountinfo, see proc(5)

    Attributes are declared in the order in which parse() reads them from a
    line and __str__() writes them back. opt_fields is a variable-length run
    of fields which is terminated by a lone "-" in the textual form.
    """

    mount_id: int
    parent_id: int
    dev: Device
    root_dir: str
    mount_point: str
    mount_opts: str
    opt_fields: OptionalFields
    fs_type: str
    mount_source: str
    sb_opts: str

    def __init__(self) -> None:
        self.mount_id = 0
        self.parent_id = 0
        self.dev = Device.pack(0, 0)
        self.root_dir = ""
        self.mount_point = ""
        self.mount_opts = ""
        self.opt_fields = OptionalFields()
        self.fs_type = ""
        self.mount_source = ""
        self.sb_opts = ""

    def __eq__(self, other: object) -> bool:
        """__eq__ compares all fields; other types are left to the other operand."""
        if not isinstance(other, MountInfoEntry):
            return NotImplemented
        return (
            self.mount_id == other.mount_id
            and self.parent_id == other.parent_id
            and self.dev == other.dev
            and self.root_dir == other.root_dir
            and self.mount_point == other.mount_point
            and self.mount_opts == other.mount_opts
            and self.opt_fields == other.opt_fields
            and self.fs_type == other.fs_type
            and self.mount_source == other.mount_source
            and self.sb_opts == other.sb_opts
        )

    @classmethod
    def parse(cls, line: str) -> "MountInfoEntry":
        """parse builds an entry from one whitespace-separated mountinfo line.

        A ValueError is raised when the line has too few fields, when the "-"
        terminating the optional fields is missing, when a numeric field is
        not a number, or when anything is left over after the last field.
        """
        fields = iter(line.split())
        entry = cls()
        try:
            entry.mount_id = int(next(fields))
            entry.parent_id = int(next(fields))
            dev_maj, dev_min = map(int, next(fields).split(":"))
            # Go through Device.pack so that an oversized minor number is
            # truncated rather than spilling into the major number.
            entry.dev = Device.pack(dev_maj, dev_min)
            entry.root_dir = next(fields)
            entry.mount_point = next(fields)
            entry.mount_opts = next(fields)
            entry.opt_fields = OptionalFields()
            for opt_field in fields:
                if opt_field == "-":
                    break
                entry.opt_fields.append(opt_field)
            entry.fs_type = next(fields)
            entry.mount_source = next(fields)
            entry.sb_opts = next(fields)
        except StopIteration:
            # Running out of fields is a malformed line, report it the same
            # way as the other format errors instead of leaking StopIteration.
            raise ValueError("not enough fields in {!r}".format(line)) from None
        if next(fields, None) is not None:
            raise ValueError("leftovers after parsing {!r}".format(line))
        return entry

    def __str__(self) -> str:
        """__str__ returns the entry formatted the way parse() accepts it."""
        return (
            "{0.mount_id} {0.parent_id} {0.dev} {0.root_dir}"
            " {0.mount_point} {0.mount_opts} {0.opt_fields} {0.fs_type}"
            " {0.mount_source} {0.sb_opts}"
        ).format(self)

    def __repr__(self) -> str:
        return "MountInfoEntry.parse({!r})".format(str(self))

    @property
    def dev_maj(self) -> int:
        """dev_maj is the major number of dev."""
        return self.dev.major

    @property
    def dev_min(self) -> int:
        """dev_min is the minor number of dev."""
        return self.dev.minor
class ReMountReadOnly:
    """ReMountReadOnly is a context manager remounting filesystem to read-only.

    Entering runs "mount -o remount,ro" on the path and yields the manager on
    success or None on failure. Leaving runs "mount -o remount,rw", but only
    if the read-only remount succeeded. The optional on_remount callback is
    called with the path and True (now read-only) or False (now read-write)
    after each successful remount.
    """

    def __init__(
        self, path: str, on_remount: Optional[Callable[[str, bool], None]] = None
    ):
        self.path = path
        self.on_remount = on_remount
        self.altered = False

    def _remount(self, options: str) -> bool:
        """_remount runs mount with the given -o options, True on exit code 0."""
        return subprocess.call(["mount", "-o", options, self.path]) == 0

    def _notify(self, is_ro: bool) -> None:
        """_notify invokes the callback, if there is one."""
        if self.on_remount is not None:
            self.on_remount(self.path, is_ro)

    def __enter__(self) -> "Optional[ReMountReadOnly]":
        if not self._remount("remount,ro"):
            return None
        self.altered = True
        self._notify(True)
        return self

    def __exit__(self, *exc_details: Any) -> None:
        # Nothing to undo unless the read-only remount actually happened.
        if self.altered and self._remount("remount,rw"):
            self._notify(False)
class Command(abc.ABC):
    """Command is the interface for command line commands.

    Concrete commands provide name and description (a plain class attribute
    is enough), configure_parser and invoke.
    """

    class Args(argparse.Namespace):
        """Args describes the parsed arguments."""

    # abc.abstractproperty is deprecated since Python 3.3, the supported
    # spelling is property stacked on top of abc.abstractmethod.
    @property
    @abc.abstractmethod
    def name(self) -> str:
        """name of the command"""

    @property
    @abc.abstractmethod
    def description(self) -> str:
        """description of the command"""

    @abc.abstractmethod
    def configure_parser(self, parser: argparse.ArgumentParser) -> None:
        """configure_parser configures the parser specific to the command."""

    @abc.abstractmethod
    def invoke(self, args: Args) -> None:
        """invoke executes the command with the given arguments."""

    def say(self, msg: str) -> None:
        """say prints msg on standard output, prefixed with "raspi-tool"."""
        print("raspi-tool", msg)
class SendImgCmd(Command):
    """SendImgCmd streams a zlib-compressed copy of a device to a TCP peer.

    While the device is being read, every mount whose source starts with the
    device path is re-mounted read-only.
    """

    name = "send-image"
    description = "send an image from this device to a remote machine"

    class Args(Command.Args):
        device: str
        addr: Tuple[str, int]

    def configure_parser(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument(
            "--device",
            default="/dev/mmcblk0",
            metavar="DEVICE",
            help="File representing SD card",
        )
        parser.add_argument(
            "addr",
            type=parse_host_and_port,
            metavar="HOST:PORT",
            help="Destination (e.g. running raspberry-pi-tool recv-image)",
        )

    def _on_remounted(self, path: str, is_ro: bool) -> None:
        """_on_remounted reports a successful remount of path."""
        mode = "ro" if is_ro else "rw"
        self.say("{} re-mounted {}".format(path, mode))

    def invoke(self, args: Args) -> None:
        with contextlib.ExitStack() as cleanup:
            # Connect first, nothing is touched if the peer is unreachable.
            try:
                sink = socket.create_connection(args.addr)
            except IOError as exc:
                raise SystemExit("cannot connect to {}: {}".format(args.addr, exc))
            cleanup.enter_context(sink)

            # Re-mount everything backed by the device read-only; the exit
            # stack re-mounts it read-write on the way out.
            with open("/proc/self/mountinfo") as stream:
                entries = [MountInfoEntry.parse(line) for line in stream]
            mount_points = [
                entry.mount_point
                for entry in entries
                if entry.mount_source.startswith(args.device)
            ]
            for mount_point in mount_points:
                cleanup.enter_context(
                    ReMountReadOnly(mount_point, self._on_remounted)
                )

            try:
                source = io.FileIO(args.device)
            except IOError as exc:
                raise SystemExit("cannot open {}: {}".format(args.device, exc))
            cleanup.enter_context(source)

            self.say("sending {} to {}".format(args.device, args.addr))
            packer = zlib.compressobj(level=9)
            chunk = bytearray(4096)
            # readinto() returning 0 marks the end of the device.
            for count in iter(lambda: source.readinto(chunk), 0):
                sink.sendall(packer.compress(chunk[:count]))
            sink.sendall(packer.flush(zlib.Z_FINISH))
            self.say("sent!")
class RecvImgCmd(Command):
    """RecvImgCmd accepts one TCP connection and saves the zlib-compressed
    stream received over it, decompressed, to a local image file.

    Without --addr the command listens on 0.0.0.0 and uses the first port it
    can bind to out of ten consecutive ports starting at 12345. With --addr
    only the given address is tried.
    """

    name = "recv-image"
    description = "receive an image from a remote machine and save it locally"

    class Args(Command.Args):
        img: str
        addr: Optional[Tuple[str, int]]

    def configure_parser(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument(
            "--addr",
            metavar="HOST:PORT",
            type=parse_host_and_port,
            help="Address to listen on (default: 0.0.0.0, first free port from 12345)",
        )
        parser.add_argument(
            "--image",
            dest="img",
            metavar="IMAGE",
            help="File representing SD card image",
            default="pi.img",
        )

    def invoke(self, args: Args) -> None:
        with contextlib.ExitStack() as stack:
            # Open the file we want to write to. A buffered writer is used
            # because a single write on a raw FileIO may be partial.
            try:
                sink = stack.enter_context(open(args.img, "wb"))
            except IOError as exc:
                raise SystemExit("cannot open {}: {}".format(args.img, exc))
            # Open a socket and find a port to bind to.
            sock = stack.enter_context(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            )
            host = "0.0.0.0"
            port = 12345
            if args.addr is not None:
                host, port = args.addr
                search = 1
            else:
                search = 10
            for try_port in range(port, port + search):
                try:
                    sock.bind((host, try_port))
                except OSError as exc:
                    self.say("cannot bind to {}:{}: {}".format(host, try_port, exc))
                else:
                    # Remember the port that was really bound. Asking the
                    # socket also covers port 0 (picked by the system).
                    port = sock.getsockname()[1]
                    break
            else:
                raise SystemExit("cannot bind to port, giving up")
            # Listen for incoming connections.
            sock.listen()
            self.say("listening on {}:{}".format(host, port))
            try:
                hostaddr = socket.gethostbyname(socket.gethostname())
            except OSError:
                # The address is only used for the hint printed below.
                hostaddr = ""
            if not hostaddr or hostaddr.startswith("127."):
                self.say("cannot determine useful address of this machine")
            else:
                self.say("on the raspberry pi issue the following command")
                self.say("$ sudo ./raspi-tool.py send-image {}:{}".format(hostaddr, port))
            self.say("waiting for connection...")
            # Wait for client connection.
            try:
                source, remote_addr = sock.accept()
            except OSError as exc:
                raise SystemExit("cannot accept connection: {}".format(exc))
            # The accepted socket is separate from the listening one and has
            # to be closed on its own.
            stack.enter_context(source)
            self.say("saving image from {} to {}".format(remote_addr, args.img))
            unpacker = zlib.decompressobj()
            buf = bytearray(4096)
            view = memoryview(buf)
            try:
                while True:
                    n = source.recv_into(buf)
                    if n == 0:
                        break
                    sink.write(unpacker.decompress(view[:n]))
                # Decompress.flush() takes an optional buffer length, not a
                # flush mode like Compress.flush() does.
                sink.write(unpacker.flush())
            except zlib.error as exc:
                raise SystemExit("cannot decompress received data: {}".format(exc))
            # The sender finishes the stream explicitly, so a stream without
            # the end marker means the connection broke half way through.
            if not unpacker.eof:
                raise SystemExit(
                    "connection closed before the whole image was received"
                )
            self.say("saved!")
def _make_parser(commands: Sequence[Command]) -> argparse.ArgumentParser:
    """_make_parser creates an argument parser with given subcommands.

    Each command gets a sub-parser named after it, set up by the command
    itself. The sub-parser stores the command's invoke method as the "invoke"
    default, so the parsed namespace only has that attribute when a
    sub-command was selected.
    """
    root = argparse.ArgumentParser(
        description="Utility for working with Raspberry Pi devices."
    )
    root.add_argument("--version", action="version", version="0.1")
    subparsers = root.add_subparsers()
    for command in commands:
        child = subparsers.add_parser(command.name, help=command.description)
        command.configure_parser(child)
        child.set_defaults(invoke=command.invoke)
    return root
def main() -> None:
    """main parses the command line and runs the selected command."""
    parser = _make_parser([SendImgCmd(), RecvImgCmd()])
    # On windows, when invoked without arguments, default to receiving an image.
    on_windows = platform.win32_ver()[0] != ""
    # None makes argparse read the arguments from sys.argv.
    argv = ["recv-image"] if on_windows and len(sys.argv) == 1 else None
    args = parser.parse_args(argv)
    if not hasattr(args, "invoke"):
        parser.error("select command to execute")
    args.invoke(args)
if __name__ == "__main__":
    # Ctrl-C ends the program quietly, without a traceback.
    with contextlib.suppress(KeyboardInterrupt):
        main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement