Skip to content
38 changes: 36 additions & 2 deletions src/specify_cli/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,28 @@
}
)
EXTENSION_COMMAND_NAME_PATTERN = re.compile(r"^speckit\.([a-z0-9-]+)\.([a-z0-9-]+)$")
EXTENSION_ID_PATTERN = re.compile(r"^[a-z0-9-]+$")

# Characters allowed verbatim in a catalog-provided version when it is used to
# build a download filename. Anything else (path separators, "..", control
# chars, whitespace) is collapsed to "-" so an untrusted version cannot inject
# a nested path or an unwritable filename — we sanitize rather than rely on the
# post-hoc Path.resolve() containment check as the primary guard.
_UNSAFE_VERSION_TOKEN_PATTERN = re.compile(r"[^A-Za-z0-9._-]+")


def _safe_version_token(version: Any, fallback: str = "unknown") -> str:
"""Reduce a catalog ``version`` to a filename-safe token.

Non-strings (JSON null, numbers) and blank values degrade to ``fallback``.
Path separators and traversal tokens are stripped so the result can never
span directories or escape the download target.
"""
if not isinstance(version, str):
return fallback
token = _UNSAFE_VERSION_TOKEN_PATTERN.sub("-", version).strip("-.")
return token or fallback


VALID_EFFECTS = frozenset({"read-only", "read-write"})

Expand Down Expand Up @@ -217,7 +239,7 @@ def _validate(self):
raise ValidationError(f"Missing extension.{field}")

# Validate extension ID format
if not re.match(r"^[a-z0-9-]+$", ext["id"]):
if not EXTENSION_ID_PATTERN.match(ext["id"]):
raise ValidationError(
f"Invalid extension ID '{ext['id']}': "
"must be lowercase alphanumeric with hyphens only"
Expand Down Expand Up @@ -2576,6 +2598,14 @@ def download_extension(
"""
import urllib.error

if not isinstance(extension_id, str) or not EXTENSION_ID_PATTERN.match(
extension_id
):
raise ExtensionError(
f"Invalid extension ID '{extension_id}': "
"must be lowercase alphanumeric with hyphens only"
)

# Get extension info from catalog
ext_info = self.get_extension_info(extension_id)
if not ext_info:
Expand Down Expand Up @@ -2608,9 +2638,13 @@ def download_extension(
target_dir = self.cache_dir / "downloads"
target_dir.mkdir(parents=True, exist_ok=True)

version = ext_info.get("version", "unknown")
version = _safe_version_token(ext_info.get("version"))
zip_filename = f"{extension_id}-{version}.zip"
zip_path = target_dir / zip_filename
try:
zip_path.resolve().relative_to(target_dir.resolve())
except ValueError as e:
raise ExtensionError(f"Refusing unsafe extension ZIP path: {zip_filename}") from e

extra_headers = None
resolved_download_url = self._resolve_github_release_asset_api_url(download_url)
Expand Down
135 changes: 107 additions & 28 deletions src/specify_cli/extensions/_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,30 @@
extension_app.add_typer(catalog_app, name="catalog")


def _catalog_str(ext: dict, key: str, fallback: str = "") -> str:
"""Read a catalog string field, treating JSON null / non-strings / blanks
as missing so they fall back instead of rendering as the literal "None".

Catalog entries are untrusted (remote/community catalogs); ``dict.get(key,
default)`` only substitutes the default when the key is absent, so an
explicit ``null`` value would otherwise reach ``str()`` and print "None".
"""
value = ext.get(key)
if isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
return fallback


def _catalog_id(ext: dict) -> str:
"""Return a usable extension id, or "" when the catalog entry lacks a
valid one. An empty result means callers should skip the install hint (and
ideally the entry) rather than emit a command ``download_extension()`` will
refuse."""
return _catalog_str(ext, "id")


# Root helpers re-fetched at call time so test monkeypatching of
# `specify_cli.<name>` keeps working after the move.
def _require_specify_project(*args, **kwargs):
Expand Down Expand Up @@ -168,7 +192,7 @@ def _resolve_catalog_extension(

# Try by display name - search using argument as query, then filter for exact match
search_results = catalog.search(query=argument)
name_matches = [ext for ext in search_results if ext["name"].lower() == argument.lower()]
name_matches = [ext for ext in search_results if str(ext.get("name", "")).lower() == argument.lower()]

if len(name_matches) == 1:
return (name_matches[0], None)
Expand Down Expand Up @@ -207,22 +231,30 @@ def extension_list(
available: bool = typer.Option(False, "--available", help="Show available extensions from catalog"),
all_extensions: bool = typer.Option(False, "--all", help="Show both installed and available"),
):
"""List installed extensions."""
from . import ExtensionManager
"""List installed extensions, and available catalog extensions with --available/--all."""
from . import ExtensionManager, ExtensionCatalog, ExtensionError

project_root = _require_specify_project()
manager = ExtensionManager(project_root)
installed = manager.list_installed()

if not installed and not (available or all_extensions):
# Default (no flags) lists installed; --all also lists installed.
# --available alone lists only catalog extensions, not installed.
show_installed = all_extensions or not available
show_available = available or all_extensions

if not installed and not show_available:
console.print("[yellow]No extensions installed.[/yellow]")
console.print("\nInstall an extension with:")
console.print(" specify extension add <extension-name>")
return

if installed:
if show_installed:
console.print("\n[bold cyan]Installed Extensions:[/bold cyan]\n")

if not installed:
console.print(" [dim]No extensions installed.[/dim]")
console.print()
for ext in installed:
status_icon = "✓" if ext["enabled"] else "✗"
status_color = "green" if ext["enabled"] else "red"
Expand All @@ -233,9 +265,46 @@ def extension_list(
console.print(f" Commands: {ext['command_count']} | Hooks: {ext['hook_count']} | Priority: {ext['priority']} | Status: {'Enabled' if ext['enabled'] else 'Disabled'}")
console.print()

if available or all_extensions:
console.print("\nInstall an extension:")
console.print(" [cyan]specify extension add <name>[/cyan]")
if show_available:
# Query the catalog and show extensions that are not already installed.
catalog = ExtensionCatalog(project_root)
installed_ids = {ext["id"] for ext in installed}

try:
results = catalog.search()
except ExtensionError as e:
console.print(f"\n[red]Error:[/red] Could not query extension catalog: {_escape_markup(str(e))}")
console.print("[dim]The catalog may be temporarily unavailable. Try again later.[/dim]")
raise typer.Exit(1)

available_exts = [ext for ext in results if ext.get("id") not in installed_ids]

console.print("\n[bold cyan]Available Extensions:[/bold cyan]\n")
if not available_exts:
console.print(" [dim]No additional extensions available in the catalog.[/dim]")
else:
for ext in available_exts:
# Catalog fields are untrusted (remote/community catalogs); escape
# before embedding in Rich markup to prevent markup injection.
# A missing/blank id means the entry cannot be installed, so skip
# it entirely rather than printing a bogus install hint.
extension_id = _catalog_id(ext)
if not extension_id:
continue
safe_id = _escape_markup(extension_id)
verified_badge = " [green]✓ Verified[/green]" if ext.get("verified") else ""
safe_name = _escape_markup(_catalog_str(ext, "name", "(unnamed)"))
safe_version = _escape_markup(_catalog_str(ext, "version", "?"))
console.print(f" [bold]{safe_name}[/bold] (v{safe_version}){verified_badge}")
console.print(f" [dim]{safe_id}[/dim]")
console.print(f" {_escape_markup(_catalog_str(ext, 'description'))}")
install_allowed = ext.get("_install_allowed", True)
if install_allowed:
console.print(f" [cyan]Install:[/cyan] specify extension add {safe_id}")
else:
catalog_name = _escape_markup(_catalog_str(ext, "_catalog_name"))
console.print(f" [yellow]Discovery only — not installable from '{catalog_name}'[/yellow]")
console.print()


@catalog_app.command("list")
Expand Down Expand Up @@ -601,7 +670,7 @@ def extension_add(

# Download extension ZIP (use resolved ID, not original argument which may be display name)
extension_id = ext_info['id']
console.print(f"Downloading {_escape_markup(str(ext_info['name']))} v{_escape_markup(str(ext_info.get('version', 'unknown')))}...")
console.print(f"Downloading {_escape_markup(str(ext_info.get('name', extension_id)))} v{_escape_markup(str(ext_info.get('version', 'unknown')))}...")
zip_path = catalog.download_extension(extension_id)
Comment on lines 671 to 674

try:
Expand Down Expand Up @@ -751,13 +820,20 @@ def extension_search(
console.print(f"\n[green]Found {len(results)} extension(s):[/green]\n")

for ext in results:
# Catalog entries are untrusted; a missing/blank id cannot be
# installed or referenced, so skip it rather than emit a bogus
# install hint or crash on ext['id'].
extension_id = _catalog_id(ext)
if not extension_id:
continue

# Extension header
verified_badge = " [green]✓ Verified[/green]" if ext.get("verified") else ""
console.print(f"[bold]{_escape_markup(str(ext['name']))}[/bold] (v{_escape_markup(str(ext['version']))}){verified_badge}")
console.print(f" {_escape_markup(str(ext['description']))}")
console.print(f"[bold]{_escape_markup(_catalog_str(ext, 'name', '(unnamed)'))}[/bold] (v{_escape_markup(_catalog_str(ext, 'version', '?'))}){verified_badge}")
console.print(f" {_escape_markup(_catalog_str(ext, 'description'))}")

# Metadata
console.print(f"\n [dim]Author:[/dim] {_escape_markup(str(ext.get('author', 'Unknown')))}")
console.print(f"\n [dim]Author:[/dim] {_escape_markup(_catalog_str(ext, 'author', 'Unknown'))}")
if ext.get('tags'):
tags_str = ", ".join(str(t) for t in ext['tags'])
console.print(f" [dim]Tags:[/dim] {_escape_markup(tags_str)}")
Expand All @@ -776,7 +852,7 @@ def extension_search(
if ext.get('downloads') is not None:
stats.append(f"Downloads: {ext['downloads']:,}")
if ext.get('stars') is not None:
stats.append(f"Stars: {ext['stars']}")
stats.append(f"Stars: {_escape_markup(str(ext['stars']))}")
if stats:
console.print(f" [dim]{' | '.join(stats)}[/dim]")

Expand All @@ -785,7 +861,7 @@ def extension_search(
console.print(f" [dim]Repository:[/dim] {_escape_markup(str(ext['repository']))}")

# Install command (show warning if not installable)
safe_id = _escape_markup(str(ext['id']))
safe_id = _escape_markup(extension_id)
if install_allowed:
console.print(f"\n [cyan]Install:[/cyan] specify extension add {safe_id}")
else:
Expand Down Expand Up @@ -897,17 +973,17 @@ def _print_extension_info(ext_info: dict, manager):

# Header
verified_badge = " [green]✓ Verified[/green]" if ext_info.get("verified") else ""
console.print(f"\n[bold]{_escape_markup(str(ext_info['name']))}[/bold] (v{_escape_markup(str(ext_info['version']))}){verified_badge}")
console.print(f"\n[bold]{_escape_markup(_catalog_str(ext_info, 'name', '(unnamed)'))}[/bold] (v{_escape_markup(_catalog_str(ext_info, 'version', '?'))}){verified_badge}")
console.print(f"ID: {_escape_markup(str(ext_info['id']))}")
console.print()

# Description
console.print(f"{_escape_markup(str(ext_info['description']))}")
console.print(f"{_escape_markup(_catalog_str(ext_info, 'description'))}")
console.print()

# Author and License
console.print(f"[dim]Author:[/dim] {_escape_markup(str(ext_info.get('author', 'Unknown')))}")
console.print(f"[dim]License:[/dim] {_escape_markup(str(ext_info.get('license', 'Unknown')))}")
console.print(f"[dim]Author:[/dim] {_escape_markup(_catalog_str(ext_info, 'author', 'Unknown'))}")
console.print(f"[dim]License:[/dim] {_escape_markup(_catalog_str(ext_info, 'license', 'Unknown'))}")

# Category and Effect
if ext_info.get('category'):
Expand All @@ -923,23 +999,26 @@ def _print_extension_info(ext_info: dict, manager):
console.print()

# Requirements
if ext_info.get('requires'):
reqs = ext_info.get('requires')
if isinstance(reqs, dict) and reqs:
console.print("[bold]Requirements:[/bold]")
reqs = ext_info['requires']
if reqs.get('speckit_version'):
console.print(f" • Spec Kit: {_escape_markup(str(reqs['speckit_version']))}")
if reqs.get('tools'):
for tool in reqs['tools']:
tool_name = _escape_markup(str(tool['name']))
tools = reqs.get('tools')
if isinstance(tools, list):
for tool in tools:
if not isinstance(tool, dict):
continue
tool_name = _escape_markup(str(tool.get('name', '(unnamed)')))
tool_version = _escape_markup(str(tool.get('version', 'any')))
required = " (required)" if tool.get('required') else " (optional)"
console.print(f" • {tool_name}: {tool_version}{required}")
console.print()

# Provides
if ext_info.get('provides'):
provides = ext_info.get('provides')
if isinstance(provides, dict) and provides:
console.print("[bold]Provides:[/bold]")
provides = ext_info['provides']
if provides.get('commands'):
console.print(f" • Commands: {_escape_markup(str(provides['commands']))}")
if provides.get('hooks'):
Expand All @@ -957,7 +1036,7 @@ def _print_extension_info(ext_info: dict, manager):
if ext_info.get('downloads') is not None:
stats.append(f"Downloads: {ext_info['downloads']:,}")
if ext_info.get('stars') is not None:
stats.append(f"Stars: {ext_info['stars']}")
stats.append(f"Stars: {_escape_markup(str(ext_info['stars']))}")
if stats:
console.print(f"[bold]Statistics:[/bold] {' | '.join(stats)}")
console.print()
Expand Down Expand Up @@ -1064,8 +1143,8 @@ def extension_update(
continue

try:
catalog_version = pkg_version.Version(ext_info["version"])
except pkg_version.InvalidVersion:
catalog_version = pkg_version.Version(str(ext_info["version"]))
except (pkg_version.InvalidVersion, KeyError):
console.print(
f"⚠ {safe_ext_id}: Invalid catalog version '{_escape_markup(str(ext_info.get('version')))}' (skipping)"
)
Expand Down
Loading