Skip to content
45 changes: 40 additions & 5 deletions src/google/adk/agents/config_agent_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ def _resolve_agent_class(agent_class: str) -> type[BaseAgent]:


_BLOCKED_YAML_KEYS = frozenset({"args"})

_BLOCKED_CODE_REFERENCE_MODULES = frozenset({
"builtins",
"importlib",
"os",
"pathlib",
"shutil",
"socket",
"subprocess",
"sys",
})

_ENFORCE_DENYLIST = False


Expand All @@ -89,8 +101,18 @@ def _set_enforce_denylist(value: bool) -> None:
_ENFORCE_DENYLIST = value


def _check_config_for_blocked_keys(node: Any, filename: str) -> None:
"""Recursively check if the configuration contains any blocked keys."""
def is_blocked_code_reference(value: Any) -> bool:
"""Return True if a config value references a blocked Python module."""
if not isinstance(value, str):
return False

module_path = value.rsplit(".", 1)[0] if "." in value else value
root_module = module_path.split(".", 1)[0]
return root_module in _BLOCKED_CODE_REFERENCE_MODULES


def check_config_for_blocked_keys(node: Any, filename: str) -> None:
"""Recursively check if the configuration contains blocked entries."""
if isinstance(node, dict):
for key, value in node.items():
if key in _BLOCKED_YAML_KEYS:
Expand All @@ -99,10 +121,23 @@ def _check_config_for_blocked_keys(node: Any, filename: str) -> None:
f"The '{key}' field is not allowed in agent configurations "
"because it can execute arbitrary code."
)
_check_config_for_blocked_keys(value, filename)

if key in ("name", "code") and is_blocked_code_reference(value):
raise ValueError(
f"Blocked code reference {value!r} found in {filename!r}. "
"References to unsafe Python modules are not allowed in "
"agent configurations."
)

check_config_for_blocked_keys(value, filename)
elif isinstance(node, list):
for item in node:
_check_config_for_blocked_keys(item, filename)
check_config_for_blocked_keys(item, filename)


def _check_config_for_blocked_keys(node: Any, filename: str) -> None:
"""Recursively check if the configuration contains any blocked keys."""
check_config_for_blocked_keys(node, filename)


def _load_config_from_path(config_path: str) -> AgentConfig:
Expand All @@ -126,7 +161,7 @@ def _load_config_from_path(config_path: str) -> AgentConfig:
config_data = yaml.safe_load(f)

if _ENFORCE_DENYLIST:
_check_config_for_blocked_keys(config_data, config_path)
check_config_for_blocked_keys(config_data, config_path)

return AgentConfig.model_validate(config_data)

Expand Down
9 changes: 9 additions & 0 deletions src/google/adk/agents/llm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,15 @@ def _resolve_tools(
obj = getattr(module, tool_config.name)
else:
# User-defined tools
from .config_agent_utils import is_blocked_code_reference

if is_blocked_code_reference(tool_config.name):
raise ValueError(
f'Blocked tool reference: {tool_config.name!r}. '
'References to unsafe Python modules are not allowed in '
'agent tool configurations.'
)

module_path, obj_name = tool_config.name.rsplit('.', 1)
module = importlib.import_module(module_path)
obj = getattr(module, obj_name)
Expand Down
21 changes: 3 additions & 18 deletions src/google/adk/cli/fast_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,33 +311,18 @@ def _has_parent_reference(path: str) -> bool:
# allows callers to pass arbitrary arguments to Python constructors and
# functions, which is an RCE vector when exposed through the builder UI.
# Block any upload that contains an `args` key anywhere in the document.
_BLOCKED_YAML_KEYS = frozenset({"args"})

def _check_yaml_for_blocked_keys(content: bytes, filename: str) -> None:
"""Raise if the YAML document contains any blocked keys."""
"""Raise if the YAML document contains blocked config entries."""
from google.adk.agents.config_agent_utils import check_config_for_blocked_keys
import yaml

try:
docs = list(yaml.safe_load_all(content))
except yaml.YAMLError as exc:
raise ValueError(f"Invalid YAML in {filename!r}: {exc}") from exc

def _walk(node: Any) -> None:
if isinstance(node, dict):
for key, value in node.items():
if key in _BLOCKED_YAML_KEYS:
raise ValueError(
f"Blocked key {key!r} found in {filename!r}. "
f"The '{key}' field is not allowed in builder uploads "
"because it can execute arbitrary code."
)
_walk(value)
elif isinstance(node, list):
for item in node:
_walk(item)

for doc in docs:
_walk(doc)
check_config_for_blocked_keys(doc, filename)

def _parse_upload_filename(filename: Optional[str]) -> tuple[str, str]:
if not filename:
Expand Down
43 changes: 43 additions & 0 deletions tests/unittests/agents/test_config_agent_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from google.adk.agents.config_agent_utils import check_config_for_blocked_keys
import pytest


def test_check_config_for_blocked_keys_rejects_args_key():
config = {
"name": "test_agent",
"model": "gemini-2.0-flash",
"tools": [{
"name": "some_project.tools.create_tool",
"args": [],
}],
}

with pytest.raises(ValueError, match="Blocked key 'args'"):
check_config_for_blocked_keys(config, "root_agent.yaml")


def test_check_config_for_blocked_keys_rejects_blocked_tool_module():
config = {
"name": "test_agent",
"model": "gemini-2.0-flash",
"tools": [{
"name": "subprocess.run",
}],
}

with pytest.raises(
ValueError, match="Blocked code reference 'subprocess.run'"
):
check_config_for_blocked_keys(config, "root_agent.yaml")


def test_check_config_for_blocked_keys_allows_non_blocked_tool_reference():
config = {
"name": "test_agent",
"model": "gemini-2.0-flash",
"tools": [{
"name": "my_project.tools.echo",
}],
}

check_config_for_blocked_keys(config, "root_agent.yaml")