Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/AgentResponseEvent.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Agents.AI.Workflows;
Expand All @@ -19,6 +20,28 @@ public AgentResponseEvent(string executorId, AgentResponse response) : base(resp
this.Response = Throw.IfNull(response);
}

/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseEvent"/> class with the given output tag.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="response">The agent response.</param>
/// <param name="tag">The output tag to associate with this event.</param>
public AgentResponseEvent(string executorId, AgentResponse response, OutputTag tag) : base(response, executorId, tag)
{
this.Response = Throw.IfNull(response);
}

/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseEvent"/> class with the given output tags.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="response">The agent response.</param>
/// <param name="tags">The output tags to associate with this event. May be <see langword="null"/> or empty.</param>
public AgentResponseEvent(string executorId, AgentResponse response, IEnumerable<OutputTag>? tags) : base(response, executorId, tags)
{
this.Response = Throw.IfNull(response);
}

/// <summary>
/// Gets the agent response.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@ public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update) :
this.Update = Throw.IfNull(update);
}

/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseUpdateEvent"/> class with the given output tag.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="update">The agent run response update.</param>
/// <param name="tag">The output tag to associate with this event.</param>
public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update, OutputTag tag) : base(update, executorId, tag)
{
this.Update = Throw.IfNull(update);
}

/// <summary>
/// Initializes a new instance of the <see cref="AgentResponseUpdateEvent"/> class with the given output tags.
/// </summary>
/// <param name="executorId">The identifier of the executor that generated this event.</param>
/// <param name="update">The agent run response update.</param>
/// <param name="tags">The output tags to associate with this event. May be <see langword="null"/> or empty.</param>
public AgentResponseUpdateEvent(string executorId, AgentResponseUpdate update, IEnumerable<OutputTag>? tags) : base(update, executorId, tags)
{
this.Update = Throw.IfNull(update);
}

/// <summary>
/// Gets the agent run response update.
/// </summary>
Expand Down
93 changes: 35 additions & 58 deletions dotnet/src/Microsoft.Agents.AI.Workflows/AgentWorkflowBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows.Specialized;
using Microsoft.Extensions.AI;
using Microsoft.Shared.Diagnostics;

Expand Down Expand Up @@ -37,31 +34,10 @@ private static Workflow BuildSequentialCore(string? workflowName, params IEnumer
{
Throw.IfNullOrEmpty(agents);

// Create a builder that chains the agents together in sequence. The workflow simply begins
// with the first agent in the sequence.

AIAgentHostOptions options = new()
{
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};

List<ExecutorBinding> agentExecutors = agents.Select(agent => agent.BindAsExecutor(options)).ToList();

ExecutorBinding previous = agentExecutors[0];
WorkflowBuilder builder = new(previous);

foreach (ExecutorBinding next in agentExecutors.Skip(1))
{
builder.AddEdge(previous, next);
previous = next;
}

OutputMessagesExecutor end = new();
builder = builder.AddEdge(previous, end).WithOutputFrom(end);
SequentialWorkflowBuilder builder = new(agents);
if (workflowName is not null)
{
builder = builder.WithName(workflowName);
builder.WithName(workflowName);
}
return builder.Build();
}
Expand Down Expand Up @@ -107,41 +83,14 @@ private static Workflow BuildConcurrentCore(
{
Throw.IfNull(agents);

// A workflow needs a starting executor, so we create one that forwards everything to each agent.
ChatForwardingExecutor start = new("Start");
WorkflowBuilder builder = new(start);

// For each agent, we create an executor to host it and an accumulator to batch up its output messages,
// so that the final accumulator receives a single list of messages from each agent. Otherwise, the
// accumulator would not be able to determine what came from what agent, as there's currently no
// provenance tracking exposed in the workflow context passed to a handler.

ExecutorBinding[] agentExecutors = (from agent in agents
select agent.BindAsExecutor(new AIAgentHostOptions() { ReassignOtherAgentsAsUsers = true })).ToArray();
ExecutorBinding[] accumulators = [.. from agent in agentExecutors select (ExecutorBinding)new AggregateTurnMessagesExecutor($"Batcher/{agent.Id}")];
builder.AddFanOutEdge(start, agentExecutors);

for (int i = 0; i < agentExecutors.Length; i++)
ConcurrentWorkflowBuilder builder = new(agents);
if (workflowName is not null)
{
builder.AddEdge(agentExecutors[i], accumulators[i]);
builder.WithName(workflowName);
}

// Create the accumulating executor that will gather the results from each agent, and connect
// each agent's accumulator to it. If no aggregation function was provided, we default to returning
// the last message from each agent
aggregator ??= static lists => (from list in lists where list.Count > 0 select list.Last()).ToList();

Func<string, string, ValueTask<ConcurrentEndExecutor>> endFactory =
(_, __) => new(new ConcurrentEndExecutor(agentExecutors.Length, aggregator));

ExecutorBinding end = endFactory.BindExecutor(ConcurrentEndExecutor.ExecutorId);

builder.AddFanInBarrierEdge(accumulators, end);

builder = builder.WithOutputFrom(end);
if (workflowName is not null)
if (aggregator is not null)
{
builder = builder.WithName(workflowName);
builder.WithAggregator(aggregator);
}
return builder.Build();
}
Expand Down Expand Up @@ -179,4 +128,32 @@ public static GroupChatWorkflowBuilder CreateGroupChatBuilderWith(Func<IReadOnly
Throw.IfNull(managerFactory);
return new GroupChatWorkflowBuilder(managerFactory);
}

/// <summary>Creates a new <see cref="SequentialWorkflowBuilder"/> with the given pipeline of <paramref name="agents"/>.</summary>
/// <param name="agents">The sequence of agents to compose into a sequential workflow.</param>
/// <returns>The builder for creating a sequential workflow.</returns>
public static SequentialWorkflowBuilder CreateSequentialBuilderWith(params IEnumerable<AIAgent> agents)
{
Throw.IfNull(agents);
return new SequentialWorkflowBuilder(agents);
}

/// <summary>Creates a new <see cref="ConcurrentWorkflowBuilder"/> with the given participating <paramref name="agents"/>.</summary>
/// <param name="agents">The set of agents to compose into a concurrent workflow.</param>
/// <returns>The builder for creating a concurrent workflow.</returns>
public static ConcurrentWorkflowBuilder CreateConcurrentBuilderWith(params IEnumerable<AIAgent> agents)
{
Throw.IfNull(agents);
return new ConcurrentWorkflowBuilder(agents);
}

/// <summary>Creates a new <see cref="MagenticWorkflowBuilder"/> with the given <paramref name="managerAgent"/>.</summary>
/// <param name="managerAgent">The LLM-powered manager agent that coordinates the team.</param>
/// <returns>The builder for creating a Magentic workflow.</returns>
[Experimental(DiagnosticConstants.ExperimentalFeatureDiagnostic)]
public static MagenticWorkflowBuilder CreateMagenticBuilderWith(AIAgent managerAgent)
{
Throw.IfNull(managerAgent);
return new MagenticWorkflowBuilder(managerAgent);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json.Serialization;
Expand All @@ -15,14 +16,14 @@ internal WorkflowInfo(
Dictionary<string, List<EdgeInfo>> edges,
HashSet<RequestPortInfo> requestPorts,
string startExecutorId,
HashSet<string>? outputExecutorIds)
Dictionary<string, HashSet<OutputTag>>? outputExecutorIds)
{
this.Executors = Throw.IfNull(executors);
this.Edges = Throw.IfNull(edges);
this.RequestPorts = Throw.IfNull(requestPorts);

this.StartExecutorId = Throw.IfNullOrEmpty(startExecutorId);
this.OutputExecutorIds = outputExecutorIds ?? [];
this.OutputExecutorIds = outputExecutorIds ?? new Dictionary<string, HashSet<OutputTag>>(StringComparer.Ordinal);
}

public Dictionary<string, ExecutorInfo> Executors { get; }
Expand All @@ -32,7 +33,15 @@ internal WorkflowInfo(
public TypeId? InputType { get; }
public string StartExecutorId { get; }

public HashSet<string> OutputExecutorIds { get; }
/// <summary>
/// Map of executor id to the set of <see cref="OutputTag"/>s under which the executor is registered.
/// An empty set means the executor is registered as a regular (untagged) output source.
/// JSON shape: <c>{ "executorId": ["intermediate"], ... }</c>. Legacy payloads using the
/// older <c>string[]</c> shape are read by <see cref="WorkflowInfoOutputExecutorsConverter"/> and
/// each id is treated as registered with an empty tag set.
/// </summary>
[JsonConverter(typeof(WorkflowInfoOutputExecutorsConverter))]
public Dictionary<string, HashSet<OutputTag>> OutputExecutorIds { get; }

public bool IsMatch(Workflow workflow)
{
Expand Down Expand Up @@ -80,9 +89,12 @@ public bool IsMatch(Workflow workflow)
return false;
}

// Validate the outputs
// Validate the outputs (key set + tag set per id must match)
if (workflow.OutputExecutors.Count != this.OutputExecutorIds.Count ||
this.OutputExecutorIds.Any(id => !workflow.OutputExecutors.Contains(id)))
this.OutputExecutorIds.Any(kvp =>
!workflow.OutputExecutors.TryGetValue(kvp.Key, out HashSet<OutputTag>? tags) ||
tags.Count != kvp.Value.Count ||
!tags.SetEquals(kvp.Value)))
{
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace Microsoft.Agents.AI.Workflows.Checkpointing;

/// <summary>
/// JSON converter for <see cref="WorkflowInfo.OutputExecutorIds"/> that supports both the new
/// map shape (<c>{ "id": ["intermediate"] }</c>) and the legacy array shape
/// (<c>["id1", "id2"]</c>). Legacy-shaped payloads are read as if every id had been registered
/// as a regular (untagged) output source; output is always written in the new map shape.
/// </summary>
internal sealed class WorkflowInfoOutputExecutorsConverter : JsonConverter<Dictionary<string, HashSet<OutputTag>>>
{
public override Dictionary<string, HashSet<OutputTag>> Read(
ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
{
Dictionary<string, HashSet<OutputTag>> result = new(StringComparer.Ordinal);

if (reader.TokenType == JsonTokenType.Null)
{
return result;
}

if (reader.TokenType == JsonTokenType.StartArray)
{
// Legacy shape: a flat array of executor ids. Treat each as a registered
// (untagged) output executor.
while (reader.Read())
{
if (reader.TokenType == JsonTokenType.EndArray)
{
return result;
}

if (reader.TokenType != JsonTokenType.String)
{
throw new JsonException($"Expected a string in legacy outputExecutorIds array, got {reader.TokenType}.");
}

string id = reader.GetString()!;
result[id] = [];
}

throw new JsonException("Unexpected end of legacy outputExecutorIds array.");
}

if (reader.TokenType != JsonTokenType.StartObject)
{
throw new JsonException($"Expected object or array for outputExecutorIds, got {reader.TokenType}.");
}

while (reader.Read())
{
if (reader.TokenType == JsonTokenType.EndObject)
{
return result;
}

if (reader.TokenType != JsonTokenType.PropertyName)
{
throw new JsonException($"Expected property name in outputExecutorIds object, got {reader.TokenType}.");
}

string id = reader.GetString()!;
reader.Read();

HashSet<OutputTag> tags = [];
if (reader.TokenType == JsonTokenType.StartArray)
{
while (reader.Read() && reader.TokenType != JsonTokenType.EndArray)
{
if (reader.TokenType != JsonTokenType.String)
{
throw new JsonException($"Expected a string tag, got {reader.TokenType}.");
}

tags.Add(ReadTag(reader.GetString()!));
}
}
else
{
throw new JsonException($"Expected array of tags for outputExecutorIds[{id}], got {reader.TokenType}.");
}

result[id] = tags;
}

throw new JsonException("Unexpected end of outputExecutorIds object.");
}

private static OutputTag ReadTag(string value)
{
if (string.Equals(value, OutputTag.Intermediate.Value, StringComparison.Ordinal))
{
return OutputTag.Intermediate;
}
return new OutputTag(value);
}

public override void Write(
Utf8JsonWriter writer,
Dictionary<string, HashSet<OutputTag>> value,
JsonSerializerOptions options)
{
writer.WriteStartObject();
foreach (KeyValuePair<string, HashSet<OutputTag>> kvp in value)
{
writer.WritePropertyName(kvp.Key);
writer.WriteStartArray();
foreach (OutputTag tag in kvp.Value)
{
writer.WriteStringValue(tag.Value);
}
writer.WriteEndArray();
}
writer.WriteEndObject();
}
}
Loading
Loading