From e7c279620e978ae2a283795f59993cbf29568293 Mon Sep 17 00:00:00 2001 From: Dmitry Ilyin Date: Wed, 22 Apr 2026 23:31:39 +0300 Subject: [PATCH 1/2] Add UBI-aware NAND writes for install and restore commands Raw UBI images written via `nand write` corrupt UBIFS because bad block skipping shifts data, breaking UBI's physical-to-logical block mapping. This adds a UBI image parser (`defib.ubi`) that extracts UBIFS volume data from raw UBI images (UBI# magic), then writes via `ubi part` / `ubi create` / `ubi write` which lets U-Boot's UBI subsystem handle bad block mapping. Changes: - New `src/defib/ubi.py`: parse UBI EC/VID headers, extract UBIFS by volume ID - `defib install --nand`: detect raw UBI rootfs, extract UBIFS, write via `ubi write` instead of `nand write`; detect download command mode (XHEAD/XCMD) so install works on chips with CONFIG_START_MAGIC (e.g. hi3516av200); page-align NAND write sizes to 2KB - `defib restore`: auto-detect raw UBI images in partition files and convert to UBIFS before UBI-aware write; create UBI volumes with full partition space (not just image size) so UBIFS has room for runtime writes - 8 new tests for UBI extraction Tested on hi3516av200 (128MB SPI NAND, MX35LF1GE4AB): - UBI attaches cleanly: 944 PEBs, 0 bad, 0 corrupted - UBIFS mounts without errors: 111 MiB usable (vs 10 MiB before) - Linux boots fully, services start, no "space left" errors Co-Authored-By: Claude Opus 4.6 (1M context) --- src/defib/cli/app.py | 203 +++++++++++++++++++++++++++++++++++-------- src/defib/ubi.py | 133 ++++++++++++++++++++++++++++ tests/test_ubi.py | 161 ++++++++++++++++++++++++++++++++++ 3 files changed, 459 insertions(+), 38 deletions(-) create mode 100644 src/defib/ubi.py create mode 100644 tests/test_ubi.py diff --git a/src/defib/cli/app.py b/src/defib/cli/app.py index e1f84ec..60b7f3b 100644 --- a/src/defib/cli/app.py +++ b/src/defib/cli/app.py @@ -1690,7 +1690,7 @@ async def _install_async( from rich.console import Console - from defib.flashdump import get_ram_staging_addr, send_command, tftp_to_ram + from defib.flashdump import get_ram_staging_addr, send_command from defib.firmware import download_firmware, get_cached_path, has_firmware from defib.network.ip_manager import list_interfaces, temporary_ip from defib.network.tftp_server import start_tftp_server @@ -1849,7 +1849,7 @@ def on_progress(event: ProgressEvent) -> None: transport, on_progress=on_progress, on_log=on_log, - send_break=True, + send_break=False, ) if not result.success: @@ -1862,6 +1862,55 @@ def on_progress(event: ProgressEvent) -> None: if output == "human": console.print(f" [green]U-Boot loaded in {result.elapsed_ms:.0f}ms[/green]") + # --- Step 3.5: Detect U-Boot mode (download_process or shell) --- + # Must happen here (not in session.run) to detect download_process mode. + import asyncio as _aio + import time as _time_mod + + buf = bytearray() + start_detect = _time_mod.monotonic() + download_mode = False + + while _time_mod.monotonic() - start_detect < 15: + await transport.write(b"\x03") + try: + det_data = await transport.read(256, timeout=0.2) + buf.extend(det_data) + text = buf.decode("ascii", errors="replace") + if "start download process" in text: + download_mode = True + break + if "autoboot" in text.lower(): + if output == "human": + console.print(" Autoboot detected, sending Ctrl-C...") + for _ in range(20): + await transport.write(b"\x03") + await _aio.sleep(0.1) + break + tail = text[-256:] if len(text) > 256 else text + if "hisilicon #" in tail or "OpenIPC #" in tail or "\n=> " in tail: + break + except Exception: + pass + + if download_mode: + if output == "human": + console.print(" [cyan]Download command mode detected[/cyan]") + from defib.protocol.download_cmd import DownloadCommandClient + dl_client = DownloadCommandClient(transport) + + async def _cmd(cmd: str, timeout: float = 60.0, **kw: object) -> str: + ok, out = await dl_client.send_command(cmd, timeout=timeout) + if not ok and output == "human": + console.print(f" [yellow]Warning: {cmd} → ERROR[/yellow]") + return out + else: + if output == "human": + console.print(" [cyan]U-Boot shell mode[/cyan]") + + async def _cmd(cmd: str, timeout: float = 60.0, **kw: object) -> str: + return await send_command(transport, cmd, timeout=timeout, wait_for="# ") + # --- Step 4: U-Boot console — probe flash --- if output == "human": console.print("\n[bold yellow]Phase 2: Flash via TFTP[/bold yellow]") @@ -1869,7 +1918,7 @@ def on_progress(event: ProgressEvent) -> None: ram_addr = get_ram_staging_addr(chip) if nand: - resp = await send_command(transport, "nand info", timeout=5.0, wait_for="# ") + resp = await _cmd("nand info", timeout=5.0) if "error" in resp.lower() or "no nand" in resp.lower(): console.print(f"[red]NAND detection failed:[/red] {resp.strip()}") await transport.close() @@ -1877,7 +1926,7 @@ def on_progress(event: ProgressEvent) -> None: if output == "human": console.print(" [green]NAND flash detected[/green]") else: - resp = await send_command(transport, "sf probe 0", timeout=5.0, wait_for="# ") + resp = await _cmd("sf probe 0", timeout=5.0) if "error" in resp.lower() or "fail" in resp.lower(): console.print(f"[red]sf probe failed:[/red] {resp.strip()}") await transport.close() @@ -1921,12 +1970,21 @@ def on_progress(event: ProgressEvent) -> None: try: # Configure U-Boot networking - await send_command(transport, f"setenv ipaddr {device_ip}", timeout=3.0, wait_for="# ") - await send_command(transport, f"setenv serverip {host_ip}", timeout=3.0, wait_for="# ") + await _cmd(f"setenv ipaddr {device_ip}", timeout=3.0) + await _cmd(f"setenv serverip {host_ip}", timeout=3.0) if output == "human": console.print(f" Device IP: [cyan]{device_ip}[/cyan]") + async def _tftp_to_ram(filename: str, timeout: float = 120.0) -> str: + """TFTP download using _cmd (supports download mode).""" + resp = await _cmd(f"tftpboot 0x{ram_addr:x} {filename}", timeout=timeout) + if "unknown command" in resp.lower(): + resp = await _cmd(f"tftp 0x{ram_addr:x} {filename}", timeout=timeout) + if "done" not in resp.lower() and "bytes transferred" not in resp.lower(): + raise RuntimeError(f"TFTP download failed: {resp.strip()[-200:]}") + return resp + async def tftp_and_flash( name: str, tftp_name: str, orig_data: bytes, flash_off: int, erase_sz: int, @@ -1936,17 +1994,16 @@ async def tftp_and_flash( console.print(f"\n [bold]Flashing {name}[/bold] → 0x{flash_off:X} ({len(orig_data)} bytes)") try: - resp = await tftp_to_ram(transport, ram_addr, tftp_name, timeout=120.0) + resp = await _tftp_to_ram(tftp_name, timeout=120.0) except RuntimeError as e: console.print(f"[red]TFTP failed for {name}:[/red] {e}") raise typer.Exit(1) # Verify TFTP transfer in RAM before writing to flash expected_crc = zlib.crc32(orig_data) & 0xFFFFFFFF - resp = await send_command( - transport, + resp = await _cmd( f"crc32 0x{ram_addr:x} 0x{len(orig_data):x}", - timeout=10.0, wait_for="# ", + timeout=10.0, ) m = re_mod.search(r"==>\s*([0-9a-fA-F]{8})", resp) if m: @@ -1961,30 +2018,30 @@ async def tftp_and_flash( console.print(f" TFTP CRC verified: {ram_crc:08X}") erase_timeout = 120.0 if nand else 60.0 - await send_command( - transport, + await _cmd( f"{flash_cmd} erase 0x{flash_off:x} 0x{erase_sz:x}", - timeout=erase_timeout, wait_for="# ", + timeout=erase_timeout, ) - await send_command( - transport, - f"{flash_cmd} write 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}", - timeout=120.0 if nand else 60.0, wait_for="# ", + # NAND requires page-aligned write sizes (2KB pages) + write_sz = len(orig_data) + if nand: + write_sz = ((write_sz + 2047) // 2048) * 2048 + await _cmd( + f"{flash_cmd} write 0x{ram_addr:x} 0x{flash_off:x} 0x{write_sz:x}", + timeout=120.0 if nand else 60.0, ) # Verify flash write by reading back and checking CRC. # Skip for NAND — ECC/OOB makes raw read-back differ from # the original data; the TFTP-to-RAM CRC above is sufficient. if not nand: - await send_command( - transport, + await _cmd( f"{flash_cmd} read 0x{ram_addr:x} 0x{flash_off:x} 0x{len(orig_data):x}", - timeout=30.0, wait_for="# ", + timeout=30.0, ) - resp = await send_command( - transport, + resp = await _cmd( f"crc32 0x{ram_addr:x} 0x{len(orig_data):x}", - timeout=10.0, wait_for="# ", + timeout=10.0, ) m = re_mod.search(r"==>\s*([0-9a-fA-F]{8})", resp) if m: @@ -2003,7 +2060,66 @@ async def tftp_and_flash( await tftp_and_flash("U-Boot", "u-boot.bin", uboot_data, b_off, uboot_flash_size) await tftp_and_flash("kernel", kernel_name, kernel_data, k_off, k_sz) - await tftp_and_flash("rootfs", rootfs_name, rootfs_data, r_off, r_sz) + + # For NAND: raw UBI images must be written via ubi write, not nand write. + # nand write skips bad blocks, shifting data and corrupting UBIFS inside. + from defib.ubi import extract_ubifs, is_ubi_image + + if nand and is_ubi_image(rootfs_data): + if output == "human": + console.print( + f"\n [bold]Flashing rootfs (UBI)[/bold] → 0x{r_off:X}" + f" ({len(rootfs_data)} bytes)" + ) + + # Extract UBIFS volume data from raw UBI image + ubifs_data = extract_ubifs(rootfs_data) + if output == "human": + console.print( + f" Extracted UBIFS: {len(ubifs_data)} bytes" + f" from {len(rootfs_data)} byte UBI image" + ) + + # Replace TFTP file with extracted UBIFS + tftp_protocol._files[rootfs_name] = ubifs_data + + try: + resp = await _tftp_to_ram(rootfs_name, timeout=120.0) + except RuntimeError as e: + console.print(f"[red]TFTP failed for rootfs:[/red] {e}") + raise typer.Exit(1) + if output == "human": + console.print(" TFTP OK") + + # Erase the full rootfs partition + await _cmd(f"nand erase 0x{r_off:x} 0x{r_sz:x}", timeout=120.0) + + # UBI format + create volume + write + # mtdparts for OpenIPC NAND: hinand:1M(boot),1M(env),8M(kernel),-(ubi) + nand_name = "hinand" + await _cmd(f"setenv mtdids nand0={nand_name}", timeout=3.0) + await _cmd( + f"setenv mtdparts mtdparts={nand_name}:" + f"1024k(boot),1024k(env),8192k(kernel),-(ubi)", + timeout=3.0, + ) + await _cmd("mtdparts", timeout=3.0) + await _cmd("ubi part ubi", timeout=120.0) + # Create volume using all available space (not just image size) + # so UBIFS has room for runtime writes + await _cmd("ubi create rootfs", timeout=60.0) + resp = await _cmd( + f"ubi write 0x{ram_addr:x} rootfs 0x{len(ubifs_data):x}", + timeout=300.0, + ) + if "error" in resp.lower() or "cannot" in resp.lower(): + console.print(f"[red]ubi write failed:[/red] {resp.strip()[-120:]}") + raise typer.Exit(1) + + if output == "human": + console.print(" [green]rootfs (UBI) OK[/green]") + else: + await tftp_and_flash("rootfs", rootfs_name, rootfs_data, r_off, r_sz) # Set up proper boot environment if nand: @@ -2012,15 +2128,13 @@ async def tftp_and_flash( # Set mtdparts and bootcmd directly — don't rely on env macros # which may be wrong or missing on the target device. # Layout: 1M(boot),1M(env),8M(kernel),-(ubi) - await send_command( - transport, + await _cmd( "setenv mtdparts hinand:1024k(boot),1024k(env),8192k(kernel),-(ubi)", - timeout=3.0, wait_for="# ", + timeout=3.0, ) - await send_command( - transport, + await _cmd( r"setenv bootcmd nand read ${baseaddr} 0x200000 0x800000\; bootm ${baseaddr}", - timeout=3.0, wait_for="# ", + timeout=3.0, ) else: nor_cmd = "setnor8m" if nor_size < 16 else "setnor16m" @@ -2029,18 +2143,16 @@ async def tftp_and_flash( # setnor8m does: set mtdparts, set bootcmd, saveenv, reset # We do it manually to avoid the auto-reset mtdparts_var = f"mtdpartsnor{nor_size}m" - await send_command(transport, f"run {mtdparts_var}", timeout=3.0, wait_for="# ") - await send_command( - transport, "setenv bootcmd ${bootcmdnor}", timeout=3.0, wait_for="# ", - ) - resp = await send_command(transport, "saveenv", timeout=10.0, wait_for="# ") + await _cmd(f"run {mtdparts_var}", timeout=3.0) + await _cmd("setenv bootcmd ${bootcmdnor}", timeout=3.0) + resp = await _cmd("saveenv", timeout=10.0) if output == "human": console.print(" [green]Environment saved[/green]") # Reset if output == "human": console.print("\n [bold]Resetting device...[/bold]") - await send_command(transport, "reset", timeout=3.0) + await _cmd("reset", timeout=3.0) finally: tftp_transport.close() @@ -2409,7 +2521,7 @@ async def _send(cmd: str, timeout: float = 60.0) -> str: from defib.network.tftp_server import start_tftp_server tftp_files = {name: data for name, data in partitions} - tftp_transport, _ = await start_tftp_server( + tftp_transport, tftp_proto = await start_tftp_server( files=tftp_files, bind_addr=host_ip, port=69, done_count=len(partitions), ) @@ -2476,11 +2588,25 @@ async def _send(cmd: str, timeout: float = 60.0) -> str: t0 = _time.monotonic() # Detect partition type before TFTP + # Raw UBI images (UBI# magic) must be converted to UBIFS first — + # nand write of raw UBI corrupts UBIFS due to bad block shifting. + from defib.ubi import extract_ubifs as _extract_ubifs, is_ubi_image as _is_ubi + is_ubifs = ( detected_flash == "nand" and len(data) >= 4 and data[:4] == b"\x31\x18\x10\x06" # UBIFS superblock ) + if detected_flash == "nand" and len(data) >= 4 and _is_ubi(data): + if output == "human": + console.print(" Raw UBI image → extracting UBIFS volume data") + data = _extract_ubifs(data) + is_ubifs = True + if output == "human": + console.print(f" Extracted {len(data)} bytes of UBIFS") + # Update TFTP file and write_size for the extracted data + tftp_proto._files[name] = data + write_size = ((len(data) + page - 1) // page) * page if detected_flash == "nand" and is_ubifs: # UBI-aware write: let UBI handle bad block mapping @@ -2570,7 +2696,8 @@ async def _send(cmd: str, timeout: float = 60.0) -> str: if output == "human": console.print(f" UBI format {real_name}...") await _send(f"ubi part {real_name}", timeout=120) - await _send(f"ubi create {vol_name} 0x{len(data):x}", timeout=60) + # Use all available space so UBIFS has room for runtime writes + await _send(f"ubi create {vol_name}", timeout=60) resp = await _send(f"ubi write 0x{ram_addr:x} {vol_name} 0x{len(data):x}", timeout=300) if "error" in resp.lower() or "cannot" in resp.lower(): if output == "human": diff --git a/src/defib/ubi.py b/src/defib/ubi.py new file mode 100644 index 0000000..85cf5dd --- /dev/null +++ b/src/defib/ubi.py @@ -0,0 +1,133 @@ +"""Extract UBIFS volume data from raw UBI images. + +A raw UBI image (magic ``UBI#`` / 0x55424923) contains physical erase blocks +(PEBs) with EC and VID headers that encode a chip-specific physical-to-logical +block mapping. Writing such an image with ``nand write`` on a chip with +different bad blocks shifts data and corrupts the UBIFS inside. + +This module extracts the logical UBIFS volume data so it can be written back +via U-Boot's ``ubi write`` command, which creates fresh PEB mappings appropriate +for the target chip. +""" + +from __future__ import annotations + +import struct + +UBI_EC_MAGIC = b"UBI#" # 0x55424923 +UBIFS_MAGIC = b"\x31\x18\x10\x06" # 0x06101831 LE + +# UBI EC header: magic(4) version(1) padding(3) ec(8) vid_hdr_offset(4) +# data_offset(4) image_seq(4) padding(32) hdr_crc(4) = 64 bytes +EC_HDR_FMT = ">4sBxxx Q II I 32x I" +EC_HDR_SIZE = 64 + +# UBI VID header: magic(4) version(1) vol_type(1) copy_flag(1) compat(1) +# vol_id(4) lnum(4) padding(4) data_size(4) used_ebs(4) data_pad(4) +# data_crc(4) padding(4) sqnum(8) padding(12) hdr_crc(4) = 64 bytes +VID_HDR_FMT = ">4s B B B B I I 4x I I I I 4x Q 12x I" +VID_HDR_SIZE = 64 +VID_MAGIC = b"UBI!" # 0x55424921 + + +def is_ubi_image(data: bytes) -> bool: + """Check if data starts with UBI EC header magic.""" + return len(data) >= 4 and data[:4] == UBI_EC_MAGIC + + +def is_ubifs_image(data: bytes) -> bool: + """Check if data starts with UBIFS superblock magic.""" + return len(data) >= 4 and data[:4] == UBIFS_MAGIC + + +def extract_ubifs(ubi_data: bytes, peb_size: int = 0x20000, vol_id: int = 0) -> bytes: + """Extract UBIFS volume data from a raw UBI image. + + Parameters + ---------- + ubi_data: + Raw UBI image bytes (starts with ``UBI#`` magic). + peb_size: + Physical erase block size in bytes. Default 128KB (typical SPI NAND). + vol_id: + UBI volume ID to extract. Default 0 (first/only volume). + + Returns + ------- + bytes + UBIFS image suitable for ``ubi write``. + + Raises + ------ + ValueError + If the image is not a valid UBI image or the volume is not found. + """ + if not is_ubi_image(ubi_data): + raise ValueError("Not a UBI image (missing UBI# magic)") + + # Collect LEBs: lnum -> data + lebs: dict[int, bytes] = {} + + num_pebs = len(ubi_data) // peb_size + + for peb_idx in range(num_pebs): + peb_off = peb_idx * peb_size + + # Check EC header + if ubi_data[peb_off : peb_off + 4] != UBI_EC_MAGIC: + continue + + # Parse EC header for this PEB's offsets (they can vary) + ec = struct.unpack_from(EC_HDR_FMT, ubi_data, peb_off) + peb_vid_off = ec[3] + peb_data_off = ec[4] + + # Check VID header + vid_off = peb_off + peb_vid_off + if vid_off + VID_HDR_SIZE > len(ubi_data): + continue + if ubi_data[vid_off : vid_off + 4] != VID_MAGIC: + continue + + vid = struct.unpack_from(VID_HDR_FMT, ubi_data, vid_off) + # Fields: magic(0), version(1), vol_type(2), copy_flag(3), compat(4), + # vol_id(5), lnum(6), data_size(7), used_ebs(8), data_pad(9), + # data_crc(10), sqnum(11), hdr_crc(12) + peb_vol_id = vid[5] + lnum = vid[6] + + if peb_vol_id != vol_id: + continue + + # Extract LEB data + d_off = peb_off + peb_data_off + peb_leb_size = peb_size - peb_data_off + leb_data = ubi_data[d_off : d_off + peb_leb_size] + lebs[lnum] = leb_data + + if not lebs: + raise ValueError(f"No LEBs found for volume {vol_id}") + + # Assemble in LEB order + max_leb = max(lebs.keys()) + leb_size_actual = len(next(iter(lebs.values()))) + result = bytearray() + for i in range(max_leb + 1): + if i in lebs: + result.extend(lebs[i]) + else: + # Missing LEB — fill with 0xFF (erased NAND) + result.extend(b"\xff" * leb_size_actual) + + # Trim trailing 0xFF pages (unmapped LEBs at the end) + while len(result) > leb_size_actual and result[-leb_size_actual:] == b"\xff" * leb_size_actual: + result = result[:-leb_size_actual] + + ubifs = bytes(result) + if not is_ubifs_image(ubifs): + raise ValueError( + f"Extracted data does not start with UBIFS magic " + f"(got {ubifs[:4].hex()}, expected {UBIFS_MAGIC.hex()})" + ) + + return ubifs diff --git a/tests/test_ubi.py b/tests/test_ubi.py new file mode 100644 index 0000000..754a5e5 --- /dev/null +++ b/tests/test_ubi.py @@ -0,0 +1,161 @@ +"""Tests for UBI image parsing and UBIFS extraction.""" + +import struct +import zlib + +import pytest + +from defib.ubi import ( + EC_HDR_SIZE, + UBI_EC_MAGIC, + UBIFS_MAGIC, + VID_HDR_SIZE, + VID_MAGIC, + extract_ubifs, + is_ubi_image, + is_ubifs_image, +) + +PEB_SIZE = 0x20000 # 128KB +VID_HDR_OFFSET = 2048 +DATA_OFFSET = 4096 +LEB_SIZE = PEB_SIZE - DATA_OFFSET + + +def _make_ec_header(vid_hdr_offset: int = VID_HDR_OFFSET, data_offset: int = DATA_OFFSET) -> bytes: + """Build a minimal UBI EC header (64 bytes) with valid CRC.""" + # Pack without CRC first + hdr = struct.pack( + ">4sBxxx Q II I 32x", + UBI_EC_MAGIC, + 1, # version + 0, # erase counter + vid_hdr_offset, + data_offset, + 1, # image_seq + ) + crc = zlib.crc32(hdr[:60]) & 0xFFFFFFFF + return hdr[:60] + struct.pack(">I", crc) + + +def _make_vid_header(vol_id: int, lnum: int, data_size: int = 0) -> bytes: + """Build a minimal UBI VID header (64 bytes) with valid CRC.""" + hdr = struct.pack( + ">4s B B B B I I 4x I I I I 4x Q 12x", + VID_MAGIC, + 1, # version + 1, # vol_type (dynamic) + 0, # copy_flag + 0, # compat + vol_id, + lnum, + data_size, + 0, # used_ebs + 0, # data_pad + 0, # data_crc + 0, # sqnum + ) + crc = zlib.crc32(hdr[:60]) & 0xFFFFFFFF + return hdr[:60] + struct.pack(">I", crc) + + +def _make_peb(vol_id: int, lnum: int, leb_data: bytes) -> bytes: + """Build a complete PEB with EC header, VID header, and data.""" + peb = bytearray(PEB_SIZE) + # Fill with 0xFF (erased NAND) + for i in range(PEB_SIZE): + peb[i] = 0xFF + + ec = _make_ec_header() + peb[0 : len(ec)] = ec + + vid = _make_vid_header(vol_id, lnum, len(leb_data)) + peb[VID_HDR_OFFSET : VID_HDR_OFFSET + len(vid)] = vid + + peb[DATA_OFFSET : DATA_OFFSET + len(leb_data)] = leb_data + + return bytes(peb) + + +def _make_ubifs_superblock() -> bytes: + """Make a fake UBIFS superblock (just the magic + padding).""" + return UBIFS_MAGIC + b"\x00" * (LEB_SIZE - 4) + + +def test_is_ubi_image(): + assert is_ubi_image(UBI_EC_MAGIC + b"\x00" * 60) + assert not is_ubi_image(b"\x00\x00\x00\x00") + assert not is_ubi_image(UBIFS_MAGIC) + assert not is_ubi_image(b"") + + +def test_is_ubifs_image(): + assert is_ubifs_image(UBIFS_MAGIC + b"\x00" * 60) + assert not is_ubifs_image(UBI_EC_MAGIC) + assert not is_ubifs_image(b"") + + +def test_extract_single_leb(): + """Extract a single-LEB UBIFS volume.""" + leb_data = _make_ubifs_superblock() + peb = _make_peb(vol_id=0, lnum=0, leb_data=leb_data) + ubifs = extract_ubifs(peb, peb_size=PEB_SIZE, vol_id=0) + assert ubifs[:4] == UBIFS_MAGIC + assert len(ubifs) == LEB_SIZE + + +def test_extract_multiple_lebs(): + """Extract a multi-LEB volume with LEBs in arbitrary PEB order.""" + leb0 = _make_ubifs_superblock() + leb1 = b"\xAA" * LEB_SIZE + leb2 = b"\xBB" * LEB_SIZE + + # PEBs in reverse order + image = _make_peb(0, 2, leb2) + _make_peb(0, 0, leb0) + _make_peb(0, 1, leb1) + ubifs = extract_ubifs(image, peb_size=PEB_SIZE, vol_id=0) + + assert ubifs[:4] == UBIFS_MAGIC + assert ubifs[LEB_SIZE : LEB_SIZE + 1] == b"\xAA" + assert ubifs[2 * LEB_SIZE : 2 * LEB_SIZE + 1] == b"\xBB" + assert len(ubifs) == 3 * LEB_SIZE + + +def test_extract_filters_by_vol_id(): + """Only extract the requested volume ID.""" + leb_vol0 = _make_ubifs_superblock() + leb_vol1 = b"\xCC" * LEB_SIZE + + image = _make_peb(0, 0, leb_vol0) + _make_peb(1, 0, leb_vol1) + ubifs = extract_ubifs(image, peb_size=PEB_SIZE, vol_id=0) + assert ubifs[:4] == UBIFS_MAGIC + + +def test_extract_missing_volume_raises(): + """Raise ValueError if requested volume ID not found.""" + leb = _make_ubifs_superblock() + image = _make_peb(0, 0, leb) + with pytest.raises(ValueError, match="No LEBs found"): + extract_ubifs(image, peb_size=PEB_SIZE, vol_id=99) + + +def test_extract_not_ubi_raises(): + """Raise ValueError for non-UBI data.""" + with pytest.raises(ValueError, match="Not a UBI image"): + extract_ubifs(b"\x00" * PEB_SIZE, peb_size=PEB_SIZE) + + +def test_extract_skips_empty_pebs(): + """PEBs without VID header (erased) are skipped.""" + leb = _make_ubifs_superblock() + good_peb = _make_peb(0, 0, leb) + + # Erased PEB: EC header but no VID + erased = bytearray(PEB_SIZE) + for i in range(PEB_SIZE): + erased[i] = 0xFF + ec = _make_ec_header() + erased[0 : len(ec)] = ec + + image = bytes(erased) + good_peb + ubifs = extract_ubifs(image, peb_size=PEB_SIZE, vol_id=0) + assert ubifs[:4] == UBIFS_MAGIC From 0c98c32bf635f686112f4d94dab2c33eeb641b82 Mon Sep 17 00:00:00 2001 From: Dmitry Ilyin Date: Wed, 22 Apr 2026 23:35:54 +0300 Subject: [PATCH 2/2] Remove unused imports in test_ubi.py Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/test_ubi.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_ubi.py b/tests/test_ubi.py index 754a5e5..005cd65 100644 --- a/tests/test_ubi.py +++ b/tests/test_ubi.py @@ -6,10 +6,8 @@ import pytest from defib.ubi import ( - EC_HDR_SIZE, UBI_EC_MAGIC, UBIFS_MAGIC, - VID_HDR_SIZE, VID_MAGIC, extract_ubifs, is_ubi_image,