Skip to content

Conversation

@holtvogt
Copy link

Motivation and Context

Registering many executors currently needs repetitive register_executor calls. It solves this by adding a dict-based bulk registration method that reduces boilerplate. This is most useful in complex workflows with many executors.

Fixes: #3551

Description

Added a bulk executor registration API to WorkflowBuilder that accepts a dict[str, Callable[[], Executor]]. This reduces boilerplate for workflows that register many executors, while keeping the existing register_executor() API intact. Also added a small unit test covering the new bulk registration path and chaining behavior.

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.

Copilot AI review requested due to automatic review settings January 31, 2026 21:04
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds a bulk executor registration method to the Python WorkflowBuilder class to reduce boilerplate when registering multiple executors. The change addresses issue #3551 by providing a dict-based API as an alternative to multiple individual register_executor() calls.

Changes:

  • Added register_executors() method that accepts a dict[str, Callable[[], Executor]] for bulk registration
  • Added unit test test_register_executors_bulk() to verify the new method works correctly and supports method chaining

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
python/packages/core/agent_framework/_workflows/_workflow_builder.py Added register_executors() method with proper validation, documentation, and examples following existing patterns
python/packages/core/tests/workflow/test_workflow_builder.py Added basic test for bulk registration functionality and chaining behavior

Comment on lines +262 to +278
def test_register_executors_bulk():
"""Test bulk executor registration with a mapping of factories."""
builder = WorkflowBuilder()

result = builder.register_executors({
"ExecutorA": lambda: MockExecutor(id="ExecutorA"),
"ExecutorB": lambda: MockExecutor(id="ExecutorB"),
})

assert result is builder

workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build()

assert "ExecutorA" in workflow.executors
assert "ExecutorB" in workflow.executors
assert workflow.start_executor_id == "ExecutorA"

Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation validates all names for duplicates before adding any to the registry, which is good. However, dictionary iteration order is guaranteed in Python 3.7+, so if an error occurs during validation, the behavior is consistent. Consider adding a test case to verify the error handling behavior when attempting to register executors with duplicate names (e.g., mixing register_executor followed by register_executors, or register_executors with an already-registered name).

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. We can introduce a guard variable such as _uses_bulk_executor_registration and set it to True when register_executors() is used. The register_executor() method would then check this flag accordingly?

Comment on lines +262 to +278
def test_register_executors_bulk():
"""Test bulk executor registration with a mapping of factories."""
builder = WorkflowBuilder()

result = builder.register_executors({
"ExecutorA": lambda: MockExecutor(id="ExecutorA"),
"ExecutorB": lambda: MockExecutor(id="ExecutorB"),
})

assert result is builder

workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build()

assert "ExecutorA" in workflow.executors
assert "ExecutorB" in workflow.executors
assert workflow.start_executor_id == "ExecutorA"

Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a test case for edge cases such as an empty dictionary being passed to register_executors. This would verify the method handles this gracefully (which it currently does by simply returning self without error).

Copilot uses AI. Check for mistakes.

return self

def register_executors(self, executor_factories: dict[str, Callable[[], Executor]]) -> Self:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can replace the original register_executor API with this new one.

Copy link
Author

@holtvogt holtvogt Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay! Do we plan to add deprecation notes for the old approach, or are we considering a big bang removal of register_executor() entirely? The latter would be a larger change, since it would require touching more files.

@TaoChenOSU
Copy link
Contributor

Hi, thank you for your contribution!

This is actually something we wanted to improve too!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Python: Simplify executor registration with dict-based API

3 participants