Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions src/dayamlchecker/yaml_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import argparse
from dataclasses import dataclass, field
from pathlib import Path
from pyexpat import features
import re
import sys

Expand Down Expand Up @@ -1442,6 +1443,95 @@ def _find_unmatched_interview_order_references(
return unmatched


def _extract_mako_guards_by_line(content: str) -> dict[int, list[str]]:
"""Extract Mako % if conditions active at each line of a content string.

Returns a dict mapping line number (1-indexed) to list of active guard
condition strings at that line.
"""
guards_by_line: dict[int, list[str]] = {}
# Stack of (condition_str, start_line) for nested % if blocks
guard_stack: list[str] = []

for i, line in enumerate(content.splitlines(), start=1):
stripped = line.strip()
if stripped.startswith("% if ") or stripped.startswith("%if "):
# Extract condition — everything after "% if " or "%if "
cond = re.sub(r"^%\s*if\s+", "", stripped).rstrip(":")
guard_stack.append(cond)
elif stripped.startswith("% elif ") or stripped.startswith("%elif "):
if guard_stack:
guard_stack.pop()
cond = re.sub(r"^%\s*elif\s+", "", stripped).rstrip(":")
guard_stack.append(cond)
elif stripped in ("% endif", "%endif", "% endif", "% endfor"):
if guard_stack:
guard_stack.pop()

if guard_stack:
guards_by_line[i] = list(guard_stack)

return guards_by_line


def _find_unmatched_mako_references(
doc: dict[str, Any], conditional_fields: list[dict[str, Any]]
) -> list[tuple[str, int, str]]:
"""Check Mako text in question, subquestion, and attachment content blocks for unconditional references to variables that are only conditionally asked"""
if not conditional_fields:
return []

# collect (mako_text, source_label) pairs to check
text_blocks: list[tuple[str, str]] = []

question = _get_case_insensitive(doc, "question")
if isinstance(question, str):
text_blocks.append((question, "question"))

subquestion = _get_case_insensitive(doc, "subquestion")
if isinstance(subquestion, str):
text_blocks.append((subquestion, "subquestion"))

attachment = _get_case_insensitive(doc, "attachment")
attachments = _get_case_insensitive(doc, "attachments")

if isinstance(attachment, dict):
content = attachment.get("content")
if isinstance(content, str):
text_blocks.append((content, "attachment content"))
elif isinstance(attachment, list):
for item in attachment:
if isinstance(item, dict):
content = item.get("content")
if isinstance(content, str):
text_blocks.append((content, "attachment content"))

if isinstance(attachments, list):
for item in attachments:
if isinstance(item, dict):
content = item.get("content")
if isinstance(content, str):
text_blocks.append((content, "attachment content"))

if not text_blocks:
return []

unmatched: list[tuple[str, int, str]] = []
for text, source in text_blocks:
mako_guards_by_line = _extract_mako_guards_by_line(text)
for conditional in conditional_fields:
field_var = conditional["field_var"]
expected_guards = conditional["guards"]
ref_lines = _find_variable_reference_lines(text, field_var)
for ref_line in ref_lines:
active_guards = mako_guards_by_line.get(ref_line, [])
if _has_matching_guard(active_guards, expected_guards):
continue
unmatched.append((field_var, ref_line, source))

return unmatched


def _max_screen_visibility_nesting_depth(doc: dict[str, Any]) -> int:
fields = _get_case_insensitive(doc, "fields")
if not isinstance(fields, list):
Expand Down Expand Up @@ -1517,6 +1607,8 @@ def find_errors_from_string(
]
yaml_parser = _make_yaml_parser()
prior_conditional_fields: list[dict[str, Any]] = []
skip_undefined = False
seen_ids: dict[str, int] = {}
line_number = 1
for source_code in document_match.split(full_content):
lines_in_code = sum(l == "\n" for l in source_code)
Expand Down Expand Up @@ -1640,6 +1732,29 @@ def find_errors_from_string(
file_name=input_file,
)
)
features = _get_case_insensitive(doc, "features")
if isinstance(features, dict):
skip_val = features.get("skip undefined")
if skip_val is True or (
isinstance(skip_val, str) and skip_val.lower() == "true"
):
skip_undefined = True

block_id = _get_case_insensitive(doc, "id")
if isinstance(block_id, str) and block_id.strip():
block_id_clean = block_id.strip()
block_start = line_number + 1 if line_number > 1 else line_number
if block_id_clean in seen_ids:
all_errors.append(
YAMLError(
err_str=f'Duplicate block id "{block_id_clean}" - first used at line {seen_ids[block_id_clean]}, reused here. DA will silently use the later block, which is almost certainly a mistake',
line_number=block_start,
file_name=input_file,
experimental=False,
)
)
else:
seen_ids[block_id_clean] = block_start

unmatched_refs = _find_unmatched_interview_order_references(
doc, prior_conditional_fields
Expand All @@ -1656,6 +1771,23 @@ def find_errors_from_string(
)
)

if not skip_undefined:
unmatched_mako_refs = _find_unmatched_mako_references(
doc, prior_conditional_fields
)
for field_var, ref_line, source in unmatched_mako_refs:
all_errors.append(
YAMLError(
err_str=(
f'{source} references "{field_var}", but that field is only asked '
f"when certain conditions are met (show if/hide if). If those conditions are "
f"not met, the interview may get stuck"
),
line_number=doc["__line__"] + line_number + ref_line,
file_name=input_file,
)
)

nesting_depth = _max_screen_visibility_nesting_depth(doc)
if nesting_depth > 2:
all_errors.append(
Expand Down
Loading
Loading