diff --git a/README.md b/README.md index dfe1ad2..ccdc121 100644 --- a/README.md +++ b/README.md @@ -118,12 +118,16 @@ Handles both `tftpboot` and `tftp` U-Boot commands transparently. - macOS serial workaround (ACK byte correction) - Cross-platform: Linux, macOS, Windows -## Automated Power Cycling (PoE) +## Automated Power Cycling -Defib can automatically power-cycle devices via a MikroTik PoE switch, -eliminating manual intervention for recovery loops and research workflows. +Defib can automatically power-cycle devices, eliminating manual intervention +for recovery loops and research workflows. Two backends are supported, +selected via `DEFIB_POWER_TYPE`: + +### MikroTik RouterOS PoE switch (default) ```bash +export DEFIB_POWER_TYPE=routeros # optional, the default export DEFIB_POE_HOST=192.168.88.1 export DEFIB_POE_USER=admin export DEFIB_POE_PASS= @@ -134,6 +138,50 @@ defib burn -c hi3516ev300 -p /dev/uart-IVG85HG50PYA-S --power-cycle -b defib burn -c hi3516ev300 -p /dev/uart-IVG85HG50PYA-S --power-cycle -t ``` +### OpenIPC Vectis UART bridge + +[Vectis](https://github.com/OpenIPC/vectis) is a USB/Ethernet UART bridge +that exposes the camera's UART over TCP and drives camera power via the +bridge's RTS/DTR lines. Vectis ≥ 1.2.0 speaks [RFC 2217][rfc2217] (Telnet +COM Port Control Option) on the listener: the data path is binary safe +and modem-control lines are commanded out-of-band as `SET-CONTROL` +sub-options instead of in-band magic bytes. Defib uses the same TCP +connection for the UART and the reset: + +[rfc2217]: https://datatracker.ietf.org/doc/html/rfc2217 + +```bash +export DEFIB_POWER_TYPE=vectis +export DEFIB_VECTIS_HOST=172.17.32.17 +export DEFIB_VECTIS_PORT=35240 # optional, the upstream default + +defib burn -c hi3516cv300 -p rfc2217://172.17.32.17:35240 --power-cycle -b +``` + +The `rfc2217://` URL scheme routes the UART through pyserial's RFC 2217 +client (which handles RFC 854 escaping for `0xFF` and the `SET-CONTROL` +DTR/RTS pulse for reset). An older Vectis without RFC 2217 support is +still reachable via the legacy `tcp://host:port` URL and the in-band +`Ctrl+P` reset trigger. + +Smoke-test the bridge first with `telnet` (which negotiates RFC 2217) +or with `socat` (which stays in legacy raw mode): + +```bash +telnet 172.17.32.17 35240 +# Or, legacy raw mode: +socat -,raw,echo=0 TCP:172.17.32.17:35240 +``` + +> Note: Vectis only emits a fixed-width reset pulse, so it does not +> work with `defib restore` (which needs independent +> `power_off`/`power_on`). +> +> Note: tight-loop bootrom catching needs a low-RTT link to Vectis. +> Over a high-RTT WAN link the bootrom's `0x20`-marker / `0xAA`-ack +> window can close before the round-trip completes; running Vectis +> on the same host as defib (or close to it on a LAN) is recommended. + The `-t` flag auto-detects the post-boot mode: - **Normal U-Boot shell** (e.g. hi3516ev300): raw terminal passthrough — type commands directly - **Download command mode** (e.g. hi3516av200): interactive `defib>` prompt that wraps commands in HiSilicon's XHEAD/XCMD protocol, enabling flash operations on devices that enter `download_process()` after serial boot diff --git a/pyproject.toml b/pyproject.toml index 47a72e0..a622c10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ hisilicon_cv6xx = "defib.protocol.hisilicon_cv6xx:HiSiliconCV6xx" [project.entry-points."defib.power"] routeros = "defib.power.routeros:RouterOSController" +vectis = "defib.power.vectis:VectisController" [build-system] requires = ["hatchling"] diff --git a/src/defib/cli/app.py b/src/defib/cli/app.py index 0de694c..442419d 100644 --- a/src/defib/cli/app.py +++ b/src/defib/cli/app.py @@ -100,10 +100,11 @@ def _dl_progress(done: int, total: int) -> None: power_controller = None poe_port = None if power_cycle: + from defib.power.factory import power_controller_from_env from defib.power.routeros import RouterOSController try: - power_controller = RouterOSController.from_env() + power_controller = power_controller_from_env() except Exception as e: if output == "json": print(json_mod.dumps({"event": "error", "message": str(e)})) @@ -111,24 +112,32 @@ def _dl_progress(done: int, total: int) -> None: console.print(f"[red]Power controller error:[/red] {e}") raise typer.Exit(1) - # Extract device label from serial port name for auto-discovery - # e.g. /dev/uart-IVGHP203Y-AF -> IVGHP203Y-AF - from pathlib import Path - port_basename = Path(port).name - device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename + if isinstance(power_controller, RouterOSController): + # Extract device label from serial port name for auto-discovery + # e.g. /dev/uart-IVGHP203Y-AF -> IVGHP203Y-AF + from pathlib import Path + port_basename = Path(port).name + device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename - try: - poe_port = await power_controller.find_port_by_comment(device_label) - except Exception as e: - if output == "json": - print(json_mod.dumps({"event": "error", "message": str(e)})) - else: - console.print(f"[red]PoE port discovery failed:[/red] {e}") - await power_controller.close() - raise typer.Exit(1) + try: + poe_port = await power_controller.find_port_by_comment(device_label) + except Exception as e: + if output == "json": + print(json_mod.dumps({"event": "error", "message": str(e)})) + else: + console.print(f"[red]PoE port discovery failed:[/red] {e}") + await power_controller.close() + raise typer.Exit(1) - if output == "human": - console.print(f"PoE control: [cyan]{poe_port}[/cyan] on [cyan]{power_controller._host}[/cyan]") + if output == "human": + console.print(f"PoE control: [cyan]{poe_port}[/cyan] on [cyan]{power_controller._host}[/cyan]") + else: + # Vectis (and any future single-port controller) has no port + # discovery — pass an empty string so the recovery session + # knows automated cycling is available. + poe_port = "" + if output == "human": + console.print(f"Power: [cyan]{power_controller.name()}[/cyan]") try: session = RecoverySession( @@ -159,6 +168,18 @@ def _dl_progress(done: int, total: int) -> None: console.print(f"[red]Failed to open serial port:[/red] {e}") raise typer.Exit(2) + # Vectis: hand the live RFC 2217 transport (or legacy raw TCP) to + # the controller so RTS/DTR toggles ride the same connection that + # the UART data uses — Vectis only allows one client at a time. + if power_controller is not None: + from defib.power.vectis import VectisController + from defib.transport.rfc2217 import Rfc2217Transport + from defib.transport.socket import SocketTransport + if isinstance(power_controller, VectisController) and isinstance( + transport, (Rfc2217Transport, SocketTransport) + ): + power_controller.attach_transport(transport) + # Rich progress bar for human output from rich.progress import TaskID progress_ctx = None @@ -1827,24 +1848,30 @@ async def _install_async( power_controller = None poe_port = None if power_cycle: + from defib.power.factory import power_controller_from_env from defib.power.routeros import RouterOSController try: - power_controller = RouterOSController.from_env() + power_controller = power_controller_from_env() except Exception as e: console.print(f"[red]Power controller error:[/red] {e}") raise typer.Exit(1) - port_basename = Path(port).name - device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename - try: - poe_port = await power_controller.find_port_by_comment(device_label) - except Exception as e: - console.print(f"[red]PoE port discovery failed:[/red] {e}") - await power_controller.close() - raise typer.Exit(1) + if isinstance(power_controller, RouterOSController): + port_basename = Path(port).name + device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename + try: + poe_port = await power_controller.find_port_by_comment(device_label) + except Exception as e: + console.print(f"[red]PoE port discovery failed:[/red] {e}") + await power_controller.close() + raise typer.Exit(1) - if output == "human": - console.print(f" PoE: [cyan]{poe_port}[/cyan]") + if output == "human": + console.print(f" PoE: [cyan]{poe_port}[/cyan]") + else: + poe_port = "" + if output == "human": + console.print(f" Power: [cyan]{power_controller.name()}[/cyan]") session = RecoverySession( chip=chip, firmware_path=str(cached), @@ -1858,6 +1885,15 @@ async def _install_async( transport = await create_transport(normalize_port_name(port)) + # Vectis: share the TCP transport for Ctrl+P delivery (see burn). + if power_controller is not None: + from defib.power.vectis import VectisController + from defib.transport.socket import SocketTransport + if isinstance(power_controller, VectisController) and isinstance( + transport, SocketTransport + ): + power_controller.attach_transport(transport) + def on_log(event: LogEvent) -> None: if output == "human": style = {"error": "red", "warn": "yellow", "info": "green"}.get(event.level, "") @@ -2367,13 +2403,27 @@ async def _restore_async( power_controller = None poe_port = None if power_cycle: + from defib.power.factory import power_controller_from_env from defib.power.routeros import RouterOSController try: - power_controller = RouterOSController.from_env() + power_controller = power_controller_from_env() except Exception as e: console.print(f"[red]Power controller error:[/red] {e}") raise typer.Exit(1) + # restore needs independent power_off/power_on (frame-blast flow); + # Vectis only emits a fixed pulse and cannot satisfy that. + if not isinstance(power_controller, RouterOSController): + console.print( + f"[red]restore requires a controller that supports independent " + f"power_off/power_on; {power_controller.name()!r} only supports " + f"power_cycle. Use --power-cycle with DEFIB_POWER_TYPE=routeros, " + f"or run restore without --power-cycle and cycle power manually." + f"[/red]" + ) + await power_controller.close() + raise typer.Exit(1) + port_basename = Path(port).name device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename try: @@ -2395,6 +2445,9 @@ async def _restore_async( # open serial on a quiet line, then let session handle power-on. if power_controller and poe_port: import asyncio as _aio + from defib.power.routeros import RouterOSController + # We rejected non-RouterOS controllers above — narrow for mypy. + assert isinstance(power_controller, RouterOSController) if output == "human": console.print(" Powering off...") power_controller._saved_poe_out[poe_port] = "forced-on" diff --git a/src/defib/power/factory.py b/src/defib/power/factory.py new file mode 100644 index 0000000..17cad4a --- /dev/null +++ b/src/defib/power/factory.py @@ -0,0 +1,31 @@ +"""Power controller factory — pick implementation from environment.""" + +from __future__ import annotations + +import os + +from defib.power.base import PowerController, PowerControllerError + + +def power_controller_from_env() -> PowerController: + """Build a PowerController based on the ``DEFIB_POWER_TYPE`` env var. + + - ``DEFIB_POWER_TYPE=routeros`` (default): MikroTik RouterOS API, + configured via ``DEFIB_POE_*``. + - ``DEFIB_POWER_TYPE=vectis``: OpenIPC Vectis UART bridge, + configured via ``DEFIB_VECTIS_*``. + + Raises: + PowerControllerError: if the type is unknown or required env + vars are missing. + """ + kind = os.environ.get("DEFIB_POWER_TYPE", "routeros").lower() + if kind == "routeros": + from defib.power.routeros import RouterOSController + return RouterOSController.from_env() + if kind == "vectis": + from defib.power.vectis import VectisController + return VectisController.from_env() + raise PowerControllerError( + f"Unknown DEFIB_POWER_TYPE: {kind!r} (expected 'routeros' or 'vectis')" + ) diff --git a/src/defib/power/vectis.py b/src/defib/power/vectis.py new file mode 100644 index 0000000..a186ee6 --- /dev/null +++ b/src/defib/power/vectis.py @@ -0,0 +1,170 @@ +"""OpenIPC Vectis UART bridge power controller. + +Vectis (https://github.com/OpenIPC/vectis) is a USB/Ethernet UART +bridge that exposes the camera's UART over TCP and gates camera power +via the bridge's RTS/DTR lines. Vectis ≥ 1.2.0 speaks RFC 2217 on the +listener: clients negotiate Telnet binary mode and drive RTS/DTR via +``SET-CONTROL`` sub-options instead of in-band magic bytes, so the +data path stays binary safe even for firmware blobs that contain the +old Ctrl+P (``0x10``) byte. + +This controller pulses RTS+DTR off → 200 ms → on through the +:class:`Rfc2217Transport` it shares with the recovery session. Two +properties of the bridge dictate the design: + +1. The camera is powered only while a TCP client is connected. If + the socket is closed the camera loses power. +2. The bridge accepts only one remote client at a time. + +Both are satisfied because defib uses the *same* RFC 2217 connection +for the UART and the modem-control commands. +""" + +from __future__ import annotations + +import asyncio +import logging +import os + +from defib.power.base import PowerController, PowerControllerError +from defib.transport.base import Transport +from defib.transport.rfc2217 import Rfc2217Transport + +logger = logging.getLogger(__name__) + + +class VectisController(PowerController): + """Drives camera power on the OpenIPC Vectis UART bridge. + + Two operating modes: + + - **Shared transport** (preferred for live recovery): the CLI + hands a live :class:`Rfc2217Transport` to the controller via + :meth:`attach_transport`. ``power_cycle`` toggles RTS+DTR + through it (RFC 2217 ``SET-CONTROL`` sub-options 8/9 + 11/12). + The transport stays open so the camera stays powered for the + whole session and the data path remains binary safe. + + - **Standalone**: no transport attached. ``power_cycle`` opens a + short-lived RFC 2217 connection, pulses RTS/DTR, and closes. + Useful for ad-hoc ``defib power cycle``-style debug commands. + Closing the connection cuts camera power, so this mode is not + suitable for driving a recovery flow. + """ + + def __init__( + self, + host: str, + port: int = 35240, + transport: Transport | None = None, + pulse_seconds: float = 0.25, + ) -> None: + self._host = host + self._port = port + self._transport: Transport | None = transport + self._owns_transport = False + self._pulse_seconds = pulse_seconds + + @classmethod + def name(cls) -> str: + return "OpenIPC Vectis UART bridge" + + @classmethod + def from_env(cls) -> VectisController: + """Create from ``DEFIB_VECTIS_*`` environment variables. + + Required: + DEFIB_VECTIS_HOST: Vectis bridge IP/hostname. + Optional: + DEFIB_VECTIS_PORT: TCP listener port (default 35240). + """ + host = os.environ.get("DEFIB_VECTIS_HOST") + if not host: + raise PowerControllerError( + "DEFIB_VECTIS_HOST env var required for Vectis power control" + ) + return cls( + host=host, + port=int(os.environ.get("DEFIB_VECTIS_PORT", "35240")), + ) + + def attach_transport(self, transport: Transport) -> None: + """Use a live shared transport for the RTS/DTR toggle. + + Expected to be an :class:`Rfc2217Transport`; any transport + exposing ``set_dtr``/``set_rts`` will work. + """ + self._transport = transport + self._owns_transport = False + + async def power_off(self, port: str) -> None: + raise PowerControllerError( + "Vectis pulses power; off/on are not separately addressable. " + "Use power_cycle()." + ) + + async def power_on(self, port: str) -> None: + raise PowerControllerError( + "Vectis pulses power; off/on are not separately addressable. " + "Use power_cycle()." + ) + + async def power_cycle(self, port: str, off_duration: float = 0.0) -> None: + """Pulse RTS+DTR off → ``pulse_seconds`` → on. + + ``port`` and ``off_duration`` are ignored — Vectis controls a + single attached camera and the pulse width comes from + ``self._pulse_seconds`` (default 250 ms, comfortable margin + over the 200 ms inverted-pulse the bridge generates from a + single Ctrl+P in legacy mode). + """ + if self._transport is None: + await self._open_standalone() + try: + await self._do_pulse() + finally: + await self._close_standalone() + else: + await self._do_pulse() + + async def _do_pulse(self) -> None: + transport = self._transport + assert transport is not None + logger.info( + "Vectis power-cycle: SET-CONTROL DTR/RTS on %s:%d (off → %.0f ms → on)", + self._host, self._port, self._pulse_seconds * 1000, + ) + + set_dtr = getattr(transport, "set_dtr", None) + set_rts = getattr(transport, "set_rts", None) + if set_dtr is None or set_rts is None: + # Fallback for transports that don't expose modem-control — + # only meaningful against a pre-RFC-2217 Vectis daemon. + logger.warning( + "Transport lacks set_dtr/set_rts; falling back to Ctrl+P byte" + ) + await transport.write(b"\x10") + await asyncio.sleep(self._pulse_seconds) + return + + await set_dtr(False) + await set_rts(False) + await asyncio.sleep(self._pulse_seconds) + await set_rts(True) + await set_dtr(True) + + async def _open_standalone(self) -> None: + url = f"rfc2217://{self._host}:{self._port}" + self._transport = await Rfc2217Transport.create(url) + self._owns_transport = True + + async def _close_standalone(self) -> None: + if self._transport is not None and self._owns_transport: + await self._transport.close() + self._transport = None + self._owns_transport = False + + async def close(self) -> None: + # Only close transports we opened ourselves. In shared-transport + # mode the CLI owns the transport. + await self._close_standalone() diff --git a/src/defib/protocol/hisilicon_standard.py b/src/defib/protocol/hisilicon_standard.py index d94ba05..6bd7327 100644 --- a/src/defib/protocol/hisilicon_standard.py +++ b/src/defib/protocol/hisilicon_standard.py @@ -128,10 +128,15 @@ async def handshake( ever_saw_data = False counter = 0 total_markers = 0 + # Write a chunk per iteration in flooding mode — at 115200 baud the + # UART can only clock ~11.5 KB/s, so a 64-byte burst keeps roughly + # 5 ms of 0xAA on the wire continuously and saturates the bootrom's + # ~100 ms catch window even with TCP/RFC 2217 round-trip latency. + BURST = b"\xaa" * 64 while True: if flooding: - await transport.write(BOOTMODE_ACK) + await transport.write(BURST) try: byte = await transport.read(1, timeout=0.05 if flooding else 1.0) diff --git a/src/defib/recovery/session.py b/src/defib/recovery/session.py index 3fa0602..3847c84 100644 --- a/src/defib/recovery/session.py +++ b/src/defib/recovery/session.py @@ -104,8 +104,10 @@ async def run( # Retry the transient phase (power-cycle + handshake + DDR init) up # to ``max_handshake_attempts`` times. Only meaningful when we have # programmatic power control — manual power cycling would require - # human re-intervention. - can_retry = bool(self._power and self._poe_port) + # human re-intervention. Some controllers (e.g. Vectis) don't have + # a notion of named ports, so we treat any non-None ``poe_port`` + # (including the empty string) as "automated power available". + can_retry = self._power is not None and self._poe_port is not None attempts = max_handshake_attempts if can_retry else 1 firmware = self._load_firmware() @@ -123,16 +125,17 @@ async def run( )) # Power cycle - if self._power and self._poe_port: + if self._power is not None and self._poe_port is not None: + label = self._poe_port or self._power.name() if on_log: on_log(LogEvent( level="info", - message=f"Power-cycling device on {self._poe_port}...", + message=f"Power-cycling device on {label}...", )) if on_progress: on_progress(ProgressEvent( stage=Stage.POWER_CYCLE, bytes_sent=0, bytes_total=1, - message=f"Power-cycling {self._poe_port}...", + message=f"Power-cycling {label}...", )) try: @@ -147,20 +150,22 @@ async def run( elapsed_ms=elapsed, ) - # Drain serial until line stays quiet for 500ms. Replaces a - # fixed 2-second sleep + flush_input — that approach can miss - # late-arriving stale bytes (the camera may still be powering - # down when the flush runs) and isn't robust against pyserial - # buffer caveats. Quiet-detection is deterministic: a - # powered-off chip cannot transmit. - discarded = await transport.drain_until_silent( - quiet_period=0.5, max_wait=5.0, - ) - if discarded and on_log: - on_log(LogEvent( - level="info", - message=f"Drained {discarded} stale bytes from serial", - )) + # Drain stale bytes — but skip on frame-blast chips because + # the bootrom emits the 0x20 markers we need to catch + # within ~tens of ms of reset. The marker-based handshake + # below filters non-marker bytes itself, so a separate + # quiet-line drain just eats the catch window. + if frame_blast: + await transport.flush_input() + else: + discarded = await transport.drain_until_silent( + quiet_period=0.5, max_wait=5.0, + ) + if discarded and on_log: + on_log(LogEvent( + level="info", + message=f"Drained {discarded} stale bytes from serial", + )) if on_progress: on_progress(ProgressEvent( @@ -168,15 +173,35 @@ async def run( message="Power cycle complete", )) - # Handshake — skip for frame-blast chips (handled inside send_firmware) - if frame_blast: + # Handshake. Frame-blast chips (e.g. hi3516cv300) historically + # skipped this and relied on send_firmware's HEAD-blast, but + # that approach catches U-Boot's character echo on healthy + # boards: the camera autoboots, U-Boot echoes our 0xAA back, + # and the protocol mistakes the echo for a bootrom ACK. The + # marker-based handshake (with continuous_ack flooding 0xAA) + # is strictly more robust because it requires actually seeing + # the bootrom's 0x20 markers as proof of download mode. + if frame_blast and self._power is not None and self._poe_port is not None: + if on_log: + on_log(LogEvent( + level="info", + message=f"Listening for 0x20 markers + flooding 0xAA for {self.chip}", + )) + import asyncio as _asyncio + handshake_task = _asyncio.create_task( + protocol.handshake(transport, on_progress) + ) + handshake = await handshake_task + elif frame_blast: + # No power control — fall back to deferred frame-blast + # inside send_firmware (manual reset path). if on_log: on_log(LogEvent( level="info", message=f"Using sendFrameForStart handshake for {self.chip}", )) handshake = HandshakeResult(success=True, message="Frame-blast (deferred)") - elif self._power and self._poe_port: + elif self._power is not None and self._poe_port is not None: # Power-cycle mode with 0x20→0xAA handshake: flood 0xAA if on_log: on_log(LogEvent( diff --git a/src/defib/transport/rfc2217.py b/src/defib/transport/rfc2217.py new file mode 100644 index 0000000..cfed19c --- /dev/null +++ b/src/defib/transport/rfc2217.py @@ -0,0 +1,197 @@ +"""RFC 2217 (Telnet COM Port Control) transport. + +Wraps pyserial's ``serial.serial_for_url("rfc2217://host:port")`` +backend so the rest of defib can talk to a remote UART bridge — +notably the OpenIPC Vectis daemon — exactly the way it talks to a +local serial port. + +Compared with ``SocketTransport``: + +- The data path is binary safe (``0x10`` and ``0xFF`` round-trip + through the bridge unchanged); pyserial handles RFC 854 escaping + and unescaping under the hood. +- Modem-control lines (``set_dtr``, ``set_rts``) and the UART baud + rate (``set_baudrate``) are exposed as out-of-band RFC 2217 + sub-options instead of in-band magic bytes. + +This is the transport ``VectisController`` uses to reset the +attached camera by pulsing RTS/DTR. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +import serial + +from defib.transport.base import Transport, TransportError, TransportTimeout + +logger = logging.getLogger(__name__) + + +class Rfc2217Transport(Transport): + """Transport over a pyserial RFC 2217 client. + + ``Serial`` instances returned by ``serial.serial_for_url`` are + blocking; we route every IO call through the default thread pool + via ``run_in_executor``. + """ + + def __init__(self, port: Any) -> None: + # Typed as Any so mypy doesn't insist on serial.Serial — the + # rfc2217 backend is serial.rfc2217.Serial, a sibling subclass + # of SerialBase with the same public API. + self._port = port + # pyserial's RFC 2217 backend may return chunks larger than the + # requested size (its read() loop appends a whole queue chunk + # then exits when ``len(data) >= size``). Stash overflow here + # so callers always get exactly ``size`` bytes (or fewer on + # timeout), matching the Transport ABC contract. + self._buf = bytearray() + + # pyserial 3.5's rfc2217.Serial.timeout setter invokes + # ``_reconfigure_port()`` which re-sends every port parameter + # (baud/datasize/parity/stopsize/control/flow-control) as a + # sub-negotiation, each costing a network round trip. Mutating + # ``timeout`` per-read makes a tight handshake loop ~400 ms slower + # per iteration than the underlying pyserial read. We therefore + # set a *small fixed* pyserial timeout once at open and enforce + # the caller's per-read timeout with our own loop in Python. + _PYSERIAL_READ_QUANTUM = 0.01 # 10 ms + + @classmethod + async def create( + cls, + url: str, + baudrate: int = 115200, + ) -> Rfc2217Transport: + """Open an RFC 2217 connection and return a transport. + + ``url`` must be a fully-formed pyserial URL, e.g. + ``"rfc2217://172.17.32.17:35241"``. ``baudrate`` is sent as + a ``SET-BAUDRATE`` sub-option during open. + """ + import socket as _socket + loop = asyncio.get_event_loop() + + def _open() -> Any: + port = serial.serial_for_url( + url, + baudrate=baudrate, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=cls._PYSERIAL_READ_QUANTUM, + ) + # Disable Nagle. pyserial's RFC 2217 backend leaves the + # default on, which on a high-RTT link (~40 ms) buffers + # small writes for one RTT before sending. That alone + # closes the HiSilicon bootrom's ~100 ms 0x20-marker / + # 0xAA-ack catch window — our 0xAA flood lands AFTER the + # camera has already moved on to SPI boot. + try: + sock = port._socket + if sock is not None: + sock.setsockopt(_socket.IPPROTO_TCP, _socket.TCP_NODELAY, 1) + except (AttributeError, OSError) as e: + logger.debug("Could not enable TCP_NODELAY: %s", e) + return port + + try: + port = await loop.run_in_executor(None, _open) + except (serial.SerialException, OSError) as e: + raise TransportError(f"Failed to open {url}: {e}") from e + + logger.info("Connected to RFC 2217 server: %s @ %d baud", url, baudrate) + return cls(port) + + async def read(self, size: int, timeout: float | None = None) -> bytes: + loop = asyncio.get_event_loop() + deadline = None if timeout is None else loop.time() + timeout + buf = bytearray() + + # Service the local overflow buffer first. + if self._buf: + take = min(size, len(self._buf)) + buf.extend(self._buf[:take]) + del self._buf[:take] + if len(buf) >= size: + return bytes(buf) + + while True: + remaining = size - len(buf) + if remaining <= 0: + return bytes(buf) + chunk = await loop.run_in_executor( + None, self._port.read, remaining + ) + if chunk: + if len(chunk) > remaining: + # pyserial returned a whole queue chunk that + # exceeds the requested size — keep the overflow + # for the next read so callers always get exactly + # what they asked for. + buf.extend(chunk[:remaining]) + self._buf.extend(chunk[remaining:]) + else: + buf.extend(chunk) + if len(buf) >= size: + return bytes(buf) + if deadline is None: + continue # caller wants to wait forever + if loop.time() >= deadline: + if buf: + return bytes(buf) + raise TransportTimeout(f"Read timeout ({timeout}s)") + + async def unread(self, data: bytes) -> None: + """Push data back to the front of the read buffer.""" + new = bytearray(data) + new.extend(self._buf) + self._buf = new + + async def bytes_waiting(self) -> int: + return len(self._buf) + int(self._port.in_waiting) + + async def write(self, data: bytes) -> None: + await asyncio.get_event_loop().run_in_executor( + None, self._port.write, data + ) + + async def flush_input(self) -> None: + self._buf.clear() + await asyncio.get_event_loop().run_in_executor( + None, self._port.reset_input_buffer + ) + + async def flush_output(self) -> None: + await asyncio.get_event_loop().run_in_executor( + None, self._port.reset_output_buffer + ) + + async def close(self) -> None: + if self._port is not None and self._port.is_open: + await asyncio.get_event_loop().run_in_executor( + None, self._port.close + ) + + # ------------------------------------------------------------------ + # RFC 2217 modem-control extensions (used by VectisController) + # ------------------------------------------------------------------ + + async def set_dtr(self, active: bool) -> None: + """Set DTR. Sends RFC 2217 SET-CONTROL 8 (on) or 9 (off).""" + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, lambda: setattr(self._port, "dtr", active)) + + async def set_rts(self, active: bool) -> None: + """Set RTS. Sends RFC 2217 SET-CONTROL 11 (on) or 12 (off).""" + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, lambda: setattr(self._port, "rts", active)) + + async def set_baudrate(self, baud: int) -> None: + """Set the remote UART baud rate via SET-BAUDRATE.""" + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, lambda: setattr(self._port, "baudrate", baud)) diff --git a/src/defib/transport/serial_platform.py b/src/defib/transport/serial_platform.py index 22f129a..6af0312 100644 --- a/src/defib/transport/serial_platform.py +++ b/src/defib/transport/serial_platform.py @@ -117,10 +117,14 @@ async def create_transport( """Create a platform-appropriate transport. Args: - device: Serial port device path (e.g., /dev/ttyUSB0, COM3) or + device: Serial port device path (e.g., /dev/ttyUSB0, COM3), Unix socket path with ``socket://`` prefix (e.g., - ``socket:///tmp/qemu.sock``). - baudrate: Baud rate (default 115200, ignored for sockets). + ``socket:///tmp/qemu.sock``), raw TCP endpoint with + ``tcp://`` prefix, or RFC 2217 endpoint with + ``rfc2217://`` prefix (recommended for OpenIPC Vectis + ≥1.2.0 — binary safe + out-of-band RTS/DTR control). + baudrate: Baud rate (default 115200, ignored for non-RFC-2217 + sockets — for ``rfc2217://`` it is sent during open). force_platform: Override platform detection ("linux", "darwin", "win32"). Returns: @@ -133,6 +137,33 @@ async def create_transport( logger.info("Using SocketTransport: %s", path) return await SocketTransport.create(path) + # TCP socket transport (raw, no escaping — for non-RFC-2217 bridges + # or for compatibility with old Vectis builds without RFC 2217). + if device.startswith("tcp://"): + from defib.transport.socket import SocketTransport + endpoint = device[len("tcp://"):] + host, _, port_str = endpoint.rpartition(":") + if not host or not port_str: + raise TransportError( + f"tcp:// transport needs host:port (got '{device}')" + ) + try: + port_num = int(port_str) + except ValueError as e: + raise TransportError( + f"tcp:// port is not a number: '{port_str}'" + ) from e + logger.info("Using TCP SocketTransport: %s:%d", host, port_num) + return await SocketTransport.create_tcp(host, port_num) + + # RFC 2217 transport (binary-safe + modem-control sub-options). + # Pass the URL through pyserial's rfc2217 backend; baud rate is + # negotiated via SET-BAUDRATE during open. Used by VectisController. + if device.startswith("rfc2217://"): + from defib.transport.rfc2217 import Rfc2217Transport + logger.info("Using RFC 2217 transport: %s", device) + return await Rfc2217Transport.create(device, baudrate=baudrate) + platform = force_platform or sys.platform if platform == "darwin": diff --git a/src/defib/transport/socket.py b/src/defib/transport/socket.py index 2a86e6a..60344b6 100644 --- a/src/defib/transport/socket.py +++ b/src/defib/transport/socket.py @@ -1,10 +1,13 @@ -"""Async Unix socket transport for connecting to QEMU chardev sockets. +"""Async socket transport for Unix-domain (QEMU) and TCP (Vectis) sockets. -Allows defib to connect to a QEMU instance using: - -chardev socket,id=ser0,path=/tmp/sock,server=on,wait=off -serial chardev:ser0 +Two URL schemes are supported via :func:`defib.transport.serial_platform. +create_transport`: -Usage: - defib burn -c hi3516ev300 -p socket:///tmp/sock +- ``socket:///tmp/sock`` — Unix-domain socket (QEMU chardev sockets). +- ``tcp://host:port`` — TCP/IP socket (e.g. OpenIPC Vectis UART bridge). + +Both share the same non-blocking read/write implementation; only the +``connect()`` step differs. """ from __future__ import annotations @@ -19,7 +22,7 @@ class SocketTransport(Transport): - """Transport over a Unix domain socket (SOCK_STREAM).""" + """Transport over a stream socket (AF_UNIX or AF_INET).""" def __init__(self, conn: sock_mod.socket) -> None: self._sock = conn @@ -37,7 +40,26 @@ async def create(cls, path: str) -> SocketTransport: except OSError as e: raise TransportError(f"Failed to connect to socket {path}: {e}") from e - logger.info("Connected to QEMU socket: %s", path) + logger.info("Connected to Unix socket: %s", path) + return cls(s) + + @classmethod + async def create_tcp(cls, host: str, port: int) -> SocketTransport: + """Connect to a TCP socket at host:port.""" + try: + s = sock_mod.socket(sock_mod.AF_INET, sock_mod.SOCK_STREAM) + s.setblocking(False) + # TCP_NODELAY: small UART-style writes (e.g. single Ctrl+P + # byte for Vectis) must not be delayed by Nagle's algorithm. + s.setsockopt(sock_mod.IPPROTO_TCP, sock_mod.TCP_NODELAY, 1) + loop = asyncio.get_event_loop() + await loop.sock_connect(s, (host, port)) + except OSError as e: + raise TransportError( + f"Failed to connect to TCP {host}:{port}: {e}" + ) from e + + logger.info("Connected to TCP socket: %s:%d", host, port) return cls(s) async def read(self, size: int, timeout: float | None = None) -> bytes: diff --git a/tests/test_handshake_resilience.py b/tests/test_handshake_resilience.py index 3786628..2196b39 100644 --- a/tests/test_handshake_resilience.py +++ b/tests/test_handshake_resilience.py @@ -257,6 +257,15 @@ async def drain_until_silent( return 0 +async def _fake_handshake_ok(self, transport, on_progress=None): # noqa: ARG001 + """Stand-in for ``HiSiliconStandard.handshake`` used by tests that + exercise session orchestration (retries, attempt counting) rather + than the marker-detection protocol itself. Returns success + immediately so the session moves on to ``send_firmware``.""" + from defib.recovery.events import HandshakeResult + return HandshakeResult(success=True, message="ok") + + @pytest.mark.asyncio async def test_session_retries_handshake_on_transient_failure(monkeypatch): """When DDR-init fails on the first attempt but succeeds on the second, @@ -284,6 +293,10 @@ async def scripted_send_firmware(self, transport, firmware, on_progress=None, monkeypatch.setattr( HiSiliconStandard, "send_firmware", scripted_send_firmware, ) + # Frame-blast chips with power control go through ``handshake()`` + # since the marker-handshake refactor; stub it out so the test + # focuses on the retry-loop semantics rather than the protocol. + monkeypatch.setattr(HiSiliconStandard, "handshake", _fake_handshake_ok) session = RecoverySession( chip="hi3516av200", @@ -329,6 +342,7 @@ async def post_ddr_failure(self, transport, firmware, on_progress=None, monkeypatch.setattr( HiSiliconStandard, "send_firmware", post_ddr_failure, ) + monkeypatch.setattr(HiSiliconStandard, "handshake", _fake_handshake_ok) session = RecoverySession( chip="hi3516av200", @@ -401,6 +415,7 @@ async def always_fail(self, transport, firmware, on_progress=None, ) monkeypatch.setattr(HiSiliconStandard, "send_firmware", always_fail) + monkeypatch.setattr(HiSiliconStandard, "handshake", _fake_handshake_ok) session = RecoverySession( chip="hi3516av200", diff --git a/tests/test_power_vectis.py b/tests/test_power_vectis.py new file mode 100644 index 0000000..c86d757 --- /dev/null +++ b/tests/test_power_vectis.py @@ -0,0 +1,149 @@ +"""Tests for the OpenIPC Vectis UART-bridge power controller.""" + +from __future__ import annotations + +import asyncio + +import pytest + +from defib.power.base import PowerControllerError +from defib.power.vectis import VectisController +from defib.transport.mock import MockTransport + + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + +class FakeRfc2217Transport(MockTransport): + """MockTransport plus the RFC 2217 modem-control surface that + VectisController exercises. Tracks the order of operations so + tests can assert the off → sleep → on sequence.""" + + def __init__(self) -> None: + super().__init__() + self.actions: list[tuple[str, bool]] = [] + + async def set_dtr(self, active: bool) -> None: + self.actions.append(("dtr", active)) + + async def set_rts(self, active: bool) -> None: + self.actions.append(("rts", active)) + + async def set_baudrate(self, baud: int) -> None: + self.actions.append(("baud", baud)) # type: ignore[arg-type] + + +# --------------------------------------------------------------------------- +# from_env +# --------------------------------------------------------------------------- + +class TestVectisFromEnv: + def test_missing_host(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("DEFIB_VECTIS_HOST", raising=False) + with pytest.raises(PowerControllerError, match="DEFIB_VECTIS_HOST"): + VectisController.from_env() + + def test_defaults(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DEFIB_VECTIS_HOST", "172.17.32.17") + monkeypatch.delenv("DEFIB_VECTIS_PORT", raising=False) + ctrl = VectisController.from_env() + assert ctrl._host == "172.17.32.17" + assert ctrl._port == 35240 # upstream default + + def test_custom_port(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("DEFIB_VECTIS_HOST", "10.0.0.1") + monkeypatch.setenv("DEFIB_VECTIS_PORT", "35241") + ctrl = VectisController.from_env() + assert ctrl._host == "10.0.0.1" + assert ctrl._port == 35241 + + +# --------------------------------------------------------------------------- +# Off / On are not separately addressable +# --------------------------------------------------------------------------- + +class TestOffOnRaise: + async def test_power_off_raises(self) -> None: + ctrl = VectisController(host="x", port=1) + with pytest.raises(PowerControllerError, match="off/on are not"): + await ctrl.power_off("anything") + + async def test_power_on_raises(self) -> None: + ctrl = VectisController(host="x", port=1) + with pytest.raises(PowerControllerError, match="off/on are not"): + await ctrl.power_on("anything") + + +# --------------------------------------------------------------------------- +# Shared-transport mode (the path the CLI takes for live recovery) +# --------------------------------------------------------------------------- + +class TestSharedTransport: + async def test_power_cycle_toggles_dtr_rts(self) -> None: + """Off DTR, off RTS, sleep, on RTS, on DTR — exactly four + SET-CONTROL operations through the transport, no in-band + bytes written.""" + transport = FakeRfc2217Transport() + ctrl = VectisController(host="x", port=1, pulse_seconds=0.0) + ctrl.attach_transport(transport) + + await ctrl.power_cycle("ignored") + + assert transport.actions == [ + ("dtr", False), + ("rts", False), + ("rts", True), + ("dtr", True), + ] + # Crucially: nothing was written in-band. Old controller wrote + # b"\x10" through the data path, which would have been escaped + # or filtered by RFC 2217. + assert transport.tx_log == [] + + async def test_close_does_not_close_shared_transport(self) -> None: + transport = FakeRfc2217Transport() + ctrl = VectisController(host="x", port=1, pulse_seconds=0.0) + ctrl.attach_transport(transport) + + await ctrl.close() + + assert transport._closed is False + + async def test_pulse_actually_waits(self) -> None: + transport = FakeRfc2217Transport() + ctrl = VectisController(host="x", port=1, pulse_seconds=0.05) + ctrl.attach_transport(transport) + + loop = asyncio.get_event_loop() + t0 = loop.time() + await ctrl.power_cycle("") + elapsed = loop.time() - t0 + + assert elapsed >= 0.04 + assert elapsed < 1.0 + + async def test_legacy_fallback_when_transport_lacks_modem_control(self) -> None: + """If the transport doesn't expose set_dtr/set_rts (e.g. raw + SocketTransport against a pre-RFC-2217 Vectis), fall back to + writing a single Ctrl+P byte.""" + transport = MockTransport() # no set_dtr / set_rts + ctrl = VectisController(host="x", port=1, pulse_seconds=0.0) + ctrl.attach_transport(transport) + + await ctrl.power_cycle("ignored") + + assert transport.tx_log == [b"\x10"] + + +# --------------------------------------------------------------------------- +# Standalone mode (the controller opens its own RFC 2217 connection) +# --------------------------------------------------------------------------- + +class TestStandalone: + async def test_close_is_idempotent(self) -> None: + ctrl = VectisController(host="x", port=1, pulse_seconds=0.0) + # No transport attached, no connection ever opened — close + # should be a safe no-op. + await ctrl.close() + await ctrl.close() diff --git a/tests/test_transport_rfc2217.py b/tests/test_transport_rfc2217.py new file mode 100644 index 0000000..2339a01 --- /dev/null +++ b/tests/test_transport_rfc2217.py @@ -0,0 +1,604 @@ +"""Tests for the RFC 2217 (Telnet COM Port Control) transport. + +Covers: + +- ``rfc2217://`` URL-scheme dispatch through ``create_transport``. +- Wrapper-level behaviour with a mocked pyserial port (overflow buffer, + timeout enforcement, modem-control delegation, flush, close). +- End-to-end protocol exchange against a small in-process RFC 2217 + server fixture (negotiation, ``SET-CONTROL`` / ``SET-BAUDRATE`` + sub-options, IAC-escaped binary data round-trip). +""" + +from __future__ import annotations + +import asyncio +import socket +from typing import Any +from unittest.mock import MagicMock + +import pytest +import serial + +from defib.transport.base import TransportTimeout +from defib.transport.rfc2217 import Rfc2217Transport + + +# =========================================================================== +# Fake RFC 2217 server fixture +# =========================================================================== + +# Telnet +_IAC, _DO, _DONT, _WILL, _WONT = 0xff, 0xfd, 0xfe, 0xfb, 0xfc +_SB, _SE = 0xfa, 0xf0 +_OPT_BINARY, _OPT_SGA, _OPT_COMPORT = 0, 3, 44 + +# COM-PORT-OPTION sub-options (client side) +_SIGNATURE = 0 +_SET_BAUDRATE = 1 +_SET_DATASIZE = 2 +_SET_PARITY = 3 +_SET_STOPSIZE = 4 +_SET_CONTROL = 5 +_PURGE_DATA = 12 +_SERVER_OFFSET = 100 # server replies use sub-option + 100 + + +class FakeRfc2217Server: + """Minimal RFC 2217 server: just enough for pyserial's ``open()`` + to succeed and for tests to verify which sub-options the client + issues. Mirrors the upstream Vectis server behaviour for the + sub-options the defib transport exercises. + """ + + def __init__(self) -> None: + self.set_control_history: list[int] = [] + self.set_baudrate_history: list[int] = [] + self.set_datasize_history: list[int] = [] + self.set_parity_history: list[int] = [] + self.set_stopsize_history: list[int] = [] + self.purge_history: list[int] = [] + self.received_data = bytearray() + self._writer: asyncio.StreamWriter | None = None + self._server: asyncio.Server | None = None + self.port = 0 + + async def start(self) -> int: + self._server = await asyncio.start_server( + self._handle_client, host="127.0.0.1", port=0, + ) + self.port = self._server.sockets[0].getsockname()[1] + return self.port + + async def stop(self) -> None: + if self._server is not None: + self._server.close() + await self._server.wait_closed() + + async def send_uart_bytes(self, data: bytes) -> None: + """Inject bytes from the simulated UART side toward the client. + IAC bytes are escaped per RFC 854.""" + if self._writer is None: + return + self._writer.write(data.replace(b"\xff", b"\xff\xff")) + await self._writer.drain() + + async def _handle_client( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ) -> None: + self._writer = writer + try: + # Proactively negotiate BINARY + COM-PORT-OPTION (and SGA). + writer.write(bytes([ + _IAC, _WILL, _OPT_BINARY, + _IAC, _DO, _OPT_BINARY, + _IAC, _WILL, _OPT_SGA, + _IAC, _DO, _OPT_SGA, + _IAC, _WILL, _OPT_COMPORT, + _IAC, _DO, _OPT_COMPORT, + ])) + await writer.drain() + await self._consume(reader, writer) + except (ConnectionError, asyncio.CancelledError): + pass + finally: + self._writer = None + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + async def _consume( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ) -> None: + state = "data" + sb_opt = 0 + sb_buf = bytearray() + + while True: + chunk = await reader.read(4096) + if not chunk: + return + for b in chunk: + if state == "data": + if b == _IAC: + state = "iac" + else: + self.received_data.append(b) + elif state == "iac": + if b == _IAC: + self.received_data.append(b) + state = "data" + elif b in (_WILL, _WONT, _DO, _DONT): + state = "neg" # ack-by-ignoring is fine for tests + elif b == _SB: + state = "sb_opt" + else: + state = "data" + elif state == "neg": + state = "data" + elif state == "sb_opt": + sb_opt = b + sb_buf = bytearray() + state = "sb_data" + elif state == "sb_data": + if b == _IAC: + state = "sb_iac" + else: + sb_buf.append(b) + elif state == "sb_iac": + if b == _SE: + self._dispatch(sb_opt, bytes(sb_buf), writer) + state = "data" + elif b == _IAC: + sb_buf.append(_IAC) + state = "sb_data" + else: + state = "data" + await writer.drain() + + def _dispatch( + self, + opt: int, + data: bytes, + writer: asyncio.StreamWriter, + ) -> None: + if opt != _OPT_COMPORT or not data: + return + sub = data[0] + if sub == _SET_CONTROL and len(data) >= 2: + value = data[1] + self.set_control_history.append(value) + self._reply_byte(writer, _SET_CONTROL, value) + elif sub == _SET_BAUDRATE and len(data) >= 5: + baud = int.from_bytes(data[1:5], "big") + self.set_baudrate_history.append(baud) + writer.write(bytes([ + _IAC, _SB, _OPT_COMPORT, _SET_BAUDRATE + _SERVER_OFFSET, + *baud.to_bytes(4, "big"), + _IAC, _SE, + ])) + elif sub == _SET_DATASIZE and len(data) >= 2: + self.set_datasize_history.append(data[1]) + self._reply_byte(writer, _SET_DATASIZE, 8) # we always serve 8N1 + elif sub == _SET_PARITY and len(data) >= 2: + self.set_parity_history.append(data[1]) + self._reply_byte(writer, _SET_PARITY, 1) # NONE + elif sub == _SET_STOPSIZE and len(data) >= 2: + self.set_stopsize_history.append(data[1]) + self._reply_byte(writer, _SET_STOPSIZE, 1) + elif sub == _PURGE_DATA and len(data) >= 2: + self.purge_history.append(data[1]) + self._reply_byte(writer, _PURGE_DATA, data[1]) + elif sub == _SIGNATURE: + # Empty signature + writer.write(bytes([ + _IAC, _SB, _OPT_COMPORT, _SIGNATURE + _SERVER_OFFSET, + _IAC, _SE, + ])) + + @staticmethod + def _reply_byte( + writer: asyncio.StreamWriter, sub: int, value: int, + ) -> None: + writer.write(bytes([ + _IAC, _SB, _OPT_COMPORT, sub + _SERVER_OFFSET, value, + _IAC, _SE, + ])) + + +@pytest.fixture +async def fake_rfc2217() -> Any: + """Yield a FakeRfc2217Server bound to an ephemeral port.""" + srv = FakeRfc2217Server() + await srv.start() + try: + yield srv + finally: + await srv.stop() + + +# =========================================================================== +# URL-scheme dispatch +# =========================================================================== + +class TestUrlScheme: + async def test_rfc2217_dispatched_via_create_transport( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + from defib.transport import serial_platform + + captured: dict[str, Any] = {} + + async def fake_create(url: str, baudrate: int = 115200) -> Any: + captured["url"] = url + captured["baudrate"] = baudrate + return MagicMock() + + monkeypatch.setattr(Rfc2217Transport, "create", fake_create) + await serial_platform.create_transport( + "rfc2217://192.0.2.1:35240", baudrate=115200, + ) + assert captured["url"] == "rfc2217://192.0.2.1:35240" + assert captured["baudrate"] == 115200 + + async def test_baudrate_threaded_through( + self, monkeypatch: pytest.MonkeyPatch, + ) -> None: + from defib.transport import serial_platform + + captured: dict[str, Any] = {} + + async def fake_create(url: str, baudrate: int = 115200) -> Any: + captured["baudrate"] = baudrate + return MagicMock() + + monkeypatch.setattr(Rfc2217Transport, "create", fake_create) + await serial_platform.create_transport( + "rfc2217://localhost:35200", baudrate=921600, + ) + assert captured["baudrate"] == 921600 + + +# =========================================================================== +# Wrapper-level behaviour (mocked pyserial port) +# =========================================================================== + +class TestOverflowBuffer: + """pyserial's RFC 2217 ``read(size)`` returns a whole queue chunk + when ``len(chunk) >= size``, possibly more bytes than asked. Our + transport must cap the return at ``size`` and keep the overflow.""" + + async def test_overflow_stashed_for_next_read(self) -> None: + port = MagicMock() + port.read.side_effect = [b"hello", b"world"] # 5-byte chunks + port.is_open = True + + t = Rfc2217Transport(port) + + first = await t.read(1) + assert first == b"h" + + # Local buffer now has b"ello"; second read should drain it + # without calling pyserial again. + second = await t.read(4) + assert second == b"ello" + assert port.read.call_count == 1 + + # Buffer empty; next read pulls a fresh chunk from pyserial. + third = await t.read(5) + assert third == b"world" + + async def test_exact_size_no_overflow(self) -> None: + port = MagicMock() + port.read.return_value = b"abc" + port.is_open = True + t = Rfc2217Transport(port) + result = await t.read(3) + assert result == b"abc" + assert t._buf == bytearray() + + async def test_unread_prepends_to_buffer(self) -> None: + port = MagicMock() + port.is_open = True + # Stash some bytes via read overflow first. + calls = {"n": 0} + def fake_read(_size: int) -> bytes: + calls["n"] += 1 + return b"XYZab" if calls["n"] == 1 else b"" + port.read.side_effect = fake_read + + t = Rfc2217Transport(port) + first = await t.read(3) + assert first == b"XYZ" + # _buf has b"ab". Now unread b"!!". + await t.unread(b"!!") + # Next read should see "!!" first, then "ab". + result = await t.read(4) + assert result == b"!!ab" + + +class TestTimeout: + async def test_pyserial_timeout_not_mutated_per_read(self) -> None: + """Critical: setting pyserial-rfc2217's .timeout triggers a full + port re-renegotiation (~400 ms / call on a real link). Our + wrapper must NOT touch it after open.""" + port = MagicMock() + port.read.return_value = b"" + port.is_open = True + port.timeout = 0.01 # whatever was set at open + + t = Rfc2217Transport(port) + with pytest.raises(TransportTimeout): + await t.read(10, timeout=0.05) + + assert port.timeout == 0.01 # untouched + + async def test_returns_partial_on_timeout(self) -> None: + """If timeout expires after some bytes were collected, return + the partial result rather than raising.""" + port = MagicMock() + # First call returns 2 bytes, subsequent calls return empty + # (pyserial's quantum timeout). Use a side_effect function so + # we don't run out of pre-canned responses. + calls = {"n": 0} + def fake_read(_size: int) -> bytes: + calls["n"] += 1 + return b"hi" if calls["n"] == 1 else b"" + port.read.side_effect = fake_read + port.is_open = True + + t = Rfc2217Transport(port) + result = await t.read(10, timeout=0.1) + assert result == b"hi" + + async def test_raises_on_timeout_with_no_bytes(self) -> None: + port = MagicMock() + port.read.return_value = b"" + port.is_open = True + t = Rfc2217Transport(port) + with pytest.raises(TransportTimeout): + await t.read(5, timeout=0.05) + + +class TestModemControlDelegation: + """``set_dtr`` / ``set_rts`` / ``set_baudrate`` must update pyserial + properties — the actual RFC 2217 sub-option emission is pyserial's + job (and is exercised in the TestE2E section below).""" + + async def test_set_dtr_updates_pyserial_attr(self) -> None: + port = MagicMock() + port.dtr = True + t = Rfc2217Transport(port) + await t.set_dtr(False) + assert port.dtr is False + await t.set_dtr(True) + assert port.dtr is True + + async def test_set_rts_updates_pyserial_attr(self) -> None: + port = MagicMock() + port.rts = True + t = Rfc2217Transport(port) + await t.set_rts(False) + assert port.rts is False + await t.set_rts(True) + assert port.rts is True + + async def test_set_baudrate_updates_pyserial_attr(self) -> None: + port = MagicMock() + port.baudrate = 9600 + t = Rfc2217Transport(port) + await t.set_baudrate(115200) + assert port.baudrate == 115200 + + +class TestWriteFlushClose: + async def test_write_delegates(self) -> None: + port = MagicMock() + t = Rfc2217Transport(port) + await t.write(b"\xfe\x10\x00\xff") + port.write.assert_called_once_with(b"\xfe\x10\x00\xff") + + async def test_flush_input_clears_local_and_pyserial(self) -> None: + port = MagicMock() + t = Rfc2217Transport(port) + t._buf = bytearray(b"stale") + await t.flush_input() + assert t._buf == bytearray() + port.reset_input_buffer.assert_called_once() + + async def test_bytes_waiting_includes_local_buffer(self) -> None: + port = MagicMock() + port.in_waiting = 7 + t = Rfc2217Transport(port) + t._buf = bytearray(b"abc") + assert await t.bytes_waiting() == 10 # 3 local + 7 in pyserial + + async def test_close_when_open(self) -> None: + port = MagicMock() + port.is_open = True + t = Rfc2217Transport(port) + await t.close() + port.close.assert_called_once() + + async def test_close_when_already_closed(self) -> None: + port = MagicMock() + port.is_open = False + t = Rfc2217Transport(port) + await t.close() + port.close.assert_not_called() + + +# =========================================================================== +# End-to-end protocol exchange against the fake server +# =========================================================================== + +class TestE2EAgainstFakeServer: + """Open a real pyserial RFC 2217 client against the fake server, + issue calls through our wrapper, and verify the wire-level + protocol behaviour.""" + + async def test_open_and_close(self, fake_rfc2217: FakeRfc2217Server) -> None: + t = await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{fake_rfc2217.port}", baudrate=115200, + ) + await t.close() + + async def test_negotiation_sends_default_port_settings( + self, fake_rfc2217: FakeRfc2217Server, + ) -> None: + """pyserial sends SET-BAUDRATE + SET-DATASIZE + SET-PARITY + + SET-STOPSIZE during open; verify the server saw them.""" + t = await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{fake_rfc2217.port}", baudrate=115200, + ) + try: + assert 115200 in fake_rfc2217.set_baudrate_history + assert 8 in fake_rfc2217.set_datasize_history + assert 1 in fake_rfc2217.set_parity_history + assert 1 in fake_rfc2217.set_stopsize_history + finally: + await t.close() + + async def test_set_dtr_off_sends_value_9( + self, fake_rfc2217: FakeRfc2217Server, + ) -> None: + t = await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{fake_rfc2217.port}", baudrate=115200, + ) + try: + fake_rfc2217.set_control_history.clear() + await t.set_dtr(False) + await asyncio.sleep(0.05) + assert 9 in fake_rfc2217.set_control_history + finally: + await t.close() + + async def test_set_dtr_on_sends_value_8( + self, fake_rfc2217: FakeRfc2217Server, + ) -> None: + t = await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{fake_rfc2217.port}", baudrate=115200, + ) + try: + fake_rfc2217.set_control_history.clear() + await t.set_dtr(True) + await asyncio.sleep(0.05) + assert 8 in fake_rfc2217.set_control_history + finally: + await t.close() + + async def test_set_rts_off_sends_value_12( + self, fake_rfc2217: FakeRfc2217Server, + ) -> None: + t = await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{fake_rfc2217.port}", baudrate=115200, + ) + try: + fake_rfc2217.set_control_history.clear() + await t.set_rts(False) + await asyncio.sleep(0.05) + assert 12 in fake_rfc2217.set_control_history + finally: + await t.close() + + async def test_set_rts_on_sends_value_11( + self, fake_rfc2217: FakeRfc2217Server, + ) -> None: + t = await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{fake_rfc2217.port}", baudrate=115200, + ) + try: + fake_rfc2217.set_control_history.clear() + await t.set_rts(True) + await asyncio.sleep(0.05) + assert 11 in fake_rfc2217.set_control_history + finally: + await t.close() + + async def test_set_baudrate_sub_option( + self, fake_rfc2217: FakeRfc2217Server, + ) -> None: + t = await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{fake_rfc2217.port}", baudrate=115200, + ) + try: + fake_rfc2217.set_baudrate_history.clear() + await t.set_baudrate(921600) + await asyncio.sleep(0.05) + assert 921600 in fake_rfc2217.set_baudrate_history + finally: + await t.close() + + async def test_data_round_trip_with_iac_escape( + self, fake_rfc2217: FakeRfc2217Server, + ) -> None: + """Bytes containing ``0xFF`` must round-trip in both directions + with proper IAC IAC escaping handled by pyserial + our server.""" + t = await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{fake_rfc2217.port}", baudrate=115200, + ) + try: + # Client → server: every byte 0x00..0xFF, including 0x10 + 0xFF. + payload = bytes(range(256)) + fake_rfc2217.received_data.clear() + await t.write(payload) + await asyncio.sleep(0.1) + assert bytes(fake_rfc2217.received_data) == payload + + # Server → client: same payload coming back through transport. + await fake_rfc2217.send_uart_bytes(payload) + received = b"" + deadline = asyncio.get_event_loop().time() + 1.0 + while len(received) < len(payload) and asyncio.get_event_loop().time() < deadline: + try: + chunk = await t.read(len(payload) - len(received), timeout=0.1) + received += chunk + except TransportTimeout: + pass + assert received == payload + finally: + await t.close() + + async def test_tcp_nodelay_set_on_socket( + self, fake_rfc2217: FakeRfc2217Server, + ) -> None: + """Tight handshake loops over a high-RTT link need TCP_NODELAY + — without it, Nagle batches our 0xAA flood out of the bootrom's + catch window.""" + t = await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{fake_rfc2217.port}", baudrate=115200, + ) + try: + sock = t._port._socket + assert sock is not None + # ``getsockopt(TCP_NODELAY)`` returns a non-zero value when + # the option is enabled. Linux reports ``1``, macOS may + # report the C ``int`` width (``4``); either is fine. + opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) + assert opt != 0, "TCP_NODELAY must be enabled on the RFC 2217 socket" + finally: + await t.close() + + +# =========================================================================== +# create() error handling +# =========================================================================== + +class TestCreateErrors: + async def test_unreachable_url_raises_transport_error(self) -> None: + # Pick a port that nothing should be listening on + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + unused = s.getsockname()[1] + # s is now closed; the port is briefly free, nothing listening. + from defib.transport.base import TransportError + with pytest.raises((TransportError, serial.SerialException)): + await Rfc2217Transport.create( + f"rfc2217://127.0.0.1:{unused}", baudrate=115200, + )