Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/packages/core/agent_framework/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1968,6 +1968,9 @@ async def function_invocation_wrapper(

if response.conversation_id is not None:
_update_conversation_id(kwargs, response.conversation_id)
# Also update options dict so subsequent API calls use the new conversation_id
if options is not None:
options["conversation_id"] = response.conversation_id
prepped_messages = []

# we load the tools here, since middleware might have changed them compared to before calling func.
Expand Down Expand Up @@ -2173,6 +2176,9 @@ async def streaming_function_invocation_wrapper(
# In this case, we need to update kwargs with conversation id and also clear messages
if response.conversation_id is not None:
_update_conversation_id(kwargs, response.conversation_id)
# Also update options dict so subsequent API calls use the new conversation_id
if options is not None:
options["conversation_id"] = response.conversation_id
prepped_messages = []

# we load the tools here, since middleware might have changed them compared to before calling func.
Expand Down
150 changes: 150 additions & 0 deletions python/packages/core/tests/core/test_function_invocation_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2462,3 +2462,153 @@ def ai_func(arg1: str) -> str:

# Verify the second streaming response is still in the queue (wasn't consumed)
assert len(chat_client_base.streaming_responses) == 1


async def test_conversation_id_updated_in_options_between_tool_iterations():
"""Test that conversation_id is updated in options dict between tool invocation iterations.

This regression test ensures that when a tool call returns a new conversation_id,
subsequent API calls in the same function invocation loop use the updated conversation_id.
Without this fix, the old conversation_id would be used, causing "No tool call found"
errors when submitting tool results to APIs like OpenAI Responses.
"""
from collections.abc import AsyncIterable, MutableSequence
from typing import Any
from unittest.mock import patch

from agent_framework import (
BaseChatClient,
ChatMessage,
ChatResponse,
ChatResponseUpdate,
Content,
tool,
use_chat_middleware,
use_function_invocation,
)

# Track the conversation_id passed to each call
conversation_ids_received: list[str | None] = []
call_count = 0

@use_chat_middleware
class TrackingChatClient(BaseChatClient):
def __init__(self) -> None:
super().__init__()
self.run_responses: list[ChatResponse] = []

async def _inner_get_response(
self,
*,
messages: MutableSequence[ChatMessage],
options: dict[str, Any],
**kwargs: Any,
) -> ChatResponse:
nonlocal call_count
call_count += 1
# Track what conversation_id was passed
conversation_ids_received.append(options.get("conversation_id"))

if not self.run_responses:
return ChatResponse(messages=ChatMessage(role="assistant", text="done"))
return self.run_responses.pop(0)

async def _inner_get_streaming_response(
self,
*,
messages: MutableSequence[ChatMessage],
options: dict[str, Any],
**kwargs: Any,
) -> AsyncIterable[ChatResponseUpdate]:
nonlocal call_count
call_count += 1
# Track what conversation_id was passed
conversation_ids_received.append(options.get("conversation_id"))

if not self.run_responses:
yield ChatResponseUpdate(text="done", role="assistant")
return

response = self.run_responses.pop(0)
# Convert ChatResponse to streaming updates
for msg in response.messages:
yield ChatResponseUpdate(
contents=msg.contents,
role=msg.role.value if hasattr(msg.role, "value") else msg.role,
response_id=response.response_id,
conversation_id=response.conversation_id, # Include conversation_id in streaming updates
)

@tool(name="test_func", approval_mode="never_require")
def test_func(arg1: str) -> str:
return f"Result {arg1}"

# Test non-streaming: conversation_id should be updated after first response
with patch("agent_framework._tools.DEFAULT_MAX_ITERATIONS", 5):
client = use_function_invocation(TrackingChatClient)()

# First response returns a function call WITH a new conversation_id
# Second response (after tool execution) should receive the updated conversation_id
client.run_responses = [
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[Content.from_function_call(call_id="call_1", name="test_func", arguments='{"arg1": "v1"}')],
),
conversation_id="conv_after_first_call", # New conversation_id from API
),
ChatResponse(
messages=ChatMessage(role="assistant", text="done"),
conversation_id="conv_after_second_call",
),
]

# Start with initial conversation_id
await client.get_response(
"hello",
options={"tool_choice": "auto", "tools": [test_func], "conversation_id": "conv_initial"},
)

assert call_count == 2
# First call should receive the initial conversation_id
assert conversation_ids_received[0] == "conv_initial"
# Second call (after tool execution) MUST receive the updated conversation_id
assert conversation_ids_received[1] == "conv_after_first_call", (
"conversation_id should be updated in options after receiving new conversation_id from API"
)

# Test streaming version too
conversation_ids_received.clear()
call_count = 0

with patch("agent_framework._tools.DEFAULT_MAX_ITERATIONS", 5):
streaming_client = use_function_invocation(TrackingChatClient)()

streaming_client.run_responses = [
ChatResponse(
messages=ChatMessage(
role="assistant",
contents=[Content.from_function_call(call_id="call_2", name="test_func", arguments='{"arg1": "v2"}')],
),
conversation_id="stream_conv_after_first",
),
ChatResponse(
messages=ChatMessage(role="assistant", text="streaming done"),
conversation_id="stream_conv_after_second",
),
]

updates = []
async for update in streaming_client.get_streaming_response(
"hello",
options={"tool_choice": "auto", "tools": [test_func], "conversation_id": "stream_conv_initial"},
):
updates.append(update)

assert call_count == 2
# First call should receive the initial conversation_id
assert conversation_ids_received[0] == "stream_conv_initial"
# Second call (after tool execution) MUST receive the updated conversation_id
assert conversation_ids_received[1] == "stream_conv_after_first", (
"streaming: conversation_id should be updated in options after receiving new conversation_id from API"
)
Loading