Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,16 @@ Handles both `tftpboot` and `tftp` U-Boot commands transparently.
- macOS serial workaround (ACK byte correction)
- Cross-platform: Linux, macOS, Windows

## Automated Power Cycling (PoE)
## Automated Power Cycling

Defib can automatically power-cycle devices via a MikroTik PoE switch,
eliminating manual intervention for recovery loops and research workflows.
Defib can automatically power-cycle devices, eliminating manual intervention
for recovery loops and research workflows. Two backends are supported,
selected via `DEFIB_POWER_TYPE`:

### MikroTik RouterOS PoE switch (default)

```bash
export DEFIB_POWER_TYPE=routeros # optional, the default
export DEFIB_POE_HOST=192.168.88.1
export DEFIB_POE_USER=admin
export DEFIB_POE_PASS=
Expand All @@ -134,6 +138,50 @@ defib burn -c hi3516ev300 -p /dev/uart-IVG85HG50PYA-S --power-cycle -b
defib burn -c hi3516ev300 -p /dev/uart-IVG85HG50PYA-S --power-cycle -t
```

### OpenIPC Vectis UART bridge

[Vectis](https://github.com/OpenIPC/vectis) is a USB/Ethernet UART bridge
that exposes the camera's UART over TCP and drives camera power via the
bridge's RTS/DTR lines. Vectis ≥ 1.2.0 speaks [RFC 2217][rfc2217] (Telnet
COM Port Control Option) on the listener: the data path is binary safe
and modem-control lines are commanded out-of-band as `SET-CONTROL`
sub-options instead of in-band magic bytes. Defib uses the same TCP
connection for the UART and the reset:

[rfc2217]: https://datatracker.ietf.org/doc/html/rfc2217

```bash
export DEFIB_POWER_TYPE=vectis
export DEFIB_VECTIS_HOST=172.17.32.17
export DEFIB_VECTIS_PORT=35240 # optional, the upstream default

defib burn -c hi3516cv300 -p rfc2217://172.17.32.17:35240 --power-cycle -b
```

The `rfc2217://` URL scheme routes the UART through pyserial's RFC 2217
client (which handles RFC 854 escaping for `0xFF` and the `SET-CONTROL`
DTR/RTS pulse for reset). An older Vectis without RFC 2217 support is
still reachable via the legacy `tcp://host:port` URL and the in-band
`Ctrl+P` reset trigger.

Smoke-test the bridge first with `telnet` (which negotiates RFC 2217)
or with `socat` (which stays in legacy raw mode):

```bash
telnet 172.17.32.17 35240
# Or, legacy raw mode:
socat -,raw,echo=0 TCP:172.17.32.17:35240
```

> Note: Vectis only emits a fixed-width reset pulse, so it does not
> work with `defib restore` (which needs independent
> `power_off`/`power_on`).
>
> Note: tight-loop bootrom catching needs a low-RTT link to Vectis.
> Over a high-RTT WAN link the bootrom's `0x20`-marker / `0xAA`-ack
> window can close before the round-trip completes; running Vectis
> on the same host as defib (or close to it on a LAN) is recommended.

The `-t` flag auto-detects the post-boot mode:
- **Normal U-Boot shell** (e.g. hi3516ev300): raw terminal passthrough — type commands directly
- **Download command mode** (e.g. hi3516av200): interactive `defib>` prompt that wraps commands in HiSilicon's XHEAD/XCMD protocol, enabling flash operations on devices that enter `download_process()` after serial boot
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ hisilicon_cv6xx = "defib.protocol.hisilicon_cv6xx:HiSiliconCV6xx"

[project.entry-points."defib.power"]
routeros = "defib.power.routeros:RouterOSController"
vectis = "defib.power.vectis:VectisController"

[build-system]
requires = ["hatchling"]
Expand Down
111 changes: 82 additions & 29 deletions src/defib/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,35 +100,44 @@ def _dl_progress(done: int, total: int) -> None:
power_controller = None
poe_port = None
if power_cycle:
from defib.power.factory import power_controller_from_env
from defib.power.routeros import RouterOSController

try:
power_controller = RouterOSController.from_env()
power_controller = power_controller_from_env()
except Exception as e:
if output == "json":
print(json_mod.dumps({"event": "error", "message": str(e)}))
else:
console.print(f"[red]Power controller error:[/red] {e}")
raise typer.Exit(1)

# Extract device label from serial port name for auto-discovery
# e.g. /dev/uart-IVGHP203Y-AF -> IVGHP203Y-AF
from pathlib import Path
port_basename = Path(port).name
device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename
if isinstance(power_controller, RouterOSController):
# Extract device label from serial port name for auto-discovery
# e.g. /dev/uart-IVGHP203Y-AF -> IVGHP203Y-AF
from pathlib import Path
port_basename = Path(port).name
device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename

try:
poe_port = await power_controller.find_port_by_comment(device_label)
except Exception as e:
if output == "json":
print(json_mod.dumps({"event": "error", "message": str(e)}))
else:
console.print(f"[red]PoE port discovery failed:[/red] {e}")
await power_controller.close()
raise typer.Exit(1)
try:
poe_port = await power_controller.find_port_by_comment(device_label)
except Exception as e:
if output == "json":
print(json_mod.dumps({"event": "error", "message": str(e)}))
else:
console.print(f"[red]PoE port discovery failed:[/red] {e}")
await power_controller.close()
raise typer.Exit(1)

if output == "human":
console.print(f"PoE control: [cyan]{poe_port}[/cyan] on [cyan]{power_controller._host}[/cyan]")
if output == "human":
console.print(f"PoE control: [cyan]{poe_port}[/cyan] on [cyan]{power_controller._host}[/cyan]")
else:
# Vectis (and any future single-port controller) has no port
# discovery — pass an empty string so the recovery session
# knows automated cycling is available.
poe_port = ""
if output == "human":
console.print(f"Power: [cyan]{power_controller.name()}[/cyan]")

try:
session = RecoverySession(
Expand Down Expand Up @@ -159,6 +168,18 @@ def _dl_progress(done: int, total: int) -> None:
console.print(f"[red]Failed to open serial port:[/red] {e}")
raise typer.Exit(2)

# Vectis: hand the live RFC 2217 transport (or legacy raw TCP) to
# the controller so RTS/DTR toggles ride the same connection that
# the UART data uses — Vectis only allows one client at a time.
if power_controller is not None:
from defib.power.vectis import VectisController
from defib.transport.rfc2217 import Rfc2217Transport
from defib.transport.socket import SocketTransport
if isinstance(power_controller, VectisController) and isinstance(
transport, (Rfc2217Transport, SocketTransport)
):
power_controller.attach_transport(transport)

# Rich progress bar for human output
from rich.progress import TaskID
progress_ctx = None
Expand Down Expand Up @@ -1827,24 +1848,30 @@ async def _install_async(
power_controller = None
poe_port = None
if power_cycle:
from defib.power.factory import power_controller_from_env
from defib.power.routeros import RouterOSController
try:
power_controller = RouterOSController.from_env()
power_controller = power_controller_from_env()
except Exception as e:
console.print(f"[red]Power controller error:[/red] {e}")
raise typer.Exit(1)

port_basename = Path(port).name
device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename
try:
poe_port = await power_controller.find_port_by_comment(device_label)
except Exception as e:
console.print(f"[red]PoE port discovery failed:[/red] {e}")
await power_controller.close()
raise typer.Exit(1)
if isinstance(power_controller, RouterOSController):
port_basename = Path(port).name
device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename
try:
poe_port = await power_controller.find_port_by_comment(device_label)
except Exception as e:
console.print(f"[red]PoE port discovery failed:[/red] {e}")
await power_controller.close()
raise typer.Exit(1)

if output == "human":
console.print(f" PoE: [cyan]{poe_port}[/cyan]")
if output == "human":
console.print(f" PoE: [cyan]{poe_port}[/cyan]")
else:
poe_port = ""
if output == "human":
console.print(f" Power: [cyan]{power_controller.name()}[/cyan]")

session = RecoverySession(
chip=chip, firmware_path=str(cached),
Expand All @@ -1858,6 +1885,15 @@ async def _install_async(

transport = await create_transport(normalize_port_name(port))

# Vectis: share the TCP transport for Ctrl+P delivery (see burn).
if power_controller is not None:
from defib.power.vectis import VectisController
from defib.transport.socket import SocketTransport
if isinstance(power_controller, VectisController) and isinstance(
transport, SocketTransport
):
power_controller.attach_transport(transport)

def on_log(event: LogEvent) -> None:
if output == "human":
style = {"error": "red", "warn": "yellow", "info": "green"}.get(event.level, "")
Expand Down Expand Up @@ -2367,13 +2403,27 @@ async def _restore_async(
power_controller = None
poe_port = None
if power_cycle:
from defib.power.factory import power_controller_from_env
from defib.power.routeros import RouterOSController
try:
power_controller = RouterOSController.from_env()
power_controller = power_controller_from_env()
except Exception as e:
console.print(f"[red]Power controller error:[/red] {e}")
raise typer.Exit(1)

# restore needs independent power_off/power_on (frame-blast flow);
# Vectis only emits a fixed pulse and cannot satisfy that.
if not isinstance(power_controller, RouterOSController):
console.print(
f"[red]restore requires a controller that supports independent "
f"power_off/power_on; {power_controller.name()!r} only supports "
f"power_cycle. Use --power-cycle with DEFIB_POWER_TYPE=routeros, "
f"or run restore without --power-cycle and cycle power manually."
f"[/red]"
)
await power_controller.close()
raise typer.Exit(1)

port_basename = Path(port).name
device_label = port_basename.removeprefix("uart-") if port_basename.startswith("uart-") else port_basename
try:
Expand All @@ -2395,6 +2445,9 @@ async def _restore_async(
# open serial on a quiet line, then let session handle power-on.
if power_controller and poe_port:
import asyncio as _aio
from defib.power.routeros import RouterOSController
# We rejected non-RouterOS controllers above — narrow for mypy.
assert isinstance(power_controller, RouterOSController)
if output == "human":
console.print(" Powering off...")
power_controller._saved_poe_out[poe_port] = "forced-on"
Expand Down
31 changes: 31 additions & 0 deletions src/defib/power/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Power controller factory — pick implementation from environment."""

from __future__ import annotations

import os

from defib.power.base import PowerController, PowerControllerError


def power_controller_from_env() -> PowerController:
"""Build a PowerController based on the ``DEFIB_POWER_TYPE`` env var.

- ``DEFIB_POWER_TYPE=routeros`` (default): MikroTik RouterOS API,
configured via ``DEFIB_POE_*``.
- ``DEFIB_POWER_TYPE=vectis``: OpenIPC Vectis UART bridge,
configured via ``DEFIB_VECTIS_*``.

Raises:
PowerControllerError: if the type is unknown or required env
vars are missing.
"""
kind = os.environ.get("DEFIB_POWER_TYPE", "routeros").lower()
if kind == "routeros":
from defib.power.routeros import RouterOSController
return RouterOSController.from_env()
if kind == "vectis":
from defib.power.vectis import VectisController
return VectisController.from_env()
raise PowerControllerError(
f"Unknown DEFIB_POWER_TYPE: {kind!r} (expected 'routeros' or 'vectis')"
)
Loading
Loading