-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Python: Add bulk executor registration method to WorkflowBuilder #3565
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Python: Add bulk executor registration method to WorkflowBuilder #3565
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds a bulk executor registration method to the Python WorkflowBuilder class to reduce boilerplate when registering multiple executors. The change addresses issue #3551 by providing a dict-based API as an alternative to multiple individual register_executor() calls.
Changes:
- Added
register_executors()method that accepts adict[str, Callable[[], Executor]]for bulk registration - Added unit test
test_register_executors_bulk()to verify the new method works correctly and supports method chaining
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
python/packages/core/agent_framework/_workflows/_workflow_builder.py |
Added register_executors() method with proper validation, documentation, and examples following existing patterns |
python/packages/core/tests/workflow/test_workflow_builder.py |
Added basic test for bulk registration functionality and chaining behavior |
| def test_register_executors_bulk(): | ||
| """Test bulk executor registration with a mapping of factories.""" | ||
| builder = WorkflowBuilder() | ||
|
|
||
| result = builder.register_executors({ | ||
| "ExecutorA": lambda: MockExecutor(id="ExecutorA"), | ||
| "ExecutorB": lambda: MockExecutor(id="ExecutorB"), | ||
| }) | ||
|
|
||
| assert result is builder | ||
|
|
||
| workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build() | ||
|
|
||
| assert "ExecutorA" in workflow.executors | ||
| assert "ExecutorB" in workflow.executors | ||
| assert workflow.start_executor_id == "ExecutorA" | ||
|
|
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation validates all names for duplicates before adding any to the registry, which is good. However, dictionary iteration order is guaranteed in Python 3.7+, so if an error occurs during validation, the behavior is consistent. Consider adding a test case to verify the error handling behavior when attempting to register executors with duplicate names (e.g., mixing register_executor followed by register_executors, or register_executors with an already-registered name).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. We can introduce a guard variable such as _uses_bulk_executor_registration and set it to True when register_executors() is used. The register_executor() method would then check this flag accordingly?
| def test_register_executors_bulk(): | ||
| """Test bulk executor registration with a mapping of factories.""" | ||
| builder = WorkflowBuilder() | ||
|
|
||
| result = builder.register_executors({ | ||
| "ExecutorA": lambda: MockExecutor(id="ExecutorA"), | ||
| "ExecutorB": lambda: MockExecutor(id="ExecutorB"), | ||
| }) | ||
|
|
||
| assert result is builder | ||
|
|
||
| workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build() | ||
|
|
||
| assert "ExecutorA" in workflow.executors | ||
| assert "ExecutorB" in workflow.executors | ||
| assert workflow.start_executor_id == "ExecutorA" | ||
|
|
Copilot
AI
Jan 31, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a test case for edge cases such as an empty dictionary being passed to register_executors. This would verify the method handles this gracefully (which it currently does by simply returning self without error).
|
|
||
| return self | ||
|
|
||
| def register_executors(self, executor_factories: dict[str, Callable[[], Executor]]) -> Self: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can replace the original register_executor API with this new one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay! Do we plan to add deprecation notes for the old approach, or are we considering a big bang removal of register_executor() entirely? The latter would be a larger change, since it would require touching more files.
|
Hi, thank you for your contribution! This is actually something we wanted to improve too! |
Motivation and Context
Registering many executors currently needs repetitive
register_executorcalls. It solves this by adding a dict-based bulk registration method that reduces boilerplate. This is most useful in complex workflows with many executors.Fixes: #3551
Description
Added a bulk executor registration API to
WorkflowBuilderthat accepts adict[str, Callable[[], Executor]]. This reduces boilerplate for workflows that register many executors, while keeping the existingregister_executor()API intact. Also added a small unit test covering the new bulk registration path and chaining behavior.Contribution Checklist