-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathagent_executor.py
More file actions
83 lines (64 loc) · 3.05 KB
/
agent_executor.py
File metadata and controls
83 lines (64 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Copyright (c) Microsoft. All rights reserved.
"""AgentExecutor bridge between the a2a-sdk server and Agent Framework agents.
Implements the a2a-sdk ``AgentExecutor`` interface so that incoming A2A
requests are forwarded to an Agent Framework agent and the response is
published back through the a2a-sdk event queue.
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from a2a.helpers import new_task_from_user_message
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.tasks import TaskUpdater
from a2a.types import Part, TaskState
if TYPE_CHECKING:
from a2a.server.agent_execution.context import RequestContext
from a2a.server.events.event_queue import EventQueue
from agent_framework import Agent
class AgentFrameworkExecutor(AgentExecutor):
"""Bridges A2A protocol requests to an Agent Framework agent.
For each incoming ``execute`` call the executor:
1. Extracts the user's text from the A2A ``RequestContext``.
2. Runs the Agent Framework agent (non-streaming).
3. Publishes the result as an A2A ``Message`` to the ``EventQueue``.
"""
def __init__(self, agent: Agent) -> None:
self.agent = agent
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Run the agent and publish the response."""
user_text = context.get_user_input()
if not user_text:
user_text = "Hello"
# v1.0 requires a Task object in the queue before any TaskStatusUpdateEvent
task = context.current_task
if not task and context.message:
task = new_task_from_user_message(context.message)
await event_queue.enqueue_event(task)
task_id = task.id if task else context.task_id
updater = TaskUpdater(event_queue, task_id, context.context_id)
# Signal that the agent is working
await updater.start_work()
try:
response = await self.agent.run(user_text)
# Build response text from agent messages
response_parts: list[Part] = []
for msg in response.messages:
if msg.text:
response_parts.append(Part(text=msg.text))
if not response_parts:
response_parts.append(Part(text=str(response)))
# Publish the agent's response and mark as completed
await updater.complete(
message=updater.new_agent_message(response_parts),
)
except asyncio.CancelledError:
raise
except Exception as e:
await updater.update_status(
state=TaskState.TASK_STATE_FAILED,
message=updater.new_agent_message([Part(text=f"Agent error: {e}")]),
)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
"""Handle cancellation by publishing a canceled status."""
updater = TaskUpdater(event_queue, context.task_id, context.context_id)
await updater.update_status(state=TaskState.TASK_STATE_CANCELED)