Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codex-rs/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ serial_test = { workspace = true }
tempfile = { workspace = true }
test-log = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json"] }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Refresh Bazel lockfile after enabling json feature

When this crate is built or tested through Bazel, the new tracing-subscriber json feature changes the Cargo dependency metadata, but MODULE.bazel.lock is not part of this commit. Regenerate and include the Bazel lockfile update so Bazel/CI does not reject the dependency drift.

AGENTS.md reference: AGENTS.md:L37-L39

Useful? React with 👍 / 👎.

tracing-test = { workspace = true, features = ["no-env-filter"] }
walkdir = { workspace = true }
wiremock = { workspace = true }
Expand Down
68 changes: 47 additions & 21 deletions codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::agent::registry::AgentRegistry;
use crate::agent::role::DEFAULT_ROLE_NAME;
use crate::agent::role::resolve_role_config;
use crate::agent::status::is_final;
use crate::agent_communication::AgentCommunicationContext;
use crate::agent_communication::AgentCommunicationKind;
use crate::codex_thread::ThreadConfigSnapshot;
use crate::config::Config;
use crate::config::RolloutBudgetConfig;
Expand Down Expand Up @@ -67,6 +69,7 @@ pub(crate) struct SpawnAgentOptions {
pub(crate) fork_mode: Option<SpawnAgentForkMode>,
pub(crate) parent_thread_id: Option<ThreadId>,
pub(crate) environments: Option<Vec<TurnEnvironmentSelection>>,
pub(crate) agent_communication_context: Option<AgentCommunicationContext>,
}

#[derive(Clone, Debug)]
Expand All @@ -76,6 +79,11 @@ pub(crate) struct LiveAgent {
pub(crate) status: AgentStatus,
}

enum SubmissionId<'a> {
Generated,
Provided(&'a str),
}

#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
pub(crate) struct ListedAgent {
pub(crate) agent_name: String,
Expand Down Expand Up @@ -144,28 +152,38 @@ impl AgentControl {
let state = self.upgrade()?;
self.ensure_execution_capacity_for_op(agent_id, &initial_operation)
.await?;
self.send_input_after_capacity_check(agent_id, &state, initial_operation)
.await
self.send_input_after_capacity_check(
agent_id,
&state,
initial_operation,
SubmissionId::Generated,
)
.await
}

async fn send_input_after_capacity_check(
&self,
agent_id: ThreadId,
state: &Arc<ThreadManagerState>,
initial_operation: Op,
submission_id: SubmissionId<'_>,
) -> CodexResult<String> {
let last_task_message = match &initial_operation {
Op::InterAgentCommunication { communication } => {
last_task_message_from_communication(communication)
}
_ => non_empty_task_message(render_input_preview(&initial_operation)),
};
let send_result = match submission_id {
SubmissionId::Generated => state.send_op(agent_id, initial_operation).await,
SubmissionId::Provided(id) => {
state
.send_op_with_id(agent_id, id.to_string(), initial_operation)
.await
}
};
let result = self
.handle_thread_request_result(
agent_id,
state,
state.send_op(agent_id, initial_operation).await,
)
.handle_thread_request_result(agent_id, state, send_result)
.await;
if result.is_ok() {
match last_task_message {
Expand All @@ -182,23 +200,23 @@ impl AgentControl {
&self,
agent_id: ThreadId,
communication: InterAgentCommunication,
context: AgentCommunicationContext,
) -> CodexResult<String> {
let last_task_message = last_task_message_from_communication(&communication);
crate::agent_communication::emit_agent_communication_created(
&context,
&communication,
agent_id,
);
let state = self.upgrade()?;
let op = Op::InterAgentCommunication { communication };
self.ensure_execution_capacity_for_op(agent_id, &op).await?;
let result = self
.handle_thread_request_result(agent_id, &state, state.send_op(agent_id, op).await)
.await;
if result.is_ok() {
match last_task_message {
Some(last_task_message) => self
.state
.update_last_task_message(agent_id, last_task_message),
None => self.state.clear_last_task_message(agent_id),
}
}
result
self.send_input_after_capacity_check(
agent_id,
&state,
op,
SubmissionId::Provided(context.id()),
)
.await
}

/// Interrupt the current task for an existing agent thread.
Expand Down Expand Up @@ -480,8 +498,16 @@ impl AgentControl {
message,
/*trigger_turn*/ false,
);
let communication_context = AgentCommunicationContext::without_source_call(
AgentCommunicationKind::Result,
child_thread_id,
);
let _ = control
.send_inter_agent_communication(parent_thread_id, communication)
.send_inter_agent_communication(
parent_thread_id,
communication,
communication_context,
)
.await;
return;
}
Expand Down
22 changes: 20 additions & 2 deletions codex-rs/core/src/agent/control/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,26 @@ impl AgentControl {
)
.await;

self.send_input_after_capacity_check(new_thread.thread_id, &state, initial_operation)
.await?;
if let (Some(context), Op::InterAgentCommunication { communication }) = (
options.agent_communication_context.as_ref(),
&initial_operation,
) {
crate::agent_communication::emit_agent_communication_created(
context,
communication,
new_thread.thread_id,
);
}
self.send_input_after_capacity_check(
new_thread.thread_id,
&state,
initial_operation,
match options.agent_communication_context.as_ref() {
Some(context) => SubmissionId::Provided(context.id()),
None => SubmissionId::Generated,
},
)
.await?;
if multi_agent_version != MultiAgentVersion::V2 {
let child_reference = agent_metadata
.agent_path
Expand Down
119 changes: 97 additions & 22 deletions codex-rs/core/src/agent/control_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::CodexThread;
use crate::StateDbHandle;
use crate::ThreadManager;
use crate::agent::agent_status_from_event;
use crate::agent_communication::AgentCommunicationKind;
use crate::config::AgentRoleConfig;
use crate::config::Config;
use crate::config::ConfigBuilder;
Expand Down Expand Up @@ -431,6 +432,63 @@ async fn send_input_errors_when_thread_missing() {
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
}

#[tokio::test]
async fn failed_communication_send_emits_created_without_enqueued() {
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::filter::Targets;
use tracing_subscriber::layer::SubscriberExt;
use tracing_test::internal::MockWriter;

let output: &'static std::sync::Mutex<Vec<u8>> =
Box::leak(Box::new(std::sync::Mutex::new(Vec::new())));
let subscriber = tracing_subscriber::registry()
.with(
Targets::new()
.with_default(LevelFilter::OFF)
.with_target("codex_core::agent_communication", LevelFilter::TRACE),
)
.with(
tracing_subscriber::fmt::layer()
.json()
.with_current_span(false)
.with_span_list(false)
.with_writer(MockWriter::new(output)),
);
let _guard = tracing::subscriber::set_default(subscriber);

let harness = AgentControlHarness::new().await;
let sender_thread_id = ThreadId::new();
let receiver_thread_id = ThreadId::new();
let context = crate::agent_communication::AgentCommunicationContext::from_tool_call(
AgentCommunicationKind::Message,
sender_thread_id,
"call-1",
);
let id = context.id().to_string();
let communication = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/missing").expect("agent path"),
Vec::new(),
"mail".to_string(),
/*trigger_turn*/ false,
);
let err = harness
.control
.send_inter_agent_communication(receiver_thread_id, communication, context)
.await
.expect_err("send should fail for missing receiver");
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == receiver_thread_id);

let events = String::from_utf8(output.lock().expect("buffer lock").clone())
.expect("JSON logs should be UTF-8")
.lines()
.map(|line| serde_json::from_str::<serde_json::Value>(line).expect("valid JSON log event"))
.collect::<Vec<_>>();
assert_eq!(events.len(), 1);
assert_eq!(events[0]["fields"]["communication_id"], id);
assert_eq!(events[0]["fields"]["state"], "created");
}

#[tokio::test]
async fn get_status_returns_not_found_for_missing_thread() {
let harness = AgentControlHarness::new().await;
Expand Down Expand Up @@ -529,12 +587,17 @@ async fn send_inter_agent_communication_without_turn_queues_message_without_trig
/*trigger_turn*/ false,
);

let context = crate::agent_communication::AgentCommunicationContext::without_source_call(
AgentCommunicationKind::Message,
ThreadId::new(),
);
let communication_id = context.id().to_string();
let submission_id = harness
.control
.send_inter_agent_communication(thread_id, communication.clone())
.send_inter_agent_communication(thread_id, communication.clone(), context)
.await
.expect("send_inter_agent_communication should succeed");
assert!(!submission_id.is_empty());
assert_eq!(submission_id, communication_id);

let expected = (
thread_id,
Expand Down Expand Up @@ -656,7 +719,14 @@ async fn ensure_v2_agent_loaded_reloads_registered_unloaded_agent() {
);
harness
.control
.send_inter_agent_communication(spawned_agent.thread_id, communication.clone())
.send_inter_agent_communication(
spawned_agent.thread_id,
communication.clone(),
crate::agent_communication::AgentCommunicationContext::without_source_call(
AgentCommunicationKind::Message,
ThreadId::new(),
),
)
.await
.expect("send_inter_agent_communication should succeed after reload");
let expected = (
Expand Down Expand Up @@ -803,7 +873,14 @@ async fn encrypted_inter_agent_communication_clears_existing_last_task_message()
);
harness
.control
.send_inter_agent_communication(spawned_agent.thread_id, communication)
.send_inter_agent_communication(
spawned_agent.thread_id,
communication,
crate::agent_communication::AgentCommunicationContext::without_source_call(
AgentCommunicationKind::Followup,
ThreadId::new(),
),
)
.await
.expect("send_inter_agent_communication should succeed");

Expand Down Expand Up @@ -2112,28 +2189,26 @@ async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
&AgentStatus::Completed(Some("done".to_string())),
)
.expect("completed status should render");
let expected = (
worker_thread_id,
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
tester_path.clone(),
worker_path.clone(),
Vec::new(),
expected_message.clone(),
/*trigger_turn*/ false,
),
},
let expected = InterAgentCommunication::new(
tester_path.clone(),
worker_path.clone(),
Vec::new(),
expected_message.clone(),
/*trigger_turn*/ false,
);

timeout(Duration::from_secs(5), async {
loop {
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
if captured == Some(expected.clone()) {
break;
for (thread_id, op) in harness.manager.captured_ops() {
if thread_id != worker_thread_id {
continue;
}
let Op::InterAgentCommunication { communication } = op else {
continue;
};
if communication == expected {
return;
}
}
sleep(Duration::from_millis(10)).await;
}
Expand Down
Loading
Loading