diff --git a/cagent-schema.json b/cagent-schema.json new file mode 100644 index 000000000..9cc91296d --- /dev/null +++ b/cagent-schema.json @@ -0,0 +1,1321 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://github.com/cagent/cagent/blob/main/cagent-schema.json", + "title": "Cagent Configuration", + "description": "Configuration schema for Cagent v4", + "type": "object", + "properties": { + "version": { + "type": "string", + "description": "Configuration version", + "enum": [ + "0", + "1", + "2", + "3", + "4" + ], + "examples": [ + "0", + "1", + "2", + "3", + "4" + ] + }, + "providers": { + "type": "object", + "description": "Map of custom provider configurations. Providers define reusable defaults (base_url, token_key, api_type) that models can reference.", + "additionalProperties": { + "$ref": "#/definitions/ProviderConfig" + } + }, + "workflow": { + "type": "object", + "properties": { + "steps": { + "type": "array", + "description": "List of workflow steps", + "items": { + "$ref": "#/definitions/StepConfig" + } + }, + "max_loop_iterations": { + "type": "integer", + "description": "Maximum number of times a step can be re-executed due to a conditional back-edge (loop). Default: 100", + "minimum": 0 + } + }, + "additionalProperties": false, + "description": "Workflow configuration for sequential, conditional, and parallel step execution" + }, + "agents": { + "type": "object", + "description": "Map of agent configurations", + "additionalProperties": { + "$ref": "#/definitions/AgentConfig" + } + }, + "models": { + "type": "object", + "description": "Map of model configurations", + "additionalProperties": { + "$ref": "#/definitions/ModelConfig" + } + }, + "rag": { + "type": "object", + "description": "Map of RAG (Retrieval-Augmented Generation) configurations", + "additionalProperties": { + "$ref": "#/definitions/RAGConfig" + } + }, + "metadata": { + "$ref": "#/definitions/Metadata", + "description": "Configuration metadata" + }, + "permissions": { + "$ref": "#/definitions/PermissionsConfig", + "description": "Tool permission configuration for controlling tool approval behavior" + } + }, + "additionalProperties": false, + "definitions": { + "ProviderConfig": { + "type": "object", + "description": "Configuration for a custom model provider. Can be used for custom gateways", + "properties": { + "api_type": { + "type": "string", + "description": "The API schema type to use. Determines which API schema to use.", + "enum": [ + "openai_chatcompletions", + "openai_responses" + ], + "default": "openai_chatcompletions", + "examples": [ + "openai_chatcompletions", + "openai_responses" + ] + }, + "base_url": { + "type": "string", + "description": "Base URL for the provider's API endpoint (required)", + "format": "uri", + "examples": [ + "https://router.example.com/v1" + ] + }, + "token_key": { + "type": "string", + "description": "Environment variable name containing the API token. If not set, requests will be sent without authentication.", + "examples": [ + "CUSTOM_PROVIDER_API_KEY" + ] + } + }, + "required": ["base_url"], + "additionalProperties": false + }, + "AgentConfig": { + "type": "object", + "description": "Configuration for a single agent", + "properties": { + "model": { + "type": "string", + "description": "Model to use for this agent (can be just model name or provider/model format)", + "examples": [ + "gpt-4", + "openai/gpt-4o", + "anthropic/claude-sonnet-4-0", + "anthropic/claude-sonnet-4-5", + "claude" + ] + }, + "description": { + "type": "string", + "description": "Description of the agent" + }, + "welcome_message": { + "type": "string", + "description": "Optional welcome message to display when the agent starts" + }, + "toolsets": { + "type": "array", + "description": "List of toolsets available to the agent", + "items": { + "$ref": "#/definitions/Toolset" + } + }, + "instruction": { + "type": "string", + "description": "Instructions for the agent" + }, + "code_mode_tools": { + "type": "boolean", + "description": "Enable Code Mode for tools" + }, + "sub_agents": { + "type": "array", + "description": "List of sub-agents", + "items": { + "type": "string" + } + }, + "handoffs": { + "type": "array", + "description": "List of agents this agent can hand off the conversation to", + "items": { + "type": "string" + } + }, + "add_date": { + "type": "boolean", + "description": "Whether to add date information" + }, + "add_environment_info": { + "type": "boolean", + "description": "Whether to add environment information" + }, + "max_iterations": { + "type": "integer", + "description": "Maximum number of iterations", + "minimum": 0 + }, + "num_history_items": { + "type": "integer", + "description": "Number of history items to keep", + "minimum": 0 + }, + "add_prompt_files": { + "type": "array", + "description": "List of prompt files to add", + "items": { + "type": "string" + } + }, + "commands": { + "description": "Named prompts for /commands. Supports simple string format or advanced object format with description and instruction.", + "oneOf": [ + { + "type": "object", + "additionalProperties": { + "oneOf": [ + { + "type": "string", + "description": "Simple command format: the string becomes the instruction" + }, + { + "$ref": "#/definitions/CommandConfig" + } + ] + } + }, + { + "type": "array", + "items": { + "type": "object", + "additionalProperties": { + "oneOf": [ + { + "type": "string", + "description": "Simple command format: the string becomes the instruction" + }, + { + "$ref": "#/definitions/CommandConfig" + } + ] + } + } + } + ] + }, + "structured_output": { + "type": "object", + "description": "Structured output configuration for constraining model responses to a specific JSON schema. Supported by OpenAI (native) and Google Gemini (native). Anthropic requires prompt engineering or tool-based approaches.", + "properties": { + "name": { + "type": "string", + "description": "Name of the response format schema" + }, + "description": { + "type": "string", + "description": "Optional description of what the schema represents" + }, + "strict": { + "type": "boolean", + "description": "Enable strict schema adherence (OpenAI only). When true, all properties must be in required array.", + "default": false + }, + "schema": { + "type": "object", + "description": "JSON Schema object defining the structure of the response. Must include type, properties, and required fields.", + "required": [ + "type", + "properties" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "object" + ], + "description": "Schema type, must be 'object' for structured outputs" + }, + "properties": { + "type": "object", + "description": "Object properties with their schemas", + "additionalProperties": true + }, + "required": { + "type": "array", + "description": "List of required property names", + "items": { + "type": "string" + } + }, + "additionalProperties": { + "type": "boolean", + "description": "Whether additional properties are allowed", + "default": false + } + }, + "additionalProperties": true + } + }, + "required": [ + "name", + "schema" + ], + "additionalProperties": false + }, + "rag": { + "type": "array", + "description": "List of RAG sources to use for this agent", + "items": { + "type": "string" + } + }, + "hooks": { + "$ref": "#/definitions/HooksConfig", + "description": "Lifecycle hooks for executing shell commands at various points in the agent's execution" + } + }, + "additionalProperties": false + }, + "CommandConfig": { + "type": "object", + "description": "Advanced command configuration with description and instruction", + "properties": { + "description": { + "type": "string", + "description": "Description shown in completion dialogs and help text" + }, + "instruction": { + "type": "string", + "description": "The prompt sent to the agent. Supports bang commands (!`command`) and positional arguments ($1, $2, etc.)" + } + }, + "additionalProperties": false + }, + "HooksConfig": { + "type": "object", + "description": "Lifecycle hooks configuration for an agent. Hooks allow running shell commands at various points in the agent's execution lifecycle.", + "properties": { + "pre_tool_use": { + "type": "array", + "description": "Hooks that run before a tool is executed. Can allow/deny/modify tool calls.", + "items": { + "$ref": "#/definitions/HookMatcherConfig" + } + }, + "post_tool_use": { + "type": "array", + "description": "Hooks that run after a tool completes. Can provide validation or additional context.", + "items": { + "$ref": "#/definitions/HookMatcherConfig" + } + }, + "session_start": { + "type": "array", + "description": "Hooks that run when a session begins. Can load context or setup environment.", + "items": { + "$ref": "#/definitions/HookDefinition" + } + }, + "session_end": { + "type": "array", + "description": "Hooks that run when a session ends. Can perform cleanup or logging.", + "items": { + "$ref": "#/definitions/HookDefinition" + } + } + }, + "additionalProperties": false + }, + "HookMatcherConfig": { + "type": "object", + "description": "Configuration for matching tools and their associated hooks", + "properties": { + "matcher": { + "type": "string", + "description": "Regex pattern to match tool names (e.g., 'shell|edit_file'). Use '*' to match all tools. Case-sensitive.", + "examples": [ + "*", + "shell", + "shell|edit_file|write_file", + "mcp__.*" + ] + }, + "hooks": { + "type": "array", + "description": "Hooks to execute when the matcher matches", + "items": { + "$ref": "#/definitions/HookDefinition" + } + } + }, + "required": [ + "hooks" + ], + "additionalProperties": false + }, + "HookDefinition": { + "type": "object", + "description": "Definition of a single hook command", + "properties": { + "type": { + "type": "string", + "description": "Type of hook (currently only 'command' is supported)", + "enum": [ + "command" + ] + }, + "command": { + "type": "string", + "description": "Shell command to execute. Receives JSON input via stdin with tool/session information." + }, + "timeout": { + "type": "integer", + "description": "Execution timeout in seconds (default: 60)", + "minimum": 1, + "default": 60 + } + }, + "required": [ + "type", + "command" + ], + "additionalProperties": false + }, + "ModelConfig": { + "type": "object", + "description": "Configuration for a model", + "properties": { + "provider": { + "type": "string", + "description": "Model provider (e.g., openai, anthropic, dmr)", + "examples": [ + "openai", + "anthropic", + "dmr", + "ollama" + ] + }, + "model": { + "type": "string", + "description": "Model name" + }, + "temperature": { + "type": "number", + "description": "Sampling temperature", + "minimum": 0, + "maximum": 2 + }, + "max_tokens": { + "type": "integer", + "description": "Maximum number of tokens", + "minimum": 1 + }, + "top_p": { + "type": "number", + "description": "Top-p sampling parameter", + "minimum": 0, + "maximum": 1 + }, + "frequency_penalty": { + "type": "number", + "description": "Frequency penalty", + "minimum": -2, + "maximum": 2 + }, + "presence_penalty": { + "type": "number", + "description": "Presence penalty", + "minimum": -2, + "maximum": 2 + }, + "base_url": { + "type": "string", + "description": "Base URL for the model API", + "format": "uri" + }, + "parallel_tool_calls": { + "type": "boolean", + "description": "Whether to enable parallel tool calls" + }, + "token_key": { + "type": "string", + "description": "Token key for authentication" + }, + "provider_opts": { + "type": "object", + "description": "Provider-specific options. dmr: runtime_flags. anthropic/amazon-bedrock (Claude): interleaved_thinking (boolean, default true). openai/anthropic/google: rerank_prompt (string) to fully override the system prompt used for RAG reranking (advanced - prefer using results.reranking.criteria for domain-specific guidance).", + "additionalProperties": true + }, + "track_usage": { + "type": "boolean", + "description": "Whether to track usage" + }, + "thinking_budget": { + "description": "Controls reasoning effort/budget. Use 'none' or 0 to disable thinking. OpenAI: string levels ('minimal','low','medium','high'), default 'medium'. Anthropic: integer token budget (1024-32768), default 8192. Amazon Bedrock (Claude): same as Anthropic. Google Gemini 2.5: integer token budget (-1 for dynamic, 0 to disable, 24576 max), default -1. Google Gemini 3: string levels ('minimal' Flash only,'low','medium','high'), default 'high' for Pro, 'medium' for Flash.", + "oneOf": [ + { + "type": "string", + "enum": [ + "none", + "minimal", + "low", + "medium", + "high" + ], + "description": "Reasoning effort level (OpenAI, Gemini 3). Use 'none' to disable thinking." + }, + { + "type": "integer", + "minimum": -1, + "maximum": 32768, + "description": "Token budget for extended thinking (Anthropic, Bedrock Claude, Gemini 2.5). Use 0 to disable thinking." + } + ], + "examples": [ + "none", + 0, + "minimal", + "low", + "medium", + "high", + -1, + 1024, + 8192, + 32768 + ] + }, + "routing": { + "type": "array", + "description": "Routing rules for request-based model selection. When configured, this model becomes a router that selects the best model based on the user's input. The model's provider/model fields define the fallback model.", + "items": { + "$ref": "#/definitions/RoutingRule" + } + } + }, + "additionalProperties": false + }, + "RoutingRule": { + "type": "object", + "description": "A single routing rule that maps example phrases to a target model", + "properties": { + "model": { + "type": "string", + "description": "Model reference (another model name in the models section or inline spec like 'openai/gpt-4o')" + }, + "examples": { + "type": "array", + "description": "Example phrases that should trigger routing to this model", + "items": { + "type": "string" + } + } + }, + "required": [ + "model", + "examples" + ], + "additionalProperties": false + }, + "Metadata": { + "type": "object", + "description": "Configuration metadata", + "properties": { + "author": { + "type": "string", + "description": "Author of the configuration" + }, + "license": { + "type": "string", + "description": "License for the configuration" + }, + "readme": { + "type": "string", + "description": "README or description" + } + }, + "additionalProperties": false + }, + "PermissionsConfig": { + "type": "object", + "description": "Tool permission configuration. Controls tool call approval behavior with optional argument matching.", + "properties": { + "allow": { + "type": "array", + "description": "Tool patterns that are auto-approved without user confirmation. Supports tool names with glob patterns (e.g., 'read_*') and argument matching (e.g., 'shell:cmd=ls*' to allow shell commands starting with 'ls'). MCP tools can use qualified names (e.g., 'mcp:github:*').", + "items": { + "type": "string" + }, + "examples": [ + ["shell:cmd=ls*", "shell:cmd=git status*", "shell:cmd=go test*"], + ["mcp:github:get_*", "mcp:github:list_*"], + ["think", "create_todo*", "list_todos"] + ] + }, + "deny": { + "type": "array", + "description": "Tool patterns that are always rejected. Takes priority over allow patterns. Supports the same pattern syntax as allow: tool names with globs and argument matching (e.g., 'shell:cmd=rm -rf*' to block dangerous rm commands).", + "items": { + "type": "string" + }, + "examples": [ + ["shell:cmd=rm -rf*", "shell:cmd=sudo*"], + ["shell:cmd=git push --force*", "shell:cmd=git reset --hard*"], + ["mcp:github:delete_*"] + ] + } + }, + "additionalProperties": false + }, + "Toolset": { + "type": "object", + "description": "Tool configuration", + "properties": { + "type": { + "type": "string", + "description": "Type of tool", + "enum": [ + "mcp", + "script", + "think", + "memory", + "filesystem", + "shell", + "todo", + "fetch", + "api", + "a2a", + "lsp", + "user_prompt" + ] + }, + "instruction": { + "type": "string", + "description": "Additional instruction on how to use this toolset" + }, + "toon": { + "type": "string", + "description": "A comma-delimited list of regular expressions of tools to toonify" + }, + "ref": { + "type": "string", + "description": "Reference to external tool (e.g., docker:context7)", + "pattern": "^docker:" + }, + "config": { + "description": "Tool-specific configuration" + }, + "command": { + "type": "string", + "description": "Command to execute for MCP tools" + }, + "remote": { + "$ref": "#/definitions/Remote", + "description": "Remote tool configuration" + }, + "args": { + "type": "array", + "description": "Arguments for the tool", + "items": { + "type": "string" + } + }, + "tools": { + "type": "array", + "description": "List of tools to include", + "items": { + "type": "string" + } + }, + "env": { + "type": "object", + "description": "Environment variables", + "additionalProperties": { + "type": "string" + } + }, + "shared": { + "type": "boolean", + "description": "Whether the tool is shared (for think tool)" + }, + "path": { + "type": "string", + "description": "Path for memory tool" + }, + "shell": { + "type": "object", + "description": "Shell script configurations (for script tool)", + "patternProperties": { + "^[A-Za-z_][A-Za-z0-9_\\-]*$": { + "$ref": "#/definitions/ScriptShellToolConfig" + } + }, + "additionalProperties": false + }, + "post_edit": { + "type": "array", + "description": "Post-edit commands for filesystem tool", + "items": { + "$ref": "#/definitions/PostEditConfig" + } + }, + "api_config": { + "$ref": "#/definitions/ApiConfig", + "description": "API tool configuration" + }, + "ignore_vcs": { + "type": "boolean", + "description": "Whether to ignore VCS files (.git directories and .gitignore patterns) in filesystem operations. Default: true", + "default": true + }, + "defer": { + "description": "Enable deferred loading for tools in this toolset. Set to true to defer all tools, or an array of tool names to defer only those tools. Deferred tools are not loaded into the agent's context immediately, but can be discovered and loaded on-demand using search_tool and add_tool.", + "oneOf": [ + { + "type": "boolean", + "description": "Set to true to defer all tools" + }, + { + "type": "array", + "description": "Array of tool names to defer", + "items": { + "type": "string" + } + } + ], + "examples": [ + true, + ["read_file", "write_file"] + ] + }, + "timeout": { + "type": "integer", + "description": "Timeout in seconds for the fetch tool", + "minimum": 1 + }, + "url": { + "type": "string", + "description": "URL for the a2a tool", + "format": "uri" + }, + "name": { + "type": "string", + "description": "Name for the a2a tool" + }, + "sandbox": { + "$ref": "#/definitions/SandboxConfig", + "description": "Sandbox configuration for running shell commands in a Docker container (shell tool only)" + } + }, + "additionalProperties": false, + "anyOf": [ + { + "allOf": [ + { + "properties": { + "type": { + "const": "mcp" + } + } + }, + { + "anyOf": [ + { + "required": [ + "command" + ] + }, + { + "required": [ + "remote" + ] + }, + { + "required": [ + "ref" + ] + } + ] + } + ] + }, + { + "properties": { + "type": { + "enum": [ + "shell", + "filesystem", + "todo", + "think", + "memory", + "script", + "fetch", + "user_prompt" + ] + } + } + }, + { + "allOf": [ + { + "properties": { + "type": { + "const": "lsp" + } + } + }, + { + "required": [ + "command" + ] + } + ] + }, + { + "allOf": [ + { + "properties": { + "type": { + "const": "api" + } + } + }, + { + "required": [ + "api_config" + ] + } + ] + }, + { + "allOf": [ + { + "properties": { + "type": { + "const": "a2a" + } + } + }, + { + "required": [ + "url" + ] + } + ] + } + ] + }, + "Remote": { + "type": "object", + "description": "Remote tool configuration", + "properties": { + "url": { + "type": "string", + "description": "URL for the remote tool", + "format": "uri" + }, + "transport_type": { + "type": "string", + "description": "Transport type for the remote connection" + }, + "headers": { + "type": "object", + "description": "HTTP headers for remote requests", + "additionalProperties": { + "type": "string" + } + } + }, + "required": [ + "url" + ], + "additionalProperties": false + }, + "SandboxConfig": { + "type": "object", + "description": "Configuration for running shell commands inside a sandboxed Docker container", + "properties": { + "image": { + "type": "string", + "description": "Docker image to use for the sandbox container. Defaults to 'alpine:latest' if not specified.", + "examples": [ + "alpine:latest", + "ubuntu:22.04", + "python:3.12-alpine", + "node:20-alpine" + ] + }, + "paths": { + "type": "array", + "description": "List of paths to bind-mount into the container. Each path can have an optional ':ro' suffix for read-only access (default is read-write ':rw'). Relative paths are resolved from the agent's working directory.", + "items": { + "type": "string" + }, + "minItems": 1, + "examples": [ + [".", "/tmp"], + ["./src", "./config:ro"], + ["/data:rw", "/secrets:ro"] + ] + } + }, + "required": [ + "paths" + ], + "additionalProperties": false + }, + "ScriptShellToolConfig": { + "type": "object", + "description": "Configuration for custom shell tool", + "properties": { + "cmd": { + "type": "string", + "description": "Command to execute" + }, + "description": { + "type": "string", + "description": "Description of the shell tool" + }, + "args": { + "type": "object", + "description": "Arguments schema (passed as properties in JSON schema)", + "additionalProperties": true + }, + "required": { + "type": "array", + "description": "Required arguments", + "items": { + "type": "string" + } + }, + "env": { + "type": "object", + "description": "Environment variables for the command", + "additionalProperties": { + "type": "string" + } + }, + "working_dir": { + "type": "string", + "description": "Working directory for the command" + } + }, + "additionalProperties": false + }, + "PostEditConfig": { + "type": "object", + "description": "Post-edit command configuration", + "properties": { + "path": { + "type": "string", + "description": "Path pattern for files to apply post-edit command" + }, + "cmd": { + "type": "string", + "description": "Command to execute after edit" + } + }, + "required": [ + "path", + "cmd" + ], + "additionalProperties": false + }, + "ApiConfig": { + "type": "object", + "description": "API tool configuration for making HTTP requests to external APIs", + "properties": { + "name": { + "type": "string", + "description": "Name of the API tool" + }, + "instruction": { + "type": "string", + "description": "Instructions for using the API tool" + }, + "endpoint": { + "type": "string", + "description": "API endpoint URL", + "format": "uri" + }, + "method": { + "type": "string", + "description": "HTTP method", + "enum": [ + "GET", + "POST", + "PUT", + "PATCH", + "DELETE" + ] + }, + "headers": { + "type": "object", + "description": "HTTP headers for the request", + "additionalProperties": { + "type": "string" + } + }, + "args": { + "type": "object", + "description": "Arguments schema for the API call", + "additionalProperties": { + "type": "object", + "properties": { + "type": { + "type": "string", + "description": "Argument type" + }, + "description": { + "type": "string", + "description": "Argument description" + } + } + } + }, + "required": { + "type": "array", + "description": "Required argument names", + "items": { + "type": "string" + } + }, + "output_schema": { + "type": "object", + "description": "JSON Schema describing the API tool's output. Used by MCP/Code Mode; tool responses are still returned as strings at runtime.", + "additionalProperties": true + } + }, + "required": [ + "name", + "endpoint", + "method" + ], + "additionalProperties": false + }, + "RAGConfig": { + "type": "object", + "description": "RAG (Retrieval-Augmented Generation) configuration for document search and retrieval with pluggable strategies. Multiple strategies enable hybrid retrieval and reranking.", + "properties": { + "tool": { + "type": "object", + "description": "Tool configuration for the RAG source", + "properties": { + "name": { + "type": "string", + "description": "Custom name for the tool (defaults to RAG source name if not specified)" + }, + "description": { + "type": "string", + "description": "Description of what the tool does (shown to the LLM when selecting tools)" + }, + "instruction": { + "type": "string", + "description": "Instruction on how the RAG tool should be used effectively (shown in system prompt)" + } + }, + "additionalProperties": false + }, + "docs": { + "type": "array", + "description": "Shared document paths or directories indexed by all strategies", + "items": { + "type": "string" + } + }, + "respect_vcs": { + "type": "boolean", + "description": "Whether to respect VCS ignore files (e.g., .gitignore) when collecting documents for indexing. When true (default), files matching ignore patterns will be excluded. Can be overridden per-strategy.", + "default": true + }, + "strategies": { + "type": "array", + "description": "Array of retrieval strategy configurations. Each strategy can have different parameters based on its type.", + "minItems": 1, + "items": { + "type": "object", + "description": "Retrieval strategy configuration with type-specific parameters. Structured fields are limited; additional parameters are passed through as-is for strategy-specific use.", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "description": "Retrieval strategy type", + "enum": [ + "bm25", + "chunked-embeddings", + "semantic-embeddings" + ] + }, + "embedding_model": { + "type": "string", + "description": "Embedding model reference for chunked-embeddings and semantic-embeddings strategies (looked up in models map, or 'auto' for automatic selection)", + "examples": [ + "openai/text-embedding-3-small", + "dmr/embeddinggemma", + "auto" + ] + }, + "docs": { + "type": "array", + "description": "Additional documents for this strategy only (augments shared docs)", + "items": { + "type": "string" + } + }, + "database": { + "type": "string", + "description": "Database path or connection string. Currently only simple string values are supported (e.g., './vector.db', './bm25.db')." + }, + "similarity_metric": { + "type": "string", + "description": "Similarity metric (chunked-embeddings only). Currently only 'cosine_similarity' is implemented.", + "enum": [ + "cosine_similarity" + ] + }, + "vector_dimensions": { + "type": "integer", + "description": "Vector dimensions for embeddings (chunked-embeddings only). Must match your embedding model's output dimensions and is required for chunked-embeddings strategies.", + "minimum": 1, + "examples": [ + 1536, + 3072, + 1024, + 768 + ] + }, + "k1": { + "type": "number", + "description": "BM25 term frequency saturation (bm25 only, typically 1.2-2.0)", + "minimum": 0 + }, + "b": { + "type": "number", + "description": "BM25 length normalization (bm25 only, 0-1, typically 0.75)", + "minimum": 0, + "maximum": 1 + }, + "threshold": { + "type": "number", + "description": "Minimum score threshold (0-1 for chunked-embeddings, unbounded for bm25)", + "minimum": 0 + }, + "limit": { + "type": "integer", + "description": "Max results from this strategy (candidates for fusion). If unset, defaults to 5 in the implementation.", + "minimum": 1 + }, + "chunking": { + "type": "object", + "description": "Text chunking configuration", + "properties": { + "size": { + "type": "integer", + "description": "Chunk size in characters. If unset, defaults to 1000 in the implementation.", + "minimum": 1 + }, + "overlap": { + "type": "integer", + "description": "Overlap between chunks in characters. If unset, defaults to 75 in the implementation.", + "minimum": 0 + }, + "respect_word_boundaries": { + "type": "boolean", + "description": "When true, chunks will split on the nearest whitespace boundary instead of at the exact character limit, preventing words from being truncated." + }, + "code_aware": { + "type": "boolean", + "description": "Enable code-aware chunking for source files. When true, the chunking strategy will prefer AST-based or language-aware processors when available (tree-sitter based), and fall back to plain text chunking for unsupported languages." + } + }, + "additionalProperties": false + }, + "embedding_batch_size": { + "type": "integer", + "description": "Number of text chunks to send to the embedding API in a single request (chunked-embeddings/semantic-embeddings only)", + "minimum": 1, + "default": 50 + }, + "max_embedding_concurrency": { + "type": "integer", + "description": "Maximum concurrent embedding batch API requests. For semantic-embeddings, also controls parallel LLM calls for generating chunk summaries.", + "minimum": 1, + "default": 3 + }, + "max_indexing_concurrency": { + "type": "integer", + "description": "Maximum number of files to index in parallel during initialization", + "minimum": 1, + "default": 3 + }, + "respect_vcs": { + "type": "boolean", + "description": "Override the RAG-level respect_vcs setting for this strategy only." + }, + "chat_model": { + "type": "string", + "description": "Chat model used to generate semantic representations for each chunk (semantic-embeddings only, required)", + "examples": [ + "anthropic/claude-sonnet-4-5", + "openai/gpt-4o-mini" + ] + }, + "semantic_prompt": { + "type": "string", + "description": "Custom prompt template for semantic LLM. Uses JavaScript template literal syntax with the following placeholders: ${path} (full source file path), ${basename} (base name of file), ${chunk_index} (numeric chunk index), ${content} (raw chunk content), ${ast_context} (AST metadata when ast_context is enabled). Only applicable to semantic-embeddings strategy." + }, + "ast_context": { + "type": "boolean", + "description": "Include TreeSitter-derived AST metadata in the semantic prompt (semantic-embeddings only, requires chunking.code_aware for best results)", + "default": false + } + }, + "additionalProperties": true + } + }, + "results": { + "type": "object", + "description": "Result post-processing configuration (fusion, deduplication, limiting). If omitted, sensible defaults are applied in code.", + "properties": { + "limit": { + "type": "integer", + "description": "Maximum number of results to return (top K)", + "minimum": 1, + "default": 15 + }, + "fusion": { + "type": "object", + "description": "Configuration for combining results from multiple strategies. If omitted and multiple strategies are configured, Reciprocal Rank Fusion (rrf) with k=60 is used.", + "properties": { + "strategy": { + "type": "string", + "description": "Fusion strategy to use", + "enum": [ + "rrf", + "reciprocal_rank_fusion", + "weighted", + "max" + ], + "default": "rrf", + "examples": [ + "rrf", + "weighted" + ] + }, + "k": { + "type": "integer", + "description": "RRF smoothing parameter k (only for RRF strategy)", + "minimum": 1, + "default": 60 + }, + "weights": { + "type": "object", + "description": "Strategy weights for weighted fusion (strategy name -> weight)", + "additionalProperties": { + "type": "number", + "minimum": 0, + "maximum": 1 + }, + "examples": [ + { + "chunked-embeddings": 0.7, + "bm25": 0.3 + } + ] + } + }, + "additionalProperties": false + }, + "reranking": { + "type": "object", + "description": "Configuration for reranking results using a specialized reranking model. Reranking re-scores the retrieved results to improve relevance accuracy.", + "properties": { + "model": { + "type": "string", + "description": "Model reference for reranking (can be inline like 'dmr/model-name' or a reference to a defined model)", + "examples": [ + "dmr/hf.co/ggml-org/Qwen3-Reranker-0.6B-Q8_0-GGUF", + "reranker_model" + ] + }, + "top_k": { + "type": "integer", + "description": "Optional: only rerank top K results for efficiency. When unset or 0, defaults to the global results.limit (which itself defaults to 15).", + "minimum": 0, + "default": 0 + }, + "threshold": { + "type": "number", + "description": "Optional: minimum score threshold after reranking (filter results below this score)", + "minimum": 0, + "maximum": 1, + "default": 0.5 + }, + "criteria": { + "type": "string", + "description": "Optional: domain-specific relevance criteria to guide scoring. This text is appended to the base reranking prompt to customize what 'relevance' means for your use case. Supported by OpenAI, Anthropic, and Gemini providers (not DMR native reranking).", + "examples": [ + "Prioritize recent information and practical examples over historical context", + "When scoring relevance, focus on code examples and implementation details" + ] + } + }, + "required": [ + "model" + ], + "additionalProperties": false + }, + "deduplicate": { + "type": "boolean", + "description": "Remove duplicate documents across strategies", + "default": true + }, + "include_score": { + "type": "boolean", + "description": "Include relevance scores in results", + "default": false + }, + "return_full_content": { + "type": "boolean", + "description": "Return full document content instead of just the matched chunk. The full document is read directly from the file system.", + "default": false + } + }, + "additionalProperties": false + } + }, + "required": [ + "strategies" + ], + "additionalProperties": false + } + } +} diff --git a/cmd/root/run.go b/cmd/root/run.go index 9cada080e..646179913 100644 --- a/cmd/root/run.go +++ b/cmd/root/run.go @@ -31,6 +31,8 @@ import ( "github.com/docker/docker-agent/pkg/tui" "github.com/docker/docker-agent/pkg/tui/styles" "github.com/docker/docker-agent/pkg/userconfig" + "github.com/docker/docker-agent/pkg/workflow" + "github.com/docker/docker-agent/pkg/workflowrun" ) type runExecFlags struct { @@ -62,9 +64,12 @@ type runExecFlags struct { // Run only hideToolResults bool - lean bool - listenAddr string - onEventSpecs []string + + // Workflow: set when config has workflow; exec mode runs workflow instead of single agent + workflowConfig *workflow.Workflow + lean bool + listenAddr string + onEventSpecs []string // globalPermissions holds the user-level global permission checker built // from user config settings. Nil when no global permissions are configured. @@ -449,6 +454,14 @@ func (f *runExecFlags) handleExecMode(ctx context.Context, out *cli.Printer, rt userMessages = args[1:] } + if f.workflowConfig != nil { + var userMsg string + if len(args) > 1 { + userMsg = args[1] + } + return f.runExecWorkflow(ctx, out, rt, sess, userMsg) + } + err := cli.Run(ctx, out, cli.Config{ AppName: AppName, AttachmentPath: f.attachmentPath, @@ -462,6 +475,71 @@ func (f *runExecFlags) handleExecMode(ctx context.Context, out *cli.Printer, rt return err } +// runExecWorkflow runs the workflow executor and prints events to out (exec mode only). +func (f *runExecFlags) runExecWorkflow(ctx context.Context, out *cli.Printer, rt runtime.Runtime, sess *session.Session, userMessage string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + msg, _ := cli.PrepareUserMessage(ctx, rt, userMessage, f.attachmentPath) + sess.AddMessage(msg) + + exec := workflowrun.NewLocalExecutor(rt) + events := make(chan workflowrun.Event, 128) + go func() { + defer close(events) + if _, err := exec.Run(ctx, f.workflowConfig, sess, events); err != nil { + events <- runtime.Error(err.Error()) + } + }() + + var lastErr error + firstAgent := true + lastAgentName := "" + for event := range events { + if errEvent, ok := event.(*runtime.ErrorEvent); ok { + lastErr = fmt.Errorf("%s", errEvent.Error) + out.PrintError(lastErr) + continue + } + var agentName string + if re, ok := event.(runtime.Event); ok { + agentName = re.GetAgentName() + } + if agentName != "" && (firstAgent || agentName != lastAgentName) { + if !firstAgent { + out.Println() + } + out.PrintAgentName(agentName) + firstAgent = false + lastAgentName = agentName + } + switch e := event.(type) { + case *runtime.AgentChoiceEvent: + out.Print(e.Content) + case *runtime.AgentChoiceReasoningEvent: + out.Print(e.Content) + case *runtime.ToolCallConfirmationEvent: + if !f.autoApprove { + rt.Resume(ctx, runtime.ResumeReject("")) + } else { + rt.Resume(ctx, runtime.ResumeApprove()) + } + case *runtime.ToolCallEvent: + if !f.hideToolCalls { + out.PrintToolCall(e.ToolCall) + } + case *runtime.ToolCallResponseEvent: + if !f.hideToolCalls { + out.PrintToolCallResponse(e.AgentName, e.Response) + } + } + } + + if lastErr != nil { + return RuntimeError{Err: lastErr} + } + return nil +} + func readInitialMessage(args []string) (*string, error) { if len(args) < 2 { return nil, nil diff --git a/docs/workflow-module.md b/docs/workflow-module.md new file mode 100644 index 000000000..b2b52e397 --- /dev/null +++ b/docs/workflow-module.md @@ -0,0 +1,317 @@ +# Cagent Workflow Module + +This document designs the three core workflow execution patterns in docker/cagent and answers implementation-specific use cases. + +## Overview + +Workflows define **declarative pipelines** of agents and conditions. Execution is driven by the runtime: each step runs an agent (or evaluates a condition), and step outputs flow to the next step according to the pattern. + +## 1. Sequential Step Execution + +**Description:** Agents execute one after another in a linear chain. Each agent's output becomes available as input context for the next agent. + +**Example:** + +```yaml +workflow: + - type: agent + name: generator + - type: agent + name: translator + - type: agent + name: publisher +``` + +**Behavior:** + +- `generator` runs first and completes. +- `translator` receives `generator`'s output as context in its prompt and processes it. +- `publisher` receives `translator`'s output as context and finalizes. + +**Output propagation:** Each step automatically receives **all prior step outputs** injected as context into its user message. The executor collects the last assistant message content from each completed step and formats it as a structured context block: + +``` +--- Prior Step Outputs --- + +[step_id (agent: generator)]: + + +--- End Prior Step Outputs --- + + +``` + +Outputs are also accessible via template expressions: `{{ $steps..output }}`. + +--- + +## 2. Conditional Branching & Loops + +**Description:** The workflow branches based on condition evaluation. When a condition's branch routes back to an earlier step (by step ID or index), it creates a **loop**. Conditions reference step outputs via templates. + +**Example:** + +```yaml +workflow: + - id: gen + type: agent + name: generator + - id: trans + type: agent + name: translator + - id: qa_check + type: condition + name: qa_check + condition: "{{ $steps.qa.output.is_approved }}" + true: + - type: agent + name: publisher + false: + - id: back_to_trans + type: agent + name: translator + - id: qa + type: agent + name: qa_agent +``` + +**Behavior:** + +- After `translator`, the `qa_check` condition runs (using `qa_agent` output when referenced by `$steps.qa.output`). +- If `is_approved == true`: workflow proceeds to `publisher`. +- If `is_approved == false`: workflow routes to the step that runs `translator` again (retry loop). + +**Condition schema:** Conditions are evaluated after the step(s) that produce the referenced output. The condition expression uses a small expression language (e.g. `{{ $steps..output. }}`) and must resolve to a boolean. Schema validation ensures referenced step IDs exist and that structured output (e.g. `is_approved`) is declared where needed (e.g. via agent `structured_output`). + +--- + +## 3. Parallel Step Execution + +**Description:** Multiple steps run concurrently. The workflow waits for **all** parallel steps to complete before moving to the next sequential step. + +**Example:** + +```yaml +workflow: + - type: parallel + id: par_gen + steps: + - id: gen_1 + type: agent + name: generator + - id: gen_2 + type: agent + name: generator + - type: agent + name: translator +``` + +**Behavior:** + +- Two `generator` agents run concurrently in separate goroutines. +- Both must complete before `translator` starts. +- `translator` receives **outputs from all parallel steps** as context in its prompt (see "Output structure from parallel steps" below). + +**Concurrency safety:** Parallel steps use two mechanisms to avoid races: +1. A **`runnerMu` mutex** on the executor serializes `SetCurrentAgent` + `RunStream` calls so each goroutine's internal runtime captures the correct agent name. +2. Each parallel goroutine uses a **sub-session** (`ParentID` set), causing `PersistentRuntime` to skip all SQLite persistence for those sessions. +3. The **`SQLiteSessionStore`** has a `sync.Mutex` on all write methods as an additional safety net. + +**Error handling:** If **any** agent in a parallel block fails, the **entire workflow** fails immediately (all-or-nothing). No partial success; this keeps data consistency and avoids downstream agents seeing incomplete data. + +--- + +## Use Case: How deep can loops go? (max iteration count) + +**Answer:** Loops are bounded by a **max loop iterations** setting. + +- **Config:** `workflow.max_loop_iterations` (default: `100`). Optional per-workflow override: `workflow.overrides.max_loop_iterations`. +- **Semantics:** A "loop" is one execution of a cycle (e.g. trans → qa_check → trans). The executor counts how many times the **same step ID** has been executed in a cycle. When that count reaches `max_loop_iterations`, the workflow fails with a deterministic error (e.g. `workflow: max loop iterations exceeded (step: trans, limit: 100)`). +- **Scope:** The count is per logical loop (per back-edge in the workflow graph), not global across all steps. + +This prevents infinite loops while allowing retries (e.g. QA reject → translator) up to a clear limit. + +--- + +## Use Case: Can we nest parallel blocks? + +**Answer:** **Yes.** Parallel steps are just steps; their children can be any step type, including another `parallel`. + +**Example:** + +```yaml +workflow: + - type: parallel + id: outer + steps: + - type: agent + name: generator + - type: parallel + id: inner + steps: + - type: agent + name: researcher + - type: agent + name: summarizer + - type: agent + name: publisher +``` + +**Behavior:** `generator` runs in parallel with the inner parallel block (`researcher` and `summarizer`). All three agent outputs are available to `publisher` (see output structure below). Failure of any of the three fails the whole workflow. + +--- + +## Use Case: How are outputs from multiple parallel agents structured when passed to the next step? + +**Answer:** Outputs from a parallel block are passed as a **keyed map** by step ID (and optionally by index for backwards compatibility). + +**Structure:** + +```json +{ + "steps": { + "gen_1": { "output": "", "agent": "generator" }, + "gen_2": { "output": "", "agent": "generator" } + }, + "order": ["gen_1", "gen_2"] +} +``` + +- **Next step input:** The next agent receives all parallel outputs injected as context in its user message: + ``` + --- Prior Step Outputs --- + + [par_gen/gen_1 (agent: generator)]: + + + [par_gen/gen_2 (agent: generator)]: + + + --- End Prior Step Outputs --- + + + ``` +- **Templates:** In conditions or in agent instructions, parallel outputs are accessed as: + - `{{ $steps.par_gen.outputs.gen_1.output }}` — output of parallel step `gen_1` + - `{{ $steps.par_gen.outputs.gen_2.output }}` + - Or by index: `{{ $steps.par_gen.outputs[0].output }}` (using `order` for deterministic indexing). + +So: **one structured object** keyed by step ID (and ordered list for index-based access), passed as context to the next step. + +--- + +## Use Case: What retry behavior exists for failed steps? + +**Answer:** Configurable **per-step retry** with optional backoff. + +- **Config:** On any step (agent or parallel block): + - `retry.max_attempts` (default: 0 = no retry) + - `retry.backoff` (optional): `fixed` (e.g. 1s) or `exponential` (e.g. 1s, 2s, 4s) + - `retry.on` (optional): list of error patterns or exit conditions to retry on (e.g. `["timeout", "rate_limit"]`); if absent, retry on any error. + +**Behavior:** + +- A **step** (single agent or whole parallel block) is retried up to `max_attempts` times on failure. +- After exhausting retries, the **workflow** fails (no partial success for parallel). +- Retries are **transparent** to downstream steps: they only see the final success or the workflow fails. + +**Loops vs retries:** Loops (condition → back to earlier step) are **logical workflow branches**. Retries are **transient error handling** for the same step. Both can be used: e.g. retry a step 2 times, then continue to a condition that may send the workflow back to an earlier step (e.g. QA reject → translator). + +--- + +## Use Case: How do we access outputs from parallel steps in subsequent agents? + +**Answer:** Two mechanisms: + +1. **Automatic context injection:** The executor injects a **context blob** into the next step's session (e.g. as a system or user message) containing: + - `$steps..outputs` — the keyed map of step ID → `{ output, agent }` + - `$steps..order` — deterministic order for index-based access. + +2. **Templates in config:** In agent instructions or in condition expressions, use: + - `{{ $steps.par_gen.outputs.gen_1.output }}` — output of parallel step `gen_1` + - `{{ $steps.par_gen.outputs[0].output }}` — first output by `order` + - Same for nested parallel: `{{ $steps.outer.outputs.inner.outputs.researcher.output }}` (or a flatter key like `inner.researcher` by convention). + +So: **structured access by step ID** (and by index via `order`), both in injected context and in templates. + +--- + +## Summary Table + +| Topic | Decision | +|-----------------------|--------------------------------------------------------------------------| +| Loop depth | `max_loop_iterations` (default 100); per-cycle count per step ID | +| Nested parallel | Yes; parallel steps can contain parallel (or any) steps | +| Parallel output shape | Keyed map by step ID + `order` array; one blob to next step | +| Retry | Per-step `retry.max_attempts` + optional backoff; workflow fails after | +| Access parallel outs | `$steps..outputs..output` and `$steps..outputs[n]` | +| Parallel failure | Any failure in a parallel block fails the whole workflow immediately | + +--- + +## How to run workflow via CLI + +When your agent config defines a `workflow` section, use **exec** (non-TUI) to run the workflow: + +```bash +# Run workflow from config (exec mode runs the workflow executor) +cagent exec ./agent-with-workflow.yaml + +# With a prompt (passed as initial user message to the workflow) +cagent exec ./agent-with-workflow.yaml "Translate and publish this draft" + +# With stdin +echo "Process these items" | cagent exec ./agent-with-workflow.yaml - +``` + +Workflow execution is **only** wired for **exec** mode. The `run` command (TUI) still uses single-agent mode even when the config has a workflow. + +## Implementation Notes + +- **Types:** `pkg/workflow` holds workflow and step types (Config, Step, StepContext, loop counter, condition evaluation). No dependency on runtime or session to avoid import cycles. + - `StepContext` is concurrency-safe (`sync.RWMutex`) and exposes a `Snapshot()` method for serialization/debugging. +- **Executor:** `pkg/workflowrun` holds the executor: runs the workflow DAG (sequential/conditional/parallel), calls runtime `RunStream` per agent step, maintains step outputs and loop counters, evaluates conditions, and injects output context into sessions. + - Use `workflowrun.NewLocalExecutor(runtime)` and `Executor.Run(ctx, cfg, sess, events)` which returns `(*workflow.StepContext, error)`. + - After execution, the step context is printed to stderr as formatted JSON for debugging (`--- Step Context ---`). + - **Context propagation:** `buildPriorContext()` collects all prior step outputs and injects them as a structured text block into the next step's user message. + - **Parallel safety:** `runnerMu` serializes `SetCurrentAgent` + `RunStream` to prevent agent name races; sub-sessions skip SQLite persistence. +- **Session Store:** `SQLiteSessionStore` has a `sync.Mutex` protecting all write methods (`AddMessage`, `UpdateMessage`, `AddSession`, `UpdateSession`, etc.) to prevent concurrent write panics. +- **Config:** Workflow config lives in `pkg/config/latest` as `Config.Workflow` (type `*workflow.Config`). Validation in `validate.go` ensures agent names exist, step types are valid, and condition steps have a condition expression. +- **CLI:** When `Config.Workflow` is set, `cagent exec` uses the workflow executor and streams events to stdout; `cagent run` (TUI) still uses single-agent mode. + +Developer Certificate of Origin +Version 1.1 + +Copyright (C) 2004, 2006 The Linux Foundation and its contributors. +1 Letterman Drive +Suite D4700 +San Francisco, CA, 94129 + +Everyone is permitted to copy and distribute verbatim copies of this +license document, but changing it is not allowed. + +Developer's Certificate of Origin 1.1 + +By making a contribution to this project, I certify that: + +(a) The contribution was created in whole or in part by me and I + have the right to submit it under the open source license + indicated in the file; or + +(b) The contribution is based upon previous work that, to the best + of my knowledge, is covered under an appropriate open source + license and I have the right under that license to submit that + work with modifications, whether created in whole or in part + by me, under the same open source license (unless I am + permitted to submit under a different license), as indicated + in the file; or + +(c) The contribution was provided directly to me by some other + person who certified (a), (b) or (c) and I have not modified + it. + +(d) I understand and agree that this project and the contribution + are public and that a record of the contribution (including all + personal information I submit with it, including my sign-off) is + maintained indefinitely and may be redistributed consistent with + this project or the open source license(s) involved. \ No newline at end of file diff --git a/examples/README.md b/examples/README.md index 57acbf170..319c93352 100644 --- a/examples/README.md +++ b/examples/README.md @@ -30,6 +30,15 @@ If you are looking for inspiration to build a real agent, jump to --- +## **Workflow Configurations** + +These examples use the workflow feature: declarative pipelines of agents (sequential, conditional, or parallel). Run with **`cagent exec`** (workflow runs in exec mode only). + +| Name | Description | +|------|-------------| +| [workflow_sequential.yaml](workflow_sequential.yaml) | Sequential pipeline: generator → translator → publisher. Each agent's output is passed to the next. | +| [workflow_parallel.yaml](workflow_parallel.yaml) | Parallel step: two generators run at once; translator receives both outputs. | + ## Getting started — minimal agents The smallest possible configurations. Read these first to learn the YAML diff --git a/examples/workflow_parallel.yaml b/examples/workflow_parallel.yaml new file mode 100644 index 000000000..1448d5437 --- /dev/null +++ b/examples/workflow_parallel.yaml @@ -0,0 +1,43 @@ +# Sample workflow: parallel step execution +# Run with: cagent exec ./examples/workflow_parallel.yaml "Your prompt" +# +# Two generators run concurrently; translator receives both outputs. + +version: "4" + +models: + default: + provider: openai + model: gpt-4o-mini + max_tokens: 4096 + +agents: + root: + model: default + description: Generates content + instruction: | + Generate content based on the user's request. + Output only the generated content. + + translator: + model: default + description: Combines and refines content from multiple sources + instruction: | + You receive outputs from multiple generator steps (in order). + Combine, deduplicate, or synthesize them into one coherent result. + Output the final result only. + +# Map form: steps + optional max_loop_iterations +# Parallel block: two generators run at once; translator runs after both complete. +workflow: + max_loop_iterations: 100 + steps: + - type: parallel + id: generators + steps: + - type: agent + name: root + - type: agent + name: root + - type: agent + name: translator diff --git a/examples/workflow_sequential.yaml b/examples/workflow_sequential.yaml new file mode 100644 index 000000000..94e91ca65 --- /dev/null +++ b/examples/workflow_sequential.yaml @@ -0,0 +1,45 @@ +# Sample workflow: sequential step execution +# Run with: cagent exec ./examples/workflow_sequential.yaml "Your prompt" +# Or: cagent exec ./examples/workflow_sequential.yaml + +version: "4" + +models: + default: + provider: openai + model: gpt-4o-mini + max_tokens: 4096 + +agents: + root: + model: default + description: Generates initial content + instruction: | + You are the first step in a content pipeline. + Generate clear, structured content based on the user's request. + Output only the generated content; the next agent will process it. + + translator: + model: default + description: Translates or transforms content + instruction: | + You receive content from the previous step. + Translate, summarize, or transform it as requested. + Output only the result; the next agent will use it. + + publisher: + model: default + description: Finalizes and formats output + instruction: | + You receive content from the previous step. + Finalize it: format for publication, add a brief conclusion, and output the final result. + +# Sequential workflow: generator → translator → publisher +# Each agent's output becomes input context for the next. +workflow: + - type: agent + name: root + - type: agent + name: translator + - type: agent + name: publisher diff --git a/pkg/config/config.go b/pkg/config/config.go index 721d4171b..18ade1148 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -124,7 +124,8 @@ func validateConfig(cfg *latest.Config) error { for name := range cfg.Models { if cfg.Models[name].ParallelToolCalls == nil { m := cfg.Models[name] - m.ParallelToolCalls = new(true) + m.ParallelToolCalls = new(bool) + *m.ParallelToolCalls = true cfg.Models[name] = m } } diff --git a/pkg/config/latest/types.go b/pkg/config/latest/types.go index 0eb22cb4e..ef8b65db9 100644 --- a/pkg/config/latest/types.go +++ b/pkg/config/latest/types.go @@ -14,6 +14,7 @@ import ( "github.com/docker/docker-agent/pkg/config/types" "github.com/docker/docker-agent/pkg/effort" + "github.com/docker/docker-agent/pkg/workflow" ) const Version = "9" @@ -28,6 +29,8 @@ type Config struct { RAG map[string]RAGToolset `json:"rag,omitempty"` Metadata Metadata `json:"metadata"` Permissions *PermissionsConfig `json:"permissions,omitempty"` + // Workflow defines optional sequential, conditional, and parallel step execution. + Workflow *workflow.Workflow `json:"workflow,omitempty" yaml:"workflow,omitempty"` } // MCPToolset is a reusable MCP server definition stored in the top-level diff --git a/pkg/config/latest/validate.go b/pkg/config/latest/validate.go index 744d15d86..e3a259787 100644 --- a/pkg/config/latest/validate.go +++ b/pkg/config/latest/validate.go @@ -6,6 +6,8 @@ import ( "net" "net/url" "strings" + + "github.com/docker/docker-agent/pkg/workflow" ) func (t *Config) UnmarshalYAML(unmarshal func(any) error) error { @@ -50,9 +52,66 @@ func (t *Config) Validate() error { } } + if t.Workflow != nil { + if err := validateWorkflow(t.Workflow, t.Agents); err != nil { + return err + } + } + return nil } +// validateWorkflow ensures workflow step agent names exist in Agents and step types are valid. +func validateWorkflow(cfg *workflow.Workflow, agents Agents) error { + if cfg == nil { + return nil + } + agentSet := make(map[string]bool) + for _, a := range agents { + agentSet[a.Name] = true + } + var validateSteps func(steps []workflow.Step) error + validateSteps = func(steps []workflow.Step) error { + for i := range steps { + s := &steps[i] + switch s.Type { + case workflow.StepTypeAgent: + if s.Name == "" { + return fmt.Errorf("workflow step[%d]: agent step requires name", i) + } + if !agentSet[s.Name] { + return fmt.Errorf("workflow step[%d]: agent %q not found in agents", i, s.Name) + } + case workflow.StepTypeCondition: + if s.Condition == "" { + return fmt.Errorf("workflow step[%d]: condition step requires condition", i) + } + if err := validateSteps(s.TrueSteps); err != nil { + return err + } + if err := validateSteps(s.FalseSteps); err != nil { + return err + } + case workflow.StepTypeParallel: + if err := validateSteps(s.Steps); err != nil { + return err + } + case workflow.StepTypeSubWorkflow: + if s.Workflow == nil { + return fmt.Errorf("workflow step[%d]: sub-workflow step requires workflow", i) + } + if err := validateWorkflow(s.Workflow, agents); err != nil { + return err + } + default: + return fmt.Errorf("workflow step[%d]: unknown type %q", i, s.Type) + } + } + return nil + } + return validateSteps(cfg.Steps) +} + // validateFallback validates the fallback configuration for an agent func (a *AgentConfig) validateFallback() error { if a.Fallback == nil { diff --git a/pkg/config/latest/validate_test.go b/pkg/config/latest/validate_test.go new file mode 100644 index 000000000..d686743e1 --- /dev/null +++ b/pkg/config/latest/validate_test.go @@ -0,0 +1,245 @@ +package latest + +import ( + "testing" + + "github.com/goccy/go-yaml" + "github.com/stretchr/testify/require" +) + +func TestToolset_Validate_LSP(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + config string + wantErr string + }{ + { + name: "valid lsp with command", + config: ` +version: "3" +agents: + root: + model: "openai/gpt-4" + toolsets: + - type: lsp + command: gopls +`, + wantErr: "", + }, + { + name: "lsp missing command", + config: ` +version: "3" +agents: + root: + model: "openai/gpt-4" + toolsets: + - type: lsp +`, + wantErr: "lsp toolset requires a command to be set", + }, + { + name: "lsp with args", + config: ` +version: "3" +agents: + root: + model: "openai/gpt-4" + toolsets: + - type: lsp + command: gopls + args: + - -remote=auto +`, + wantErr: "", + }, + { + name: "lsp with env", + config: ` +version: "3" +agents: + root: + model: "openai/gpt-4" + toolsets: + - type: lsp + command: gopls + env: + GOFLAGS: "-mod=vendor" +`, + wantErr: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + var cfg Config + err := yaml.Unmarshal([]byte(tt.config), &cfg) + + if tt.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.wantErr) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestToolset_Validate_Sandbox(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + config string + wantErr string + }{ + { + name: "valid shell with sandbox", + config: ` +version: "3" +agents: + root: + model: "openai/gpt-4" + toolsets: + - type: shell + sandbox: + image: alpine:latest + paths: + - . + - /tmp +`, + wantErr: "", + }, + { + name: "shell sandbox with readonly path", + config: ` +version: "3" +agents: + root: + model: "openai/gpt-4" + toolsets: + - type: shell + sandbox: + paths: + - ./:rw + - /config:ro +`, + wantErr: "", + }, + { + name: "shell sandbox without paths", + config: ` +version: "3" +agents: + root: + model: "openai/gpt-4" + toolsets: + - type: shell + sandbox: + image: alpine:latest +`, + wantErr: "sandbox requires at least one path to be set", + }, + { + name: "sandbox on non-shell toolset", + config: ` +version: "3" +agents: + root: + model: "openai/gpt-4" + toolsets: + - type: filesystem + sandbox: + paths: + - . +`, + wantErr: "sandbox can only be used with type 'shell'", + }, + { + name: "shell without sandbox is valid", + config: ` +version: "3" +agents: + root: + model: "openai/gpt-4" + toolsets: + - type: shell +`, + wantErr: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + var cfg Config + err := yaml.Unmarshal([]byte(tt.config), &cfg) + + if tt.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.wantErr) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestValidateWorkflow(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + config string + wantErr string + }{ + { + name: "valid workflow sequential", + config: ` +version: "4" +agents: + generator: + model: openai/gpt-4 + translator: + model: openai/gpt-4 +workflow: + - type: agent + name: generator + - type: agent + name: translator +`, + wantErr: "", + }, + { + name: "workflow agent not found", + config: ` +version: "4" +agents: + generator: + model: openai/gpt-4 +workflow: + - type: agent + name: translator +`, + wantErr: "not found in agents", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + var cfg Config + err := yaml.Unmarshal([]byte(tt.config), &cfg) + if tt.wantErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.wantErr) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/session/store.go b/pkg/session/store.go index f40cc1674..f075ea248 100644 --- a/pkg/session/store.go +++ b/pkg/session/store.go @@ -11,6 +11,7 @@ import ( "slices" "strconv" "strings" + "sync" "time" "github.com/docker/docker-agent/pkg/chat" @@ -338,6 +339,7 @@ type querier interface { // SQLiteSessionStore implements Store using SQLite type SQLiteSessionStore struct { + mu sync.Mutex db *sql.DB } @@ -565,6 +567,8 @@ func backupDatabase(path string) error { // AddSession adds a new session to the store, including any messages func (s *SQLiteSessionStore) AddSession(ctx context.Context, session *Session) error { + s.mu.Lock() + defer s.mu.Unlock() if session.ID == "" { return ErrEmptyID } @@ -852,6 +856,8 @@ func (s *SQLiteSessionStore) GetSessionSummaries(ctx context.Context) ([]Summary // DeleteSession deletes a session by ID func (s *SQLiteSessionStore) DeleteSession(ctx context.Context, id string) error { + s.mu.Lock() + defer s.mu.Unlock() if id == "" { return ErrEmptyID } @@ -877,6 +883,8 @@ func (s *SQLiteSessionStore) DeleteSession(ctx context.Context, id string) error // Only metadata is modified - use AddMessage, AddSubSession, AddSummary for items. // Messages are persisted separately via events to avoid duplication. func (s *SQLiteSessionStore) UpdateSession(ctx context.Context, session *Session) error { + s.mu.Lock() + defer s.mu.Unlock() if session.ID == "" { return ErrEmptyID } @@ -932,6 +940,8 @@ func (s *SQLiteSessionStore) UpdateSession(ctx context.Context, session *Session // SetSessionStarred sets the starred status of a session. func (s *SQLiteSessionStore) SetSessionStarred(ctx context.Context, id string, starred bool) error { + s.mu.Lock() + defer s.mu.Unlock() if id == "" { return ErrEmptyID } @@ -961,6 +971,8 @@ func (s *SQLiteSessionStore) Close() error { // AddMessage adds a message to a session at the next position. // Returns the ID of the created message item. func (s *SQLiteSessionStore) AddMessage(ctx context.Context, sessionID string, msg *Message) (int64, error) { + s.mu.Lock() + defer s.mu.Unlock() if sessionID == "" { return 0, ErrEmptyID } @@ -990,6 +1002,8 @@ func (s *SQLiteSessionStore) AddMessage(ctx context.Context, sessionID string, m // UpdateMessage updates an existing message by its ID. func (s *SQLiteSessionStore) UpdateMessage(ctx context.Context, messageID int64, msg *Message) error { + s.mu.Lock() + defer s.mu.Unlock() msgJSON, err := json.Marshal(msg.Message) if err != nil { return fmt.Errorf("marshaling message: %w", err) @@ -1016,6 +1030,8 @@ func (s *SQLiteSessionStore) UpdateMessage(ctx context.Context, messageID int64, // AddSubSession creates a sub-session and links it to the parent. func (s *SQLiteSessionStore) AddSubSession(ctx context.Context, parentSessionID string, subSession *Session) error { + s.mu.Lock() + defer s.mu.Unlock() if parentSessionID == "" || subSession.ID == "" { return ErrEmptyID } @@ -1143,6 +1159,8 @@ func (s *SQLiteSessionStore) AddSummary(ctx context.Context, sessionID, summary // UpdateSessionTokens updates only token/cost fields. func (s *SQLiteSessionStore) UpdateSessionTokens(ctx context.Context, sessionID string, inputTokens, outputTokens int64, cost float64) error { + s.mu.Lock() + defer s.mu.Unlock() if sessionID == "" { return ErrEmptyID } @@ -1154,6 +1172,8 @@ func (s *SQLiteSessionStore) UpdateSessionTokens(ctx context.Context, sessionID // UpdateSessionTitle updates only the title. func (s *SQLiteSessionStore) UpdateSessionTitle(ctx context.Context, sessionID, title string) error { + s.mu.Lock() + defer s.mu.Unlock() if sessionID == "" { return ErrEmptyID } diff --git a/pkg/teamloader/teamloader.go b/pkg/teamloader/teamloader.go index 9d4d5029c..83cffeb95 100644 --- a/pkg/teamloader/teamloader.go +++ b/pkg/teamloader/teamloader.go @@ -31,6 +31,7 @@ import ( skillstool "github.com/docker/docker-agent/pkg/tools/builtin/skills" "github.com/docker/docker-agent/pkg/tools/builtin/transfertask" "github.com/docker/docker-agent/pkg/tools/codemode" + "github.com/docker/docker-agent/pkg/workflow" ) var defaultMaxTokens int64 = 32000 @@ -75,6 +76,8 @@ type LoadResult struct { Providers map[string]latest.ProviderConfig // AgentDefaultModels maps agent names to their configured default model references AgentDefaultModels map[string]string + // Workflow is set when the config defines a workflow; used by run/exec to run the workflow executor. + Workflow *workflow.Workflow } // Load loads an agent team from the given source @@ -280,6 +283,7 @@ func LoadWithConfig(ctx context.Context, agentSource config.Source, runConfig *c Models: cfg.Models, Providers: cfg.Providers, AgentDefaultModels: agentDefaultModels, + Workflow: cfg.Workflow, }, nil } diff --git a/pkg/workflow/context.go b/pkg/workflow/context.go new file mode 100644 index 000000000..caaee3d34 --- /dev/null +++ b/pkg/workflow/context.go @@ -0,0 +1,301 @@ +package workflow + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/docker/docker-agent/pkg/concurrent" + "github.com/docker/docker-agent/pkg/paths" + "github.com/docker/docker-agent/pkg/sqliteutil" +) + +func NewInMemoryCheckpointStore(workflowID string) *inMemoryCheckpointStore { + return &inMemoryCheckpointStore{ + workflowID: workflowID, + checkpoints: concurrent.NewMap[string, *Checkpoint](), + } +} + +func NewSQLiteCheckpointStore(ctx context.Context, workflowID string, path string) (*SQLiteCheckpointStore, error) { + db, err := sqliteutil.OpenDB(path) + if err != nil { + return nil, err + } + // Ensure we close the connection if table creation fails + // Note: We don't defer close here because we return the db on success + + _, err = db.ExecContext(context.Background(), "CREATE TABLE IF NOT EXISTS memories (id TEXT PRIMARY KEY, created_at TEXT, memory TEXT)") + if err != nil { + db.Close() + return nil, err + } + + return &SQLiteCheckpointStore{db: db, mu: sync.Mutex{}}, nil +} + +// NewWorkflowContext returns a new WorkflowContext. +func NewWorkflowContext(workflow *Workflow, checkpointID string) (*WorkflowContext, error) { + store, err := NewSQLiteCheckpointStore(context.Background(), checkpointID, filepath.Join(paths.GetHomeDir(), ".cagent", "session.db")) + if err != nil { + return nil, err + } + checkpoint, err := store.GetCheckpoint(context.Background(), checkpointID) + if err != nil { + return nil, err + } + memoryStore := NewInMemoryCheckpointStore(checkpointID) + var data map[string]any + if checkpoint != nil { + memoryStore.checkpoints.Store(checkpointID, checkpoint) + data = checkpoint.State + } else { + data = make(map[string]any) + } + + return &WorkflowContext{ + mu: sync.RWMutex{}, + data: data, + workflow: workflow, + store: store, + memoryStore: memoryStore, + }, nil +} + +// Snapshot returns a shallow copy of the internal data map for serialization/debugging. +func (c *WorkflowContext) Snapshot() map[string]any { + if c == nil { + return nil + } + c.mu.RLock() + defer c.mu.RUnlock() + out := make(map[string]any, len(c.data)) + for k, v := range c.data { + out[k] = v + } + return out +} + +func (s *SQLiteCheckpointStore) SaveCheckpoint(ctx context.Context, workflow *WorkflowContext) error { + workflow.mu.RLock() + defer workflow.mu.RUnlock() + + id := workflow.memoryStore.workflowID + checkpoint := &Checkpoint{ + Name: id, + workflowName: workflow.workflow.name, + State: workflow.data, + CreatedAt: time.Now(), + } + workflow.memoryStore.checkpoints.Store(id, checkpoint) + + data, err := json.Marshal(checkpoint) + if err != nil { + return err + } + + s.mu.Lock() + defer s.mu.Unlock() + _, err = s.db.ExecContext(ctx, "INSERT OR REPLACE INTO memories (id, created_at, memory) VALUES (?, ?, ?)", id, checkpoint.CreatedAt.Format(time.RFC3339), string(data)) + return err +} + +func (s *SQLiteCheckpointStore) GetCheckpoint(ctx context.Context, id string) (*Checkpoint, error) { + s.mu.Lock() + defer s.mu.Unlock() + + var createdAtStr, memoryStr string + err := s.db.QueryRowContext(ctx, "SELECT created_at, memory FROM memories WHERE id = ?", id).Scan(&createdAtStr, &memoryStr) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, err + } + + var cp Checkpoint + if err := json.Unmarshal([]byte(memoryStr), &cp); err != nil { + return nil, fmt.Errorf("failed to unmarshal checkpoint: %w", err) + } + return &cp, nil +} + +func (s *SQLiteCheckpointStore) Close() error { + return s.db.Close() +} + +func (s *SQLiteCheckpointStore) FlushCheckpointsToDB(workflowContext *WorkflowContext) error { + return nil +} + +// SaveCheckpoint saves the current checkpoint to the store. +func (c *WorkflowContext) SaveCheckpoint(ctx context.Context) error { + return c.store.SaveCheckpoint(ctx, c) +} + +// FlushCheckpointsToDB flushes checkpoints to the database. +func (c *WorkflowContext) FlushCheckpointsToDB() error { + return c.store.FlushCheckpointsToDB(c) +} + +// SetAgentOutput records the output of a single agent step by ID. +func (c *WorkflowContext) SetAgentOutput(stepID, output, agentName string) { + if c == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + if c.data == nil { + c.data = make(map[string]any) + } + c.data[stepID] = StepOutput{Output: output, Agent: agentName} +} + +// SetParallelOutput records the outputs of a parallel block by its step ID. +func (c *WorkflowContext) SetParallelOutput(stepID string, out *ParallelOutputs) { + if c == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + if c.data == nil { + c.data = make(map[string]any) + } + c.data[stepID] = out +} + +// GetOutput returns the StepOutput for a step ID if it is a single agent output. +func (c *WorkflowContext) GetOutput(stepID string) (StepOutput, bool) { + if c == nil { + return StepOutput{}, false + } + c.mu.RLock() + defer c.mu.RUnlock() + v, ok := c.data[stepID] + if !ok { + return StepOutput{}, false + } + so, ok := v.(StepOutput) + return so, ok +} + +// GetParallelOutput returns the ParallelOutputs for a step ID if it is a parallel block. +func (c *WorkflowContext) GetParallelOutput(stepID string) (*ParallelOutputs, bool) { + if c == nil { + return nil, false + } + c.mu.RLock() + defer c.mu.RUnlock() + v, ok := c.data[stepID] + if !ok { + return nil, false + } + po, ok := v.(*ParallelOutputs) + return po, ok +} + +// EvalCondition evaluates a condition string against this context. +// Supports simple template form: {{ $steps..output }} or {{ $steps..output.path }}. +// Returns (value, true) if the expression resolves to a boolean; otherwise (nil, false). +// Full implementation would use a proper expression evaluator; this provides the contract. +func (c *WorkflowContext) EvalCondition(condition string) (bool, bool) { + expr := strings.TrimSpace(condition) + expr = trimTemplateBraces(expr) + if !strings.HasPrefix(expr, "$steps.") { + return false, false + } + // Minimal path: $steps..output or $steps..outputs..output + parts := strings.Split(expr, ".") + if len(parts) < 3 { + return false, false + } + stepID := parts[1] + if len(parts) >= 5 && parts[2] == "outputs" { + // $steps.par_id.outputs.step_id.output + parID := parts[1] + po, ok := c.GetParallelOutput(parID) + if !ok { + return false, false + } + subID := parts[3] + so, ok := po.Steps[subID] + if !ok { + return false, false + } + if len(parts) == 5 && parts[4] == "output" { + return parseBool(so.Output), true + } + return false, false + } + so, ok := c.GetOutput(stepID) + if !ok { + return false, false + } + // $steps..output or $steps..output.path (e.g. is_approved) + if len(parts) == 3 && parts[2] == "output" { + return parseBool(so.Output), true + } + if len(parts) >= 4 && parts[2] == "output" { + // Try to parse so.Output as JSON and read path (e.g. is_approved) + var m map[string]any + if err := json.Unmarshal([]byte(so.Output), &m); err != nil { + return parseBool(so.Output), true + } + v := getPath(m, parts[3:]) + return boolFromAny(v), true + } + return false, false +} + +func trimTemplateBraces(s string) string { + s = strings.TrimSpace(s) + if strings.HasPrefix(s, "{{") { + s = strings.TrimPrefix(s, "{{") + } + if strings.HasSuffix(s, "}}") { + s = strings.TrimSuffix(s, "}}") + } + return strings.TrimSpace(s) +} + +func getPath(m map[string]any, path []string) any { + var v any = m + for _, key := range path { + if v == nil { + return nil + } + mp, ok := v.(map[string]any) + if !ok { + return nil + } + v, ok = mp[key] + if !ok { + return nil + } + } + return v +} + +func parseBool(s string) bool { + s = strings.TrimSpace(strings.ToLower(s)) + return s == "true" || s == "1" || s == "yes" +} + +func boolFromAny(v any) bool { + if v == nil { + return false + } + switch b := v.(type) { + case bool: + return b + case string: + return parseBool(b) + default: + return false + } +} diff --git a/pkg/workflow/context_test.go b/pkg/workflow/context_test.go new file mode 100644 index 000000000..4397187b0 --- /dev/null +++ b/pkg/workflow/context_test.go @@ -0,0 +1,27 @@ +package workflow + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWorkflowContext_EvalCondition(t *testing.T) { + t.Parallel() + + wf := &Workflow{name: "test_workflow"} + ctx, err := NewWorkflowContext(wf, "test_checkpoint") + require.NoError(t, err) + + ctx.SetAgentOutput("qa", `{"is_approved": true}`, "qa_agent") + + ok, resolved := ctx.EvalCondition("{{ $steps.qa.output.is_approved }}") + require.True(t, resolved) + assert.True(t, ok) + + ctx.SetAgentOutput("qa", `{"is_approved": false}`, "qa_agent") + ok, resolved = ctx.EvalCondition("{{ $steps.qa.output.is_approved }}") + require.True(t, resolved) + assert.False(t, ok) +} diff --git a/pkg/workflow/loop_counter.go b/pkg/workflow/loop_counter.go new file mode 100644 index 000000000..232999ba4 --- /dev/null +++ b/pkg/workflow/loop_counter.go @@ -0,0 +1,40 @@ +package workflow + +import ( + "context" + "fmt" + "sync" +) + +type loopCounterKey struct{} + +type loopCounter struct { + mu sync.Mutex + counts map[string]int + maxPerID int +} + +// NewLoopCounter attaches a loop counter to ctx. maxIterations is the maximum +// number of times any single step ID can be executed (loop back-edges). +func NewLoopCounter(ctx context.Context, maxIterations int) context.Context { + return context.WithValue(ctx, loopCounterKey{}, &loopCounter{ + counts: make(map[string]int), + maxPerID: maxIterations, + }) +} + +// IncLoopCounter increments the execution count for stepID and returns an error +// if the count exceeds the configured maximum (prevents infinite loops). +func IncLoopCounter(ctx context.Context, stepID string) error { + lc, ok := ctx.Value(loopCounterKey{}).(*loopCounter) + if !ok { + return nil + } + lc.mu.Lock() + defer lc.mu.Unlock() + lc.counts[stepID]++ + if lc.counts[stepID] > lc.maxPerID { + return fmt.Errorf("workflow: max loop iterations exceeded (step: %s, limit: %d)", stepID, lc.maxPerID) + } + return nil +} diff --git a/pkg/workflow/types.go b/pkg/workflow/types.go new file mode 100644 index 000000000..d902d9077 --- /dev/null +++ b/pkg/workflow/types.go @@ -0,0 +1,158 @@ +package workflow + +import ( + "database/sql" + "fmt" + "sync" + "time" + + "github.com/docker/docker-agent/pkg/concurrent" +) + +// StepType identifies the kind of workflow step. +type StepType string + +const ( + StepTypeAgent StepType = "agent" + StepTypeCondition StepType = "condition" + StepTypeSubWorkflow StepType = "subworkflow" + StepTypeParallel StepType = "parallel" +) + +// Step represents a single workflow step (agent, condition, or parallel block). +// Steps are defined in config and executed by the workflow executor. +type Step struct { + // ID is a unique identifier for this step. Used for output access (e.g. $steps..output) + // and loop detection. If empty, the executor may assign one (e.g. by index). + ID string `json:"id,omitempty" yaml:"id,omitempty"` + + // Type is one of: agent, condition, parallel. + Type StepType `json:"type" yaml:"type"` + + // Name is the agent name (for type=agent). Must reference an agent in config. + Name string `json:"name,omitempty" yaml:"name,omitempty"` + + // Condition is the expression for type=condition (e.g. "{{ $steps.qa.output.is_approved }}"). + // Evaluated after referenced steps have run; must resolve to a boolean. + Condition string `json:"condition,omitempty" yaml:"condition,omitempty"` + + // TrueSteps are executed when condition evaluates to true. + TrueSteps []Step `json:"true,omitempty" yaml:"true,omitempty"` + + // FalseSteps are executed when condition evaluates to false. + FalseSteps []Step `json:"false,omitempty" yaml:"false,omitempty"` + + // Steps are the child steps for type=parallel. All run concurrently. + Steps []Step `json:"steps,omitempty" yaml:"steps,omitempty"` + + StepTimeout time.Duration `json:"stepTimeout" yaml:"stepTimeout"` + + // Workflow is the child workflow for type=subworkflow. All run concurrently. + Workflow *Workflow `json:"workflow,omitempty" yaml:"workflow,omitempty"` + + // Retry configures per-step retry on failure. + Retry *RetryConfig `json:"retry,omitempty" yaml:"retry,omitempty"` +} + +// RetryConfig configures retry behavior for a step (agent or parallel block). +type RetryConfig struct { + // MaxAttempts is the maximum number of attempts (including the first). Default 0 = no retry. + MaxAttempts int `json:"max_attempts" yaml:"max_attempts"` + // Backoff is "fixed" (constant delay) or "exponential". Optional. + Backoff string `json:"backoff,omitempty" yaml:"backoff,omitempty"` + // InitialDelaySeconds is the delay before first retry. Used with Backoff. + InitialDelaySeconds int `json:"initial_delay_seconds,omitempty" yaml:"initial_delay_seconds,omitempty"` + // On lists error patterns to retry on (e.g. ["timeout", "rate_limit"]). Empty = retry on any error. + On []string `json:"on,omitempty" yaml:"on,omitempty"` +} + +type WorkflowContext struct { + workflow *Workflow + Checkpoints map[string]Checkpoint `json:"checkpoints" yaml:"checkpoints"` + currentStep Step + mu sync.RWMutex + data map[string]any + store *SQLiteCheckpointStore + memoryStore *inMemoryCheckpointStore +} + +// SQLiteSessionStore implements Store using SQLite +type SQLiteCheckpointStore struct { + mu sync.Mutex + db *sql.DB +} + +// inMemoryCheckpointStore implements Store using SQLite +type inMemoryCheckpointStore struct { + mu sync.Mutex + workflowID string + checkpoints *concurrent.Map[string, *Checkpoint] +} + +type Checkpoint struct { + Name string `json:"name" yaml:"name"` + workflowName string `json:"workflowName" yaml:"workflowName"` + CurrentStepIndex int `json:"currentStepIndex" yaml:"currentStepIndex"` + Iteration int `json:"iteration" yaml:"iteration"` + State map[string]any `json:"state" yaml:"state"` + CreatedAt time.Time `json:"createdAt" yaml:"createdAt"` +} + +// Config holds workflow-level settings and the root steps. +type Workflow struct { + name string `json:"name" yaml:"name"` + + fileName string `json:"fileName" yaml:"fileName"` + + StepTimeout time.Duration `json:"stepTimeout" yaml:"stepTimeout"` + + // Steps are the top-level workflow steps (sequential by default). + Steps []Step `json:"steps,omitempty" yaml:"steps,omitempty"` + + // MaxLoopIterations is the maximum number of times a step can be re-executed + // due to a conditional back-edge (loop). Default 100. Prevents infinite loops. + MaxLoopIterations int `json:"max_loop_iterations,omitempty" yaml:"max_loop_iterations,omitempty"` +} + +// UnmarshalYAML allows workflow to be specified as a list (steps only) or a map (steps + max_loop_iterations). +func (c *Workflow) UnmarshalYAML(unmarshal func(any) error) error { + var listForm []Step + if err := unmarshal(&listForm); err == nil { + c.Steps = listForm + return nil + } + type rawConfig Workflow + var mapForm rawConfig + if err := unmarshal(&mapForm); err != nil { + return fmt.Errorf("workflow: expected a list of steps or a map with 'steps' and optional 'max_loop_iterations': %w", err) + } + *c = Workflow(mapForm) + return nil +} + +// DefaultMaxLoopIterations is the default cap for loop iterations when not set in config. +const DefaultMaxLoopIterations = 100 + +// StepOutput holds the output of a single step (e.g. last assistant message content). +type StepOutput struct { + // Output is the last assistant message content from the step. + Output string `json:"output"` + // Agent is the agent name that produced this output (for type=agent). + Agent string `json:"agent,omitempty"` +} + +// ParallelOutputs is the structure passed to the next step after a parallel block. +// Keys are step IDs; order preserves deterministic indexing (e.g. outputs[0]). +type ParallelOutputs struct { + Steps map[string]StepOutput `json:"steps"` + Order []string `json:"order"` +} + +// GetByIndex returns the StepOutput at index i (using Order). Returns zero value if out of range. +func (p *ParallelOutputs) GetByIndex(i int) StepOutput { + if p == nil || i < 0 || i >= len(p.Order) { + return StepOutput{} + } + id := p.Order[i] + return p.Steps[id] +} diff --git a/pkg/workflowrun/executor.go b/pkg/workflowrun/executor.go new file mode 100644 index 000000000..d215769bf --- /dev/null +++ b/pkg/workflowrun/executor.go @@ -0,0 +1,348 @@ +package workflowrun + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/docker/docker-agent/pkg/paths" + "github.com/docker/docker-agent/pkg/runtime" + "github.com/docker/docker-agent/pkg/session" + "github.com/docker/docker-agent/pkg/sqliteutil" + "github.com/docker/docker-agent/pkg/workflow" +) + +// Event is the type of events emitted during workflow execution. +// The executor sends runtime.Event values on the channel. +type Event = any + +// Executor runs a workflow: sequential, conditional, and parallel steps. +// It drives the runtime (RunStream per agent step), maintains step outputs, +// evaluates conditions, and enforces max loop iterations. +type Executor interface { + // Run executes the workflow with the given session (initial user message) and sends events to the channel. + Run(ctx context.Context, cfg *workflow.Workflow, sess *session.Session, events chan Event) (*workflow.WorkflowContext, error) +} + +// SQLiteLoggerStore implements Store using SQLite +type SQLiteLoggerStore struct { + mu sync.Mutex + db *sql.DB +} + +func NewSQLiteLoggerStore(ctx context.Context, workflowID string, path string) (*SQLiteLoggerStore, error) { + db, err := sqliteutil.OpenDB(path) + if err != nil { + return nil, err + } + // Ensure we close the connection if table creation fails + // Note: We don't defer close here because we return the db on success + _, err = db.ExecContext(context.Background(), "CREATE TABLE IF NOT EXISTS logger (id TEXT PRIMARY KEY, created_at TEXT, memory TEXT)") + if err != nil { + db.Close() + return nil, err + } + + return &SQLiteLoggerStore{db: db, mu: sync.Mutex{}}, nil +} + +// Runner is the minimal runtime interface needed to run agent steps. +// Callers pass a runtime.Runtime (or adapter) that implements Runner. +type Runner interface { + CurrentAgentName() string + SetCurrentAgent(agentName string) error + RunStream(ctx context.Context, sess *session.Session) <-chan runtime.Event +} + +// LocalExecutor executes workflows using a LocalRuntime (or any Runner). +type LocalExecutor struct { + Runner Runner + // runnerMu serializes SetCurrentAgent + RunStream calls so the Runner's + // internal goroutine captures the correct agent name before the next + // parallel step changes it. + runnerMu sync.Mutex +} + +// NewLocalExecutor returns an executor that uses the given Runner. +func NewLocalExecutor(r Runner) *LocalExecutor { + return &LocalExecutor{Runner: r} +} + +// Run executes the workflow. Sequential steps run in order; conditional steps +// evaluate and run true/false branches; parallel steps run concurrently and +// all must succeed before the next sequential step. +func (e *LocalExecutor) Run(ctx context.Context, cfg *workflow.Workflow, sess *session.Session, events chan Event) (*workflow.WorkflowContext, error) { + if cfg == nil || len(cfg.Steps) == 0 { + return nil, fmt.Errorf("workflow: no steps WorkflowWorkflowured") + } + maxLoop := cfg.MaxLoopIterations + if maxLoop <= 0 { + maxLoop = workflow.DefaultMaxLoopIterations + } + ctx = workflow.NewLoopCounter(ctx, maxLoop) + stepCtx, err := workflow.NewWorkflowContext(cfg, sess.ID) + if err != nil { + return nil, err + } + err = e.runSteps(ctx, cfg.Steps, stepCtx, sess, cfg, events) + if err != nil { + return nil, err + } + + // Print step context for debugging. + if b, jerr := json.MarshalIndent(stepCtx.Snapshot(), "", " "); jerr == nil { + fmt.Fprintf(os.Stderr, "\n--- Step Context ---\n%s\n", string(b)) + } + // save workflow run log to db + loggerPath := filepath.Join(paths.GetHomeDir(), ".cagent", "logger.db") + loggerStore, err := NewSQLiteLoggerStore(ctx, sess.ID, loggerPath) + if err != nil { + return nil, err + } + + logData, _ := json.Marshal(stepCtx.Snapshot()) + + loggerStore.mu.Lock() + defer loggerStore.mu.Unlock() + _, err = loggerStore.db.ExecContext(ctx, "INSERT OR REPLACE INTO logger (id, created_at, memory) VALUES (?, ?, ?)", sess.ID, time.Now().Format(time.RFC3339), string(logData)) + if err != nil { + return nil, fmt.Errorf("failed to save workflow log: %w", err) + } + + return stepCtx, nil +} + +func (e *LocalExecutor) runSteps(ctx context.Context, steps []workflow.Step, stepCtx *workflow.WorkflowContext, sess *session.Session, config *workflow.Workflow, events chan Event) error { + for i := range steps { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := e.runStepWithTimeout(ctx, &steps[i], stepCtx, sess, config, events); err != nil { + return err + } + } + return nil +} + +func (e *LocalExecutor) runStepWithTimeout(ctx context.Context, step *workflow.Step, stepCtx *workflow.WorkflowContext, sess *session.Session, config *workflow.Workflow, events chan Event) error { + if config.StepTimeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, config.StepTimeout) + defer cancel() + } + err := e.runStep(ctx, step, stepCtx, sess, config, events) + if err != nil { + return err + } + if err := stepCtx.SaveCheckpoint(ctx); err != nil { + return err + } + return nil +} + +func (e *LocalExecutor) runStep(ctx context.Context, step *workflow.Step, stepCtx *workflow.WorkflowContext, sess *session.Session, config *workflow.Workflow, events chan Event) error { + stepID := step.ID + if stepID == "" { + stepID = fmt.Sprintf("step_%s", step.Type) + } + switch step.Type { + case workflow.StepTypeAgent: + return e.runAgentStep(ctx, stepID, step, stepCtx, sess, config, events) + case workflow.StepTypeCondition: + return e.runConditionStep(ctx, stepID, step, stepCtx, sess, config, events) + case workflow.StepTypeParallel: + return e.runParallelStep(ctx, stepID, step, stepCtx, sess, config, events) + case workflow.StepTypeSubWorkflow: + return e.runSubWorkflowStep(ctx, stepID, step, stepCtx, sess, config, events) + default: + return fmt.Errorf("workflow: unknown step type %q", step.Type) + } +} + +func (e *LocalExecutor) runAgentStep(ctx context.Context, stepID string, step *workflow.Step, stepCtx *workflow.WorkflowContext, sess *session.Session, config *workflow.Workflow, events chan Event) error { + if err := workflow.IncLoopCounter(ctx, stepID); err != nil { + return err + } + + runSess := e.buildSessionForStep(step, stepCtx, sess) + + // Protect SetCurrentAgent + RunStream so the Runner's internal goroutine + // captures the correct agent before another parallel step changes it. + e.runnerMu.Lock() + if err := e.Runner.SetCurrentAgent(step.Name); err != nil { + e.runnerMu.Unlock() + return fmt.Errorf("workflow: set agent %q: %w", step.Name, err) + } + eventsCh := e.Runner.RunStream(ctx, runSess) + e.runnerMu.Unlock() + + for ev := range eventsCh { + select { + case events <- ev: + case <-ctx.Done(): + return ctx.Err() + } + } + var lastOutput string + if runSess != nil { + lastOutput = runSess.GetLastAssistantMessageContent() + } + stepCtx.SetAgentOutput(stepID, lastOutput, step.Name) + return nil +} + +func (e *LocalExecutor) buildSessionForStep(step *workflow.Step, stepCtx *workflow.WorkflowContext, initial *session.Session) *session.Session { + opts := []session.Opt{ + session.WithMaxIterations(initial.MaxIterations), + session.WithToolsApproved(initial.ToolsApproved), + session.WithSendUserMessage(true), + } + + // Build the user message: original user prompt + context from prior steps. + var userMsg string + if initial != nil && initial.Messages != nil { + for _, item := range initial.Messages { + if item.IsMessage() && item.Message.Message.Role == "user" { + userMsg = item.Message.Message.Content + break + } + } + } + if userMsg == "" { + userMsg = "Please proceed with the workflow step." + } + + // Inject prior step outputs as context for the current step. + if prior := buildPriorContext(stepCtx); prior != "" { + userMsg = prior + "\n\n" + userMsg + } + + opts = append(opts, session.WithUserMessage(userMsg)) + return session.New(opts...) +} + +// buildPriorContext formats all prior step outputs into a context block +// that is injected into the next step's user message. +func buildPriorContext(stepCtx *workflow.WorkflowContext) string { + snapshot := stepCtx.Snapshot() + if len(snapshot) == 0 { + return "" + } + + var sb strings.Builder + sb.WriteString("--- Prior Step Outputs ---") + for id, v := range snapshot { + switch out := v.(type) { + case workflow.StepOutput: + if out.Output != "" { + sb.WriteString("\n\n[") + sb.WriteString(id) + sb.WriteString(" (agent: ") + sb.WriteString(out.Agent) + sb.WriteString(")]:\n") + sb.WriteString(out.Output) + } + case *workflow.ParallelOutputs: + if out != nil { + for _, subID := range out.Order { + so := out.Steps[subID] + if so.Output != "" { + sb.WriteString("\n\n[") + sb.WriteString(id) + sb.WriteString("/") + sb.WriteString(subID) + sb.WriteString(" (agent: ") + sb.WriteString(so.Agent) + sb.WriteString(")]:\n") + sb.WriteString(so.Output) + } + } + } + } + } + sb.WriteString("\n\n--- End Prior Step Outputs ---") + return sb.String() +} + +func (e *LocalExecutor) runConditionStep(ctx context.Context, stepID string, step *workflow.Step, stepCtx *workflow.WorkflowContext, sess *session.Session, config *workflow.Workflow, events chan Event) error { + ok, resolved := stepCtx.EvalCondition(step.Condition) + if !resolved { + return fmt.Errorf("workflow: condition did not resolve to boolean: %q", step.Condition) + } + if ok { + return e.runSteps(ctx, step.TrueSteps, stepCtx, sess, config, events) + } + return e.runSteps(ctx, step.FalseSteps, stepCtx, sess, config, events) +} + +func (e *LocalExecutor) runParallelStep(ctx context.Context, stepID string, step *workflow.Step, stepCtx *workflow.WorkflowContext, sess *session.Session, config *workflow.Workflow, events chan Event) error { + if len(step.Steps) == 0 { + return nil + } + + var wg sync.WaitGroup + outputs := make(map[string]workflow.StepOutput) + order := make([]string, 0, len(step.Steps)) + var mu sync.Mutex + var firstErr error + + for i := range step.Steps { + child := &step.Steps[i] + childID := child.ID + if childID == "" { + childID = fmt.Sprintf("%s_%d", stepID, i) + } + order = append(order, childID) + stepCopy := *child + stepCopy.ID = childID + wg.Add(1) + go func(s *workflow.Step, id string) { + defer wg.Done() + + // Each parallel goroutine gets its own sub-session. + // PersistentRuntime skips all persistence for sub-sessions, + // avoiding concurrent SQLite writes. + subSess := e.buildSessionForStep(s, stepCtx, sess) + subSess.ParentID = sess.ID + + if err := e.runAgentStep(ctx, id, s, stepCtx, subSess, config, events); err != nil { + mu.Lock() + if firstErr == nil { + firstErr = err + } + mu.Unlock() + return + } + if so, ok := stepCtx.GetOutput(id); ok { + mu.Lock() + outputs[id] = so + mu.Unlock() + } + }(&stepCopy, childID) + } + wg.Wait() + if firstErr != nil { + return firstErr + } + stepCtx.SetParallelOutput(stepID, &workflow.ParallelOutputs{Steps: outputs, Order: order}) + return nil +} + +func (e *LocalExecutor) runSubWorkflowStep(ctx context.Context, stepID string, step *workflow.Step, stepCtx *workflow.WorkflowContext, sess *session.Session, config *workflow.Workflow, events chan Event) error { + if step.Workflow == nil { + return fmt.Errorf("sub-workflow step %s has no workflow", stepID) + } + // Create a copy of the sub-workflow and add it to the context. + subWorkflow := *step.Workflow + // We don't need to set it in the context's data map - the executor will handle it. + // Run the sub-workflow's steps sequentially using the same logic as normal steps. + return e.runSteps(ctx, subWorkflow.Steps, stepCtx, sess, config, events) +} diff --git a/pkg/workflowrun/executor_test.go b/pkg/workflowrun/executor_test.go new file mode 100644 index 000000000..33ea86562 --- /dev/null +++ b/pkg/workflowrun/executor_test.go @@ -0,0 +1,109 @@ +package workflowrun + +import ( + "context" + "testing" + "time" + + "github.com/docker/docker-agent/pkg/chat" + "github.com/docker/docker-agent/pkg/runtime" + "github.com/docker/docker-agent/pkg/session" + "github.com/docker/docker-agent/pkg/workflow" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type mockRunner struct { + agentName string + output string +} + +func (m *mockRunner) CurrentAgentName() string { + return m.agentName +} + +func (m *mockRunner) SetCurrentAgent(agentName string) error { + m.agentName = agentName + return nil +} + +func (m *mockRunner) RunStream(ctx context.Context, sess *session.Session) <-chan runtime.Event { + ch := make(chan runtime.Event, 1) + + // Add an assistant message so GetLastAssistantMessageContent returns the output + sess.AddMessage(&session.Message{ + AgentName: m.agentName, + Message: chat.Message{ + Role: chat.MessageRoleAssistant, + Content: m.output, + }, + }) + + close(ch) + return ch +} + +func TestLocalExecutor_Run(t *testing.T) { + t.Setenv("HOME", t.TempDir()) + + runner := &mockRunner{output: "success"} + exec := NewLocalExecutor(runner) + + cfg := &workflow.Workflow{ + Steps: []workflow.Step{ + { + ID: "step1", + Type: workflow.StepTypeAgent, + Name: "test_agent", + }, + }, + } + + sess := session.New() + events := make(chan Event, 10) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + stepCtx, err := exec.Run(ctx, cfg, sess, events) + require.NoError(t, err) + require.NotNil(t, stepCtx) + + out, ok := stepCtx.GetOutput("step1") + assert.True(t, ok) + assert.Equal(t, "success", out.Output) +} + +func TestLocalExecutor_RunParallel(t *testing.T) { + t.Setenv("HOME", t.TempDir()) + runner := &mockRunner{output: "parallel_success"} + exec := NewLocalExecutor(runner) + + cfg := &workflow.Workflow{ + Steps: []workflow.Step{ + { + ID: "parallel_step", + Type: workflow.StepTypeParallel, + Steps: []workflow.Step{ + {ID: "sub1", Type: workflow.StepTypeAgent, Name: "agent1"}, + {ID: "sub2", Type: workflow.StepTypeAgent, Name: "agent2"}, + }, + }, + }, + } + + sess := session.New() + events := make(chan Event, 10) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + stepCtx, err := exec.Run(ctx, cfg, sess, events) + require.NoError(t, err) + require.NotNil(t, stepCtx) + + po, ok := stepCtx.GetParallelOutput("parallel_step") + require.True(t, ok) + require.Len(t, po.Steps, 2) + assert.Equal(t, "parallel_success", po.Steps["sub1"].Output) + assert.Equal(t, "parallel_success", po.Steps["sub2"].Output) +}