Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 165 additions & 38 deletions src/defib/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1690,7 +1690,7 @@ async def _install_async(

from rich.console import Console

from defib.flashdump import get_ram_staging_addr, send_command, tftp_to_ram
from defib.flashdump import get_ram_staging_addr, send_command
from defib.firmware import download_firmware, get_cached_path, has_firmware
from defib.network.ip_manager import list_interfaces, temporary_ip
from defib.network.tftp_server import start_tftp_server
Expand Down Expand Up @@ -1849,7 +1849,7 @@ def on_progress(event: ProgressEvent) -> None:
transport,
on_progress=on_progress,
on_log=on_log,
send_break=True,
send_break=False,
)

if not result.success:
Expand All @@ -1862,22 +1862,71 @@ def on_progress(event: ProgressEvent) -> None:
if output == "human":
console.print(f" [green]U-Boot loaded in {result.elapsed_ms:.0f}ms[/green]")

# --- Step 3.5: Detect U-Boot mode (download_process or shell) ---
# Must happen here (not in session.run) to detect download_process mode.
import asyncio as _aio
import time as _time_mod

buf = bytearray()
start_detect = _time_mod.monotonic()
download_mode = False

while _time_mod.monotonic() - start_detect < 15:
await transport.write(b"\x03")
try:
det_data = await transport.read(256, timeout=0.2)
buf.extend(det_data)
text = buf.decode("ascii", errors="replace")
if "start download process" in text:
download_mode = True
break
if "autoboot" in text.lower():
if output == "human":
console.print(" Autoboot detected, sending Ctrl-C...")
for _ in range(20):
await transport.write(b"\x03")
await _aio.sleep(0.1)
break
tail = text[-256:] if len(text) > 256 else text
if "hisilicon #" in tail or "OpenIPC #" in tail or "\n=> " in tail:
break
except Exception:
pass

if download_mode:
if output == "human":
console.print(" [cyan]Download command mode detected[/cyan]")
from defib.protocol.download_cmd import DownloadCommandClient
dl_client = DownloadCommandClient(transport)

async def _cmd(cmd: str, timeout: float = 60.0, **kw: object) -> str:
ok, out = await dl_client.send_command(cmd, timeout=timeout)
if not ok and output == "human":
console.print(f" [yellow]Warning: {cmd} → ERROR[/yellow]")
return out
else:
if output == "human":
console.print(" [cyan]U-Boot shell mode[/cyan]")

async def _cmd(cmd: str, timeout: float = 60.0, **kw: object) -> str:
return await send_command(transport, cmd, timeout=timeout, wait_for="# ")

# --- Step 4: U-Boot console — probe flash ---
if output == "human":
console.print("\n[bold yellow]Phase 2: Flash via TFTP[/bold yellow]")

ram_addr = get_ram_staging_addr(chip)

if nand:
resp = await send_command(transport, "nand info", timeout=5.0, wait_for="# ")
resp = await _cmd("nand info", timeout=5.0)
if "error" in resp.lower() or "no nand" in resp.lower():
console.print(f"[red]NAND detection failed:[/red] {resp.strip()}")
await transport.close()
raise typer.Exit(1)
if output == "human":
console.print(" [green]NAND flash detected[/green]")
else:
resp = await send_command(transport, "sf probe 0", timeout=5.0, wait_for="# ")
resp = await _cmd("sf probe 0", timeout=5.0)
if "error" in resp.lower() or "fail" in resp.lower():
console.print(f"[red]sf probe failed:[/red] {resp.strip()}")
await transport.close()
Expand Down Expand Up @@ -1921,12 +1970,21 @@ def on_progress(event: ProgressEvent) -> None:

try:
# Configure U-Boot networking
await send_command(transport, f"setenv ipaddr {device_ip}", timeout=3.0, wait_for="# ")
await send_command(transport, f"setenv serverip {host_ip}", timeout=3.0, wait_for="# ")
await _cmd(f"setenv ipaddr {device_ip}", timeout=3.0)
await _cmd(f"setenv serverip {host_ip}", timeout=3.0)

if output == "human":
console.print(f" Device IP: [cyan]{device_ip}[/cyan]")

async def _tftp_to_ram(filename: str, timeout: float = 120.0) -> str:
"""TFTP download using _cmd (supports download mode)."""
resp = await _cmd(f"tftpboot 0x{ram_addr:x} {filename}", timeout=timeout)
if "unknown command" in resp.lower():
resp = await _cmd(f"tftp 0x{ram_addr:x} {filename}", timeout=timeout)
if "done" not in resp.lower() and "bytes transferred" not in resp.lower():
raise RuntimeError(f"TFTP download failed: {resp.strip()[-200:]}")
return resp

async def tftp_and_flash(
name: str, tftp_name: str, orig_data: bytes,
flash_off: int, erase_sz: int,
Expand All @@ -1936,17 +1994,16 @@ async def tftp_and_flash(
console.print(f"\n [bold]Flashing {name}[/bold] → 0x{flash_off:X} ({len(orig_data)} bytes)")

try:
resp = await tftp_to_ram(transport, ram_addr, tftp_name, timeout=120.0)
resp = await _tftp_to_ram(tftp_name, timeout=120.0)
except RuntimeError as e:
console.print(f"[red]TFTP failed for {name}:[/red] {e}")
raise typer.Exit(1)

# Verify TFTP transfer in RAM before writing to flash
expected_crc = zlib.crc32(orig_data) & 0xFFFFFFFF
resp = await send_command(
transport,
resp = await _cmd(
f"crc32 0x{ram_addr:x} 0x{len(orig_data):x}",
timeout=10.0, wait_for="# ",
timeout=10.0,
)
m = re_mod.search(r"==>\s*([0-9a-fA-F]{8})", resp)
if m:
Expand All @@ -1961,30 +2018,30 @@ async def tftp_and_flash(
console.print(f" TFTP CRC verified: {ram_crc:08X}")

erase_timeout = 120.0 if nand else 60.0
await send_command(
transport,
await _cmd(
f"{flash_cmd} erase 0x{flash_off:x} 0x{erase_sz:x}",
timeout=erase_timeout, wait_for="# ",
timeout=erase_timeout,
)
await send_command(
transport,
f"{flash_cmd} write 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}",
timeout=120.0 if nand else 60.0, wait_for="# ",
# NAND requires page-aligned write sizes (2KB pages)
write_sz = len(orig_data)
if nand:
write_sz = ((write_sz + 2047) // 2048) * 2048
await _cmd(
f"{flash_cmd} write 0x{ram_addr:x} 0x{flash_off:x} 0x{write_sz:x}",
timeout=120.0 if nand else 60.0,
)

# Verify flash write by reading back and checking CRC.
# Skip for NAND — ECC/OOB makes raw read-back differ from
# the original data; the TFTP-to-RAM CRC above is sufficient.
if not nand:
await send_command(
transport,
await _cmd(
f"{flash_cmd} read 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}",
timeout=30.0, wait_for="# ",
timeout=30.0,
)
resp = await send_command(
transport,
resp = await _cmd(
f"crc32 0x{ram_addr:x} 0x{len(orig_data):x}",
timeout=10.0, wait_for="# ",
timeout=10.0,
)
m = re_mod.search(r"==>\s*([0-9a-fA-F]{8})", resp)
if m:
Expand All @@ -2003,7 +2060,66 @@ async def tftp_and_flash(

await tftp_and_flash("U-Boot", "u-boot.bin", uboot_data, b_off, uboot_flash_size)
await tftp_and_flash("kernel", kernel_name, kernel_data, k_off, k_sz)
await tftp_and_flash("rootfs", rootfs_name, rootfs_data, r_off, r_sz)

# For NAND: raw UBI images must be written via ubi write, not nand write.
# nand write skips bad blocks, shifting data and corrupting UBIFS inside.
from defib.ubi import extract_ubifs, is_ubi_image

if nand and is_ubi_image(rootfs_data):
if output == "human":
console.print(
f"\n [bold]Flashing rootfs (UBI)[/bold] → 0x{r_off:X}"
f" ({len(rootfs_data)} bytes)"
)

# Extract UBIFS volume data from raw UBI image
ubifs_data = extract_ubifs(rootfs_data)
if output == "human":
console.print(
f" Extracted UBIFS: {len(ubifs_data)} bytes"
f" from {len(rootfs_data)} byte UBI image"
)

# Replace TFTP file with extracted UBIFS
tftp_protocol._files[rootfs_name] = ubifs_data

try:
resp = await _tftp_to_ram(rootfs_name, timeout=120.0)
except RuntimeError as e:
console.print(f"[red]TFTP failed for rootfs:[/red] {e}")
raise typer.Exit(1)
if output == "human":
console.print(" TFTP OK")

# Erase the full rootfs partition
await _cmd(f"nand erase 0x{r_off:x} 0x{r_sz:x}", timeout=120.0)

# UBI format + create volume + write
# mtdparts for OpenIPC NAND: hinand:1M(boot),1M(env),8M(kernel),-(ubi)
nand_name = "hinand"
await _cmd(f"setenv mtdids nand0={nand_name}", timeout=3.0)
await _cmd(
f"setenv mtdparts mtdparts={nand_name}:"
f"1024k(boot),1024k(env),8192k(kernel),-(ubi)",
timeout=3.0,
)
await _cmd("mtdparts", timeout=3.0)
await _cmd("ubi part ubi", timeout=120.0)
# Create volume using all available space (not just image size)
# so UBIFS has room for runtime writes
await _cmd("ubi create rootfs", timeout=60.0)
resp = await _cmd(
f"ubi write 0x{ram_addr:x} rootfs 0x{len(ubifs_data):x}",
timeout=300.0,
)
if "error" in resp.lower() or "cannot" in resp.lower():
console.print(f"[red]ubi write failed:[/red] {resp.strip()[-120:]}")
raise typer.Exit(1)

if output == "human":
console.print(" [green]rootfs (UBI) OK[/green]")
else:
await tftp_and_flash("rootfs", rootfs_name, rootfs_data, r_off, r_sz)

# Set up proper boot environment
if nand:
Expand All @@ -2012,15 +2128,13 @@ async def tftp_and_flash(
# Set mtdparts and bootcmd directly — don't rely on env macros
# which may be wrong or missing on the target device.
# Layout: 1M(boot),1M(env),8M(kernel),-(ubi)
await send_command(
transport,
await _cmd(
"setenv mtdparts hinand:1024k(boot),1024k(env),8192k(kernel),-(ubi)",
timeout=3.0, wait_for="# ",
timeout=3.0,
)
await send_command(
transport,
await _cmd(
r"setenv bootcmd nand read ${baseaddr} 0x200000 0x800000\; bootm ${baseaddr}",
timeout=3.0, wait_for="# ",
timeout=3.0,
)
else:
nor_cmd = "setnor8m" if nor_size < 16 else "setnor16m"
Expand All @@ -2029,18 +2143,16 @@ async def tftp_and_flash(
# setnor8m does: set mtdparts, set bootcmd, saveenv, reset
# We do it manually to avoid the auto-reset
mtdparts_var = f"mtdpartsnor{nor_size}m"
await send_command(transport, f"run {mtdparts_var}", timeout=3.0, wait_for="# ")
await send_command(
transport, "setenv bootcmd ${bootcmdnor}", timeout=3.0, wait_for="# ",
)
resp = await send_command(transport, "saveenv", timeout=10.0, wait_for="# ")
await _cmd(f"run {mtdparts_var}", timeout=3.0)
await _cmd("setenv bootcmd ${bootcmdnor}", timeout=3.0)
resp = await _cmd("saveenv", timeout=10.0)
if output == "human":
console.print(" [green]Environment saved[/green]")

# Reset
if output == "human":
console.print("\n [bold]Resetting device...[/bold]")
await send_command(transport, "reset", timeout=3.0)
await _cmd("reset", timeout=3.0)

finally:
tftp_transport.close()
Expand Down Expand Up @@ -2409,7 +2521,7 @@ async def _send(cmd: str, timeout: float = 60.0) -> str:
from defib.network.tftp_server import start_tftp_server

tftp_files = {name: data for name, data in partitions}
tftp_transport, _ = await start_tftp_server(
tftp_transport, tftp_proto = await start_tftp_server(
files=tftp_files, bind_addr=host_ip, port=69, done_count=len(partitions),
)

Expand Down Expand Up @@ -2476,11 +2588,25 @@ async def _send(cmd: str, timeout: float = 60.0) -> str:
t0 = _time.monotonic()

# Detect partition type before TFTP
# Raw UBI images (UBI# magic) must be converted to UBIFS first —
# nand write of raw UBI corrupts UBIFS due to bad block shifting.
from defib.ubi import extract_ubifs as _extract_ubifs, is_ubi_image as _is_ubi

is_ubifs = (
detected_flash == "nand"
and len(data) >= 4
and data[:4] == b"\x31\x18\x10\x06" # UBIFS superblock
)
if detected_flash == "nand" and len(data) >= 4 and _is_ubi(data):
if output == "human":
console.print(" Raw UBI image → extracting UBIFS volume data")
data = _extract_ubifs(data)
is_ubifs = True
if output == "human":
console.print(f" Extracted {len(data)} bytes of UBIFS")
# Update TFTP file and write_size for the extracted data
tftp_proto._files[name] = data
write_size = ((len(data) + page - 1) // page) * page

if detected_flash == "nand" and is_ubifs:
# UBI-aware write: let UBI handle bad block mapping
Expand Down Expand Up @@ -2570,7 +2696,8 @@ async def _send(cmd: str, timeout: float = 60.0) -> str:
if output == "human":
console.print(f" UBI format {real_name}...")
await _send(f"ubi part {real_name}", timeout=120)
await _send(f"ubi create {vol_name} 0x{len(data):x}", timeout=60)
# Use all available space so UBIFS has room for runtime writes
await _send(f"ubi create {vol_name}", timeout=60)
resp = await _send(f"ubi write 0x{ram_addr:x} {vol_name} 0x{len(data):x}", timeout=300)
if "error" in resp.lower() or "cannot" in resp.lower():
if output == "human":
Expand Down
Loading
Loading