From 06bcd9538d0aa1598addde00f2452bb883f4cbec Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sun, 28 Jun 2026 16:17:58 +0000 Subject: [PATCH 01/17] security: fix authorization bypass via middleware cache --- cmd/sam-node/middleware.go | 2 +- cmd/sam-node/middleware_test.go | 80 +++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/cmd/sam-node/middleware.go b/cmd/sam-node/middleware.go index 174d7dc..0c26cbc 100644 --- a/cmd/sam-node/middleware.go +++ b/cmd/sam-node/middleware.go @@ -130,7 +130,7 @@ func (n *SamNode) WithBiscuitAuth(next func(network.Stream, RequestContext)) net // Check verification cache tokenHash := sha256.Sum256(authFrame.Biscuit) - hashStr := hex.EncodeToString(tokenHash[:]) + ":" + remotePeer.String() + hashStr := hex.EncodeToString(tokenHash[:]) + ":" + remotePeer.String() + ":" + reqCtx.Protocol + ":" + reqCtx.Target if pubKeyStr, ok := n.verificationCache.Get(hashStr); ok { n.keysMu.RLock() diff --git a/cmd/sam-node/middleware_test.go b/cmd/sam-node/middleware_test.go index 0ad1e74..bc5814e 100644 --- a/cmd/sam-node/middleware_test.go +++ b/cmd/sam-node/middleware_test.go @@ -508,3 +508,83 @@ func TestVerifyBiscuitCache(t *testing.T) { t.Fatal("Expected verifyBiscuit to FAIL after key rotation, but it succeeded") } } + +func TestAuthorizationCacheBypass(t *testing.T) { + pub, priv, err := ed25519.GenerateKey(nil) + if err != nil { + t.Fatal(err) + } + + dummyPeer := peer.ID("dummy-peer-id") + + // Mint token allowing ONLY query_db + builder := biscuit.NewBuilder(priv) + _ = builder.AddAuthorityFact(biscuit.Fact{Predicate: biscuit.Predicate{ + Name: "node", + IDs: []biscuit.Term{biscuit.String(dummyPeer.String())}, + }}) + _ = builder.AddAuthorityFact(biscuit.Fact{Predicate: biscuit.Predicate{ + Name: "client_peer_id", + IDs: []biscuit.Term{biscuit.String(dummyPeer.String())}, + }}) + _ = builder.AddAuthorityFact(biscuit.Fact{Predicate: biscuit.Predicate{ + Name: "allow_mcp_server", + IDs: []biscuit.Term{biscuit.String("query_db")}, + }}) + + b, _ := builder.Build() + tokenBytes, _ := b.Serialize() + + cache, _ := lru.New[string, string](10) + revCache, _ := lru.New[string, int64](10) + rl, _ := NewPeerRateLimiter(100) + node := &SamNode{ + trustedKeys: []TrustedKey{{Key: pub, ReceivedAt: time.Now()}}, + verificationCache: cache, + revokedPeers: revCache, + rateLimiter: rl, + TrustHubRBAC: true, + } + + // Helper to simulate request + doRequest := func(target string) bool { + pr1, pw1 := io.Pipe() + pr2, pw2 := io.Pipe() + serverStream := &mockStream{r: pr1, w: pw2, protocol: protocol.ID("mcp"), conn: &mockConn{remotePeer: dummyPeer}} + + done := make(chan bool, 1) + go func() { + handler := node.WithBiscuitAuth(func(s network.Stream, reqCtx RequestContext) { + done <- true + }) + handler(serverStream) + close(done) + }() + + writer := msgio.NewVarintWriter(pw1) + authFrame := &api.AuthFrame{Biscuit: tokenBytes, TargetService: target} + data, _ := proto.Marshal(authFrame) + _ = writer.WriteMsg(data) + + reader := msgio.NewVarintReaderSize(pr2, 1024*64) + msg, err := reader.ReadMsg() + if err != nil { + return false + } + var resp api.AuthResponse + _ = proto.Unmarshal(msg, &resp) + + success := <-done + return resp.Success && success + } + + // 1. Authorized target should succeed + if !doRequest("query_db") { + t.Fatal("Expected authorized target to succeed") + } + + // 2. Unauthorized target with the same token should FAIL + if doRequest("reboot_server") { + t.Fatal("SECURITY BUG: Unauthorized target succeeded due to cache bypass!") + } +} From 09c62268ffff7136a5b735412bc6f88014c228e9 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Sun, 28 Jun 2026 16:18:23 +0000 Subject: [PATCH 02/17] security: remove unauthenticated inline re-enrollment fallback --- cmd/sam-node/middleware.go | 48 -------------------------------------- 1 file changed, 48 deletions(-) diff --git a/cmd/sam-node/middleware.go b/cmd/sam-node/middleware.go index 0c26cbc..b72a2e2 100644 --- a/cmd/sam-node/middleware.go +++ b/cmd/sam-node/middleware.go @@ -15,13 +15,10 @@ package main import ( - "context" "crypto/ed25519" "crypto/sha256" "encoding/hex" "fmt" - "os" - "strings" "github.com/biscuit-auth/biscuit-go/v2" "github.com/biscuit-auth/biscuit-go/v2/datalog" @@ -180,52 +177,7 @@ func (n *SamNode) WithBiscuitAuth(next func(network.Stream, RequestContext)) net } } - if !authorized { - logger.Infof("[Auth] All keys failed, triggering re-enrollment fallback for %s", remotePeer) - var jwtStr string - var err error - if oidcIssuerFlag != "" { - tokenURL, err := n.DiscoverTokenURL(context.Background(), oidcIssuerFlag) - if err != nil { - logger.Errorf("[Auth] Failed to discover OIDC endpoints for fallback: %v", err) - } else { - jwtStr, err = n.FetchJWT(context.Background(), tokenURL, clientIDFlag, clientSecretFlag) - if err != nil { - logger.Errorf("[Auth] Failed to fetch JWT for fallback: %v", err) - } - } - } else if jwtPathFlag != "" { - data, err := os.ReadFile(jwtPathFlag) - if err != nil { - logger.Errorf("[Auth] Failed to read JWT file for fallback: %v", err) - } else { - jwtStr = strings.TrimSpace(string(data)) - } - } - - if jwtStr != "" { - err = n.Enroll(context.Background(), jwtStr) - if err != nil { - logger.Errorf("[Auth] Fallback enrollment failed: %v", err) - } else { - // Retry authorization with new keys - n.keysMu.RLock() - keys = n.trustedKeys - n.keysMu.RUnlock() - - for _, pubKey := range keys { - logger.Infof("[Auth] Retrying with key: %x", pubKey.Key) - if err := n.Authorize(authFrame.Biscuit, reqCtx, pubKey.Key); err == nil { - authorized = true - break - } else { - lastErr = err - } - } - } - } - } if !authorized { logger.Warnf("[Auth] AuthZ Denied %s: %v", remotePeer, lastErr) From 89a7d95b8103457109f363608f26bd581feb7eb8 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 08:04:57 +0000 Subject: [PATCH 03/17] security: address PR review feedback --- cmd/sam-node/middleware.go | 4 +- cmd/sam-node/middleware_test.go | 3 +- tests/integration/fallback_test.go | 204 +---------------------------- 3 files changed, 4 insertions(+), 207 deletions(-) diff --git a/cmd/sam-node/middleware.go b/cmd/sam-node/middleware.go index b72a2e2..3d638d4 100644 --- a/cmd/sam-node/middleware.go +++ b/cmd/sam-node/middleware.go @@ -127,7 +127,7 @@ func (n *SamNode) WithBiscuitAuth(next func(network.Stream, RequestContext)) net // Check verification cache tokenHash := sha256.Sum256(authFrame.Biscuit) - hashStr := hex.EncodeToString(tokenHash[:]) + ":" + remotePeer.String() + ":" + reqCtx.Protocol + ":" + reqCtx.Target + hashStr := hex.EncodeToString(tokenHash[:]) + "\x00" + remotePeer.String() + "\x00" + reqCtx.Protocol + "\x00" + reqCtx.Target if pubKeyStr, ok := n.verificationCache.Get(hashStr); ok { n.keysMu.RLock() @@ -177,8 +177,6 @@ func (n *SamNode) WithBiscuitAuth(next func(network.Stream, RequestContext)) net } } - - if !authorized { logger.Warnf("[Auth] AuthZ Denied %s: %v", remotePeer, lastErr) resp := &api.AuthResponse{Success: false, Error: "Authorization failed"} diff --git a/cmd/sam-node/middleware_test.go b/cmd/sam-node/middleware_test.go index bc5814e..e811e38 100644 --- a/cmd/sam-node/middleware_test.go +++ b/cmd/sam-node/middleware_test.go @@ -565,6 +565,7 @@ func TestAuthorizationCacheBypass(t *testing.T) { authFrame := &api.AuthFrame{Biscuit: tokenBytes, TargetService: target} data, _ := proto.Marshal(authFrame) _ = writer.WriteMsg(data) + pw1.Close() reader := msgio.NewVarintReaderSize(pr2, 1024*64) msg, err := reader.ReadMsg() @@ -573,7 +574,7 @@ func TestAuthorizationCacheBypass(t *testing.T) { } var resp api.AuthResponse _ = proto.Unmarshal(msg, &resp) - + success := <-done return resp.Success && success } diff --git a/tests/integration/fallback_test.go b/tests/integration/fallback_test.go index e3e06d8..8e03776 100644 --- a/tests/integration/fallback_test.go +++ b/tests/integration/fallback_test.go @@ -19,7 +19,7 @@ import ( "context" "crypto/ed25519" "encoding/json" - "fmt" + "os" "os/exec" "path/filepath" @@ -28,17 +28,14 @@ import ( "testing" "time" - "github.com/biscuit-auth/biscuit-go/v2" "github.com/biscuit-auth/biscuit-go/v2/parser" "github.com/google/sam/api" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio" - "github.com/multiformats/go-multiaddr" "google.golang.org/protobuf/proto" "io" "net/http" @@ -52,205 +49,6 @@ func init() { _, _ = parser.FromStringFact(`warmup("cache")`) } -func TestFallbackReEnrollment(t *testing.T) { - nodeBin := buildBinary(t, "./cmd/sam-node") - - // 1. Generate Keys for Biscuit - pubA, _, _ := ed25519.GenerateKey(nil) - pubB, privB, _ := ed25519.GenerateKey(nil) - - // 2. Generate Client Peer Identity BEFORE starting any libp2p hosts - // This avoids data race with FromStringFact later! - clientPriv, clientPub, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) - if err != nil { - t.Fatal(err) - } - clientID, err := peer.IDFromPublicKey(clientPub) - if err != nil { - t.Fatal(err) - } - - // 3. Create Biscuit token signed by Key B (using pre-computed clientID) - builder := biscuit.NewBuilder(privB) - - nodeFact, _ := parser.FromStringFact(fmt.Sprintf(`node("%s")`, clientID)) - if err := builder.AddAuthorityFact(nodeFact); err != nil { - t.Fatal(err) - } - - clientPeerFact, _ := parser.FromStringFact(fmt.Sprintf(`client_peer_id("%s")`, clientID)) - if err := builder.AddAuthorityFact(clientPeerFact); err != nil { - t.Fatal(err) - } - - toolFact, _ := parser.FromStringFact(`allow_mcp_server("*")`) - if err := builder.AddAuthorityFact(toolFact); err != nil { - t.Fatal(err) - } - - b, _ := builder.Build() - - // Verify it in test - authorizer, err := b.Authorizer(pubB) - if err != nil { - t.Fatal(err) - } - policy, _ := parser.FromStringPolicy("allow if true") - authorizer.AddPolicy(policy) - if err := authorizer.Authorize(); err != nil { - t.Fatalf("Token generated in test is invalid: %v", err) - } - - tokenBytes, _ := b.Serialize() - - // 4. Start Mock Hub with dynamic key response - _, hubAddr := startMockHubDynamic(t, pubA, pubB) - - tmpHome := t.TempDir() - env := append(os.Environ(), - "HOME="+tmpHome, - "XDG_CONFIG_HOME="+filepath.Join(tmpHome, ".config"), - ) - - // Create a dummy JWT file for fallback - jwtPath := filepath.Join(tmpHome, "jwt.txt") - if err := os.WriteFile(jwtPath, []byte("dummy-jwt"), 0644); err != nil { - t.Fatal(err) - } - - // 5. Run Node in background - cmd := exec.Command(nodeBin, "run", "--trust-hub-rbac", "--hub", hubAddr, "--listen", "/ip4/127.0.0.1/tcp/0", "--jwt-path", jwtPath, "--bind-addr", "127.0.0.1:0", "--api-token", "dummy-token", "--allow-loopback") - cmd.Dir = repoRoot(t) - cmd.Env = env - - var stdout safeBuffer - var stderr safeBuffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr - - if err := cmd.Start(); err != nil { - t.Fatal(err) - } - defer func() { - _ = cmd.Process.Kill() - }() - - // Wait for it to be ready (print address) - var out string - for i := 0; i < 50; i++ { - out = stdout.String() + stderr.String() - if strings.Contains(out, "Listening on:") { - break - } - time.Sleep(100 * time.Millisecond) - } - - if !strings.Contains(out, "Listening on:") { - t.Fatalf("Node failed to start or didn't print address in time.\nOutput:\n%s", out) - } - - // 6. Extract Node address - var nodeAddr string - lines := strings.Split(out, "\n") - for _, line := range lines { - if strings.Contains(line, "Listening on:") { - idx := strings.Index(line, "[") - idx2 := strings.Index(line, "]") - if idx != -1 && idx2 != -1 { - addrsStr := line[idx+1 : idx2] - addrs := strings.Split(addrsStr, " ") - for _, a := range addrs { - if strings.Contains(a, "/tcp/") { - nodeAddr = a - break - } - } - } - } - } - - if nodeAddr == "" { - t.Fatalf("Failed to extract node address from output:\n%s", out) - } - - // Extract Peer ID - var nodePeerID string - for _, line := range lines { - if strings.Contains(line, "PeerID:") { - parts := strings.Split(line, " ") - if len(parts) >= 2 { - nodePeerID = strings.TrimSpace(parts[len(parts)-1]) - } - } - } - - if nodePeerID == "" { - t.Fatalf("Failed to extract node PeerID from output:\n%s", out) - } - - nodeFullAddr := nodeAddr + "/p2p/" + nodePeerID - - // 7. Start client host with the pre-generated identity - clientHost, err := libp2p.New( - libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), - libp2p.Identity(clientPriv), - ) - if err != nil { - t.Fatal(err) - } - defer func() { _ = clientHost.Close() }() - - targetAddr, err := multiaddr.NewMultiaddr(nodeFullAddr) - if err != nil { - t.Fatal(err) - } - targetInfo, err := peer.AddrInfoFromP2pAddr(targetAddr) - if err != nil { - t.Fatal(err) - } - - if err := clientHost.Connect(context.Background(), *targetInfo); err != nil { - t.Fatal(err) - } - - // 8. Connect to Node via libp2p on api.MCPProtocolID - s, err := clientHost.NewStream(context.Background(), targetInfo.ID, api.MCPProtocolID) - if err != nil { - t.Fatal(err) - } - defer func() { _ = s.Close() }() - - // 9. Send AuthFrame with token B - writer := msgio.NewVarintWriter(s) - authFrame := &api.AuthFrame{Biscuit: tokenBytes} - authFrameBytes, _ := proto.Marshal(authFrame) - if err := writer.WriteMsg(authFrameBytes); err != nil { - t.Fatal(err) - } - - // 10. Read response - reader := msgio.NewVarintReaderSize(s, 1024*64) - respMsg, err := reader.ReadMsg() - if err != nil { - t.Fatal(err) - } - defer reader.ReleaseMsg(respMsg) - - var resp api.AuthResponse - if err := proto.Unmarshal(respMsg, &resp); err != nil { - t.Fatal(err) - } - - if !resp.Success { - t.Fatalf("Auth failed: %s\nNode Output:\n%s", resp.Error, stdout.String()+stderr.String()) - } - - // Verify log shows fallback was triggered - out = stdout.String() + stderr.String() - if !strings.Contains(out, "triggering re-enrollment fallback") { - t.Errorf("Expected log to contain fallback trigger message\nNode Output:\n%s", out) - } -} func startMockHubDynamic(t *testing.T, pubA, pubB ed25519.PublicKey) (peer.ID, string) { t.Helper() From 9d30770beae7a74eefff8a7236543f3ca0294fa2 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 08:24:45 +0000 Subject: [PATCH 04/17] Fix linter --- cmd/sam-node/middleware_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/sam-node/middleware_test.go b/cmd/sam-node/middleware_test.go index e811e38..99da71a 100644 --- a/cmd/sam-node/middleware_test.go +++ b/cmd/sam-node/middleware_test.go @@ -565,7 +565,7 @@ func TestAuthorizationCacheBypass(t *testing.T) { authFrame := &api.AuthFrame{Biscuit: tokenBytes, TargetService: target} data, _ := proto.Marshal(authFrame) _ = writer.WriteMsg(data) - pw1.Close() + pw1.Close() //nolint:errcheck reader := msgio.NewVarintReaderSize(pr2, 1024*64) msg, err := reader.ReadMsg() From 0c2d920147302454a7ce8dacc1544b7c84dd456c Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 08:31:57 +0000 Subject: [PATCH 05/17] Fix linter and flake --- cmd/sam-node/middleware_test.go | 1 + tests/integration/fallback_test.go | 90 +----------------------------- 2 files changed, 4 insertions(+), 87 deletions(-) diff --git a/cmd/sam-node/middleware_test.go b/cmd/sam-node/middleware_test.go index 99da71a..a2f0f51 100644 --- a/cmd/sam-node/middleware_test.go +++ b/cmd/sam-node/middleware_test.go @@ -544,6 +544,7 @@ func TestAuthorizationCacheBypass(t *testing.T) { revokedPeers: revCache, rateLimiter: rl, TrustHubRBAC: true, + BiscuitTimeout: 500 * time.Millisecond, } // Helper to simulate request diff --git a/tests/integration/fallback_test.go b/tests/integration/fallback_test.go index 8e03776..72760aa 100644 --- a/tests/integration/fallback_test.go +++ b/tests/integration/fallback_test.go @@ -16,7 +16,6 @@ package integration_test import ( "bytes" - "context" "crypto/ed25519" "encoding/json" @@ -28,18 +27,16 @@ import ( "testing" "time" + "net/http" + "net/http/httptest" + "github.com/biscuit-auth/biscuit-go/v2/parser" "github.com/google/sam/api" "github.com/libp2p/go-libp2p" - dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio" "google.golang.org/protobuf/proto" - "io" - "net/http" - "net/http/httptest" ) // init forces the biscuit-go parser to build its underlying participle @@ -49,87 +46,6 @@ func init() { _, _ = parser.FromStringFact(`warmup("cache")`) } - -func startMockHubDynamic(t *testing.T, pubA, pubB ed25519.PublicKey) (peer.ID, string) { - t.Helper() - - h, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) - if err != nil { - t.Fatalf("failed to create mock libp2p host: %v", err) - } - - h.SetStreamHandler(api.AuthProtocolID, func(s network.Stream) { - defer func() { _ = s.Close() }() - reader := msgio.NewVarintReaderSize(s, 1024*64) - msg, err := reader.ReadMsg() - if err != nil { - return - } - defer reader.ReleaseMsg(msg) - - writer := msgio.NewVarintWriter(s) - resp := &api.AuthResponse{Success: true} - respBytes, _ := proto.Marshal(resp) - _ = writer.WriteMsg(respBytes) - }) - - kdht, err := dht.New(context.Background(), h, dht.Mode(dht.ModeServer), dht.ProtocolPrefix("/sam")) - if err != nil { - t.Fatalf("failed to create DHT on mock hub: %v", err) - } - - var callCount int - mux := http.NewServeMux() - mux.HandleFunc("/register", func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - body, err := io.ReadAll(r.Body) - if err != nil { - http.Error(w, "Failed to read body", http.StatusBadRequest) - return - } - var req api.EnrollRequest - if err := proto.Unmarshal(body, &req); err != nil { - http.Error(w, "Invalid request format", http.StatusBadRequest) - return - } - - callCount++ - var pub []byte - if callCount == 1 { - pub = pubA - } else { - pub = pubB - } - - resp := &api.EnrollResponse{ - BiscuitToken: []byte("mock-biscuit-token"), - HubPublicKey: pub, - HubAddresses: []string{h.Addrs()[0].String() + "/p2p/" + h.ID().String()}, - } - data, err := proto.Marshal(resp) - if err != nil { - http.Error(w, "Failed to marshal response", http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/x-protobuf") - w.WriteHeader(http.StatusOK) - _, _ = w.Write(data) - }) - - httpServer := httptest.NewServer(mux) - - t.Cleanup(func() { - httpServer.Close() - _ = kdht.Close() - _ = h.Close() - }) - - return h.ID(), httpServer.URL -} - type safeBuffer struct { mu sync.Mutex buf bytes.Buffer From 59e0b70be01be5064f4da03d302b7320a18e4fc9 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 08:54:06 +0000 Subject: [PATCH 06/17] debug kind mesh failure --- .github/workflows/kind-mesh-e2e.yml | 34 ++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/.github/workflows/kind-mesh-e2e.yml b/.github/workflows/kind-mesh-e2e.yml index 0e787c6..86e702e 100644 --- a/.github/workflows/kind-mesh-e2e.yml +++ b/.github/workflows/kind-mesh-e2e.yml @@ -82,14 +82,36 @@ jobs: echo "== discover services / find calculator (poll) ==" peer="" - for _ in $(seq 1 30); do - tools=$(mcp -tool find_remote_tools -args '{}' || true) - peer=$(printf '%s' "$tools" | jq -r '.[]? | select(.tool_name=="calculator.add") | .peer_id' 2>/dev/null | head -n1) - [ -n "$peer" ] && break + for i in $(seq 1 90); do + tools="$(mcp -tool discover_services -args '{}' 2>/dev/null || true)" + echo "discover attempt $i: $tools" + + peer="$(printf '%s' "$tools" \ + | jq -r '.[]? | select(.tool_name=="calculator.add") | .peer_id' 2>/dev/null \ + | head -n1)" + + if [ -n "$peer" ]; then + echo "calculator.add discovered on peer: $peer (attempt $i)" + break + fi + + # periodic mesh debug + if [ $((i % 10)) -eq 0 ]; then + echo "mesh info (attempt $i):" + mcp -tool get_mesh_info -args '{}' || true + fi + sleep 1 done - [ -n "$peer" ] || { echo "calculator.add not discovered"; exit 1; } - echo "calculator host: $peer" + + [ -n "$peer" ] || { + echo "calculator.add not discovered after 90s" + echo "final discover_services:" + mcp -tool discover_services -args '{}' || true + echo "final mesh info:" + mcp -tool get_mesh_info -args '{}' || true + exit 1 + } echo "== call calculator.add(2,3) ==" result=$(mcp -tool call_remote_tool \ From b93c7397cbec31c1e416d6c80b3127ef8ff7d5c2 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 09:08:28 +0000 Subject: [PATCH 07/17] feat: Implement unauthenticated HTTP MCP server and remove stdio transport - Move MCP endpoints to listen cleanly on /mcp - Fallback to unauthenticated HTTP sidecar when running without identity - Add get_login_instructions tool and help_user_login prompt for graceful failure - Remove --stdio flag and StdioTransport logic - Fix E2E test timeout race condition in policy.bats - Update all documentation and test cases to use the new /mcp endpoint --- .github/k8s/sam-node-cop-template.yaml | 2 +- .github/k8s/sam-node-openclaw-template.yaml | 4 +- .github/workflows/kind-mesh-e2e.yml | 2 +- cmd/mcp-client/main.go | 2 +- cmd/sam-node/main.go | 19 ++++- cmd/sam-node/mcp.go | 71 +++++++++++++++++-- cmd/sam-node/mcp_handlers_test.go | 4 +- cmd/sam-node/mcp_test.go | 6 +- cmd/sam-node/sidecar.go | 44 ++++++++++++ development/kind/run.sh | 2 +- sam-mcp-python/examples/gemini_agent.py | 2 +- sam-mcp-python/src/sam_mcp/client.py | 2 +- sam-mcp-python/test_client.py | 2 +- .../docs/development/testnet-validation.md | 8 +-- site/content/docs/integrations/_index.md | 4 +- site/content/docs/integrations/claude-code.md | 4 +- .../docs/integrations/claude-desktop.md | 2 +- site/content/docs/integrations/gemini.md | 2 +- site/content/docs/integrations/openclaw.md | 4 +- site/content/docs/quickstart.md | 8 +-- site/content/docs/snippets/agent_demo.py | 2 +- .../docs/snippets/banana_bot_playground.py | 4 +- site/content/docs/user/agent-usage.md | 2 +- tests/e2e/datapath.bats | 2 +- tests/e2e/docs_snippets.bats | 2 +- tests/e2e/find_remote_tools.bats | 16 ++--- tests/e2e/lib/container_mesh.bash | 10 +-- tests/e2e/policy.bats | 8 +-- tests/e2e/revocation_test.bats | 4 +- tests/e2e/sam.bats | 29 ++++++-- tests/e2e/services.bats | 4 +- tests/integration/catalog_test.go | 4 +- tests/integration/federation_test.go | 4 +- tests/integration/p2p_test.go | 2 +- tests/integration/pubsub_test.go | 2 +- 35 files changed, 214 insertions(+), 75 deletions(-) diff --git a/.github/k8s/sam-node-cop-template.yaml b/.github/k8s/sam-node-cop-template.yaml index e2730f6..73c9466 100644 --- a/.github/k8s/sam-node-cop-template.yaml +++ b/.github/k8s/sam-node-cop-template.yaml @@ -46,7 +46,7 @@ spec: python -u /app/banana_bot_playground.py env: - name: SAM_MCP_URL - value: "http://127.0.0.1:8080/mcp/events" + value: "http://127.0.0.1:8080/mcp" - name: SAM_API_TOKEN value: "secret-token" - name: GEMINI_API_KEY diff --git a/.github/k8s/sam-node-openclaw-template.yaml b/.github/k8s/sam-node-openclaw-template.yaml index 2f02ce3..0ce717d 100644 --- a/.github/k8s/sam-node-openclaw-template.yaml +++ b/.github/k8s/sam-node-openclaw-template.yaml @@ -27,7 +27,7 @@ data: "servers": { "sam-mesh": { "enabled": true, - "url": "http://127.0.0.1:8080/mcp/events" + "url": "http://127.0.0.1:8080/mcp" } } } @@ -74,7 +74,7 @@ spec: ) >/dev/null 2>&1 /dev/null; } echo "== get_mesh_info ==" diff --git a/cmd/mcp-client/main.go b/cmd/mcp-client/main.go index d4dab67..c7e44bf 100644 --- a/cmd/mcp-client/main.go +++ b/cmd/mcp-client/main.go @@ -72,7 +72,7 @@ func main() { } // Construct URL - baseURL := strings.TrimSuffix(*serverURL, "/mcp/events") + baseURL := strings.TrimSuffix(*serverURL, "/mcp") baseURL = strings.TrimSuffix(baseURL, "/") if !strings.Contains(baseURL, "/sam/service/discover") { baseURL = baseURL + "/sam/service/discover" diff --git a/cmd/sam-node/main.go b/cmd/sam-node/main.go index 7991d17..9c8b915 100644 --- a/cmd/sam-node/main.go +++ b/cmd/sam-node/main.go @@ -182,7 +182,20 @@ func main() { if jwtStr == "" { token, _ := store.LoadIdentity() if len(token) == 0 { - logger.Fatal("No JWT or stored identity found. Cannot authenticate.") + displayHub := hubAddr + if displayHub == "" { + if h, err := store.LoadHubURL(); err == nil && h != "" { + displayHub = h + } else { + displayHub = "https://bananas.sam-mesh.dev" + } + } + logger.Infof("No identity found. Starting unauthenticated sidecar for enrollment over MCP...") + if err := startUnauthSidecarServer(displayHub, bindAddrFlag, tlsCertFlag, tlsKeyFlag); err != nil { + logger.Fatalf("Failed to start unauthenticated sidecar: %v", err) + } + <-ctx.Done() + return } logger.Infoln("Using stored identity.") @@ -554,7 +567,7 @@ func resolveDataDir() string { func getOrGenerateKey(s *Store) crypto.PrivKey { kb, _ := s.LoadKey() if len(kb) == 0 { - fmt.Println("[Store] Generating new Peer Identity...") + logger.Info("[Store] Generating new Peer Identity...") priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) if err != nil { logger.Fatalf("Failed to generate key: %v", err) @@ -667,6 +680,6 @@ func (n *SamNode) Enroll(ctx context.Context, jwt string) error { return fmt.Errorf("failed to connect and authenticate with any hub after HTTP enrollment (last error: %v)", lastAuthErr) } - fmt.Println("Successfully enrolled via HTTP and stored identity and hub config.") + logger.Info("Successfully enrolled via HTTP and stored identity and hub config.") return nil } diff --git a/cmd/sam-node/mcp.go b/cmd/sam-node/mcp.go index 461cfc4..6790efa 100644 --- a/cmd/sam-node/mcp.go +++ b/cmd/sam-node/mcp.go @@ -46,9 +46,8 @@ Other useful tools: discover_remote_services browses services by type, get_mesh_ Remote tool names are namespaced as '.' (e.g. 'code-reviewer.review_pr'). Prefer discovering and describing a tool before calling it rather than guessing arguments.` -// NewMCPHandler creates a new HTTP handler for the MCP server using the official SDK. -func NewMCPHandler(node *SamNode) http.Handler { - // Create an MCP server. +// NewMCPServer creates a new MCP server instance with all tools registered. +func NewMCPServer(node *SamNode) *mcp.Server { mcpServer := mcp.NewServer(&mcp.Implementation{ Name: "sam-node-mcp", Version: "0.1.0", @@ -144,14 +143,76 @@ func NewMCPHandler(node *SamNode) http.Handler { Description: "Returns the last few lines of the node's log output.", }, node.handleGetRecentLogs) + return mcpServer +} + +// NewUnauthenticatedMCPServer creates a minimal MCP server that instructs the client on how to authenticate. +func NewUnauthenticatedMCPServer(hubURL string) *mcp.Server { + mcpServer := mcp.NewServer(&mcp.Implementation{ + Name: "sam-node-mcp-unauth", + Version: "0.1.0", + }, &mcp.ServerOptions{Instructions: "This node is unauthenticated. Use the get_login_instructions tool or help_user_login prompt."}) + + mcp.AddTool(mcpServer, &mcp.Tool{ + Name: "get_login_instructions", + Description: "Get instructions on how to authenticate this node so it can join the SAM mesh.", + }, func(ctx context.Context, req *mcp.CallToolRequest, params map[string]any) (*mcp.CallToolResult, any, error) { + msg := fmt.Sprintf("The node is unauthenticated. Please open a regular terminal and run:\n\n sam-node join %s\n\nOnce complete, restart this MCP client.", hubURL) + return &mcp.CallToolResult{ + Content: []mcp.Content{ + &mcp.TextContent{Text: msg}, + }, + }, nil, nil + }) + + mcpServer.AddPrompt(&mcp.Prompt{ + Name: "help_user_login", + Description: "Provides the user with login instructions when the node is unauthenticated.", + }, func(ctx context.Context, req *mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { + return &mcp.GetPromptResult{ + Description: "Instructions for joining the SAM mesh.", + Messages: []*mcp.PromptMessage{ + { + Role: "user", + Content: &mcp.TextContent{ + Text: fmt.Sprintf("Please open a terminal and run:\n\n`sam-node join %s`\n\nAfter you finish, please restart this MCP connection.", hubURL), + }, + }, + }, + }, nil + }) + + return mcpServer +} + +// NewUnauthenticatedMCPHandler creates an HTTP handler for the unauthenticated MCP server. +func NewUnauthenticatedMCPHandler(hubURL string) http.Handler { + mcpServer := NewUnauthenticatedMCPServer(hubURL) + + sseHandler := mcp.NewSSEHandler(func(request *http.Request) *mcp.Server { + return mcpServer + }, nil) + + mux := http.NewServeMux() + mux.Handle("/mcp", sseHandler) + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logger.Debugf("Unauth MCP Request: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr) + mux.ServeHTTP(w, r) + }) +} + +// NewMCPHandler creates a new HTTP handler for the MCP server using the official SDK. +func NewMCPHandler(node *SamNode) http.Handler { + mcpServer := NewMCPServer(node) + // Create the SSE handler using the SDK sseHandler := mcp.NewSSEHandler(func(request *http.Request) *mcp.Server { return mcpServer }, nil) mux := http.NewServeMux() - mux.Handle("/mcp/events", sseHandler) - mux.Handle("/mcp/message", sseHandler) + mux.Handle("/mcp", sseHandler) // Wrap in logging middleware to debug incoming requests wrappedHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/cmd/sam-node/mcp_handlers_test.go b/cmd/sam-node/mcp_handlers_test.go index 998e0ad..05db489 100644 --- a/cmd/sam-node/mcp_handlers_test.go +++ b/cmd/sam-node/mcp_handlers_test.go @@ -439,7 +439,7 @@ func TestNewMCPHandler_RegistersFindRemoteTools(t *testing.T) { defer srv.Close() client := mcp.NewClient(&mcp.Implementation{Name: "tc", Version: "0.0.1"}, nil) - transport := &mcp.SSEClientTransport{Endpoint: srv.URL + "/mcp/events"} + transport := &mcp.SSEClientTransport{Endpoint: srv.URL + "/mcp"} session, err := client.Connect(ctx, transport, nil) if err != nil { t.Fatalf("connect: %v", err) @@ -703,7 +703,7 @@ func TestNewMCPHandler_RegistersDescribeRemoteTool(t *testing.T) { defer srv.Close() client := mcp.NewClient(&mcp.Implementation{Name: "tc", Version: "0.0.1"}, nil) - transport := &mcp.SSEClientTransport{Endpoint: srv.URL + "/mcp/events"} + transport := &mcp.SSEClientTransport{Endpoint: srv.URL + "/mcp"} session, err := client.Connect(ctx, transport, nil) if err != nil { t.Fatalf("connect: %v", err) diff --git a/cmd/sam-node/mcp_test.go b/cmd/sam-node/mcp_test.go index 71ed7b7..0fe4464 100644 --- a/cmd/sam-node/mcp_test.go +++ b/cmd/sam-node/mcp_test.go @@ -53,8 +53,8 @@ func TestMCPHandler_HTTP(t *testing.T) { t.Errorf("Expected status NotFound on root, got %d", resp.StatusCode) } - // Test GET on /mcp/events - req2, err := http.NewRequest("GET", ts.URL+"/mcp/events", nil) + // Test GET on /mcp + req2, err := http.NewRequest("GET", ts.URL+"/mcp", nil) if err != nil { t.Fatal(err) } @@ -66,7 +66,7 @@ func TestMCPHandler_HTTP(t *testing.T) { defer func() { _ = resp2.Body.Close() }() if resp2.StatusCode != http.StatusOK && resp2.StatusCode != http.StatusBadRequest { - t.Errorf("Expected status OK or BadRequest on /mcp/events, got %d", resp2.StatusCode) + t.Errorf("Expected status OK or BadRequest on /mcp, got %d", resp2.StatusCode) } } diff --git a/cmd/sam-node/sidecar.go b/cmd/sam-node/sidecar.go index dfabc65..9d08169 100644 --- a/cmd/sam-node/sidecar.go +++ b/cmd/sam-node/sidecar.go @@ -116,6 +116,50 @@ func startSidecarServer(node *SamNode, addr, token, certFile, keyFile, caFile st return nil } +func startUnauthSidecarServer(hubURL, addr, certFile, keyFile string) error { + mux := http.NewServeMux() + + // Public endpoints + mux.HandleFunc("/healthz", handleHealthz) + mux.HandleFunc("/readyz", handleReadyz) + + // Mount Unauthenticated MCP handler + mcpHandler := NewUnauthenticatedMCPHandler(hubURL) + mux.Handle("/", mcpHandler) + + server := &http.Server{ + Handler: mux, + } + + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", addr, err) + } + + actualAddr := listener.Addr().String() + + if (certFile != "") != (keyFile != "") { + return fmt.Errorf("both --tls-cert and --tls-key must be provided to enable TLS") + } + + if certFile != "" && keyFile != "" { + logger.Infof("Starting Unauthenticated MCP server on TCP address %s (with TLS Sidecar)", actualAddr) + go func() { + if err := server.ServeTLS(listener, certFile, keyFile); err != nil && err != http.ErrServerClosed { + logger.Errorf("Unauth Sidecar API server error: %v", err) + } + }() + } else { + logger.Infof("Starting Unauthenticated MCP server on TCP address %s", actualAddr) + go func() { + if err := server.Serve(listener); err != nil && err != http.ErrServerClosed { + logger.Errorf("Unauth Sidecar API server error: %v", err) + } + }() + } + return nil +} + func handleHealthz(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) if _, err := w.Write([]byte("OK")); err != nil { diff --git a/development/kind/run.sh b/development/kind/run.sh index 5fabb83..4bd6a12 100755 --- a/development/kind/run.sh +++ b/development/kind/run.sh @@ -140,7 +140,7 @@ echo echo "Mesh up. To call a node's MCP API, port-forward it in another shell, e.g.:" echo " kubectl --context ${KCTX} -n ${NAMESPACE} port-forward deploy/node-a 9091:8080" echo "then:" -echo " ./bin/mcp-client -url http://127.0.0.1:9091/mcp/events -tool find_remote_tools -args '{}'" +echo " ./bin/mcp-client -url http://127.0.0.1:9091/mcp -tool find_remote_tools -args '{}'" if [[ "${1:-}" != "-s" ]]; then show_cluster_logs diff --git a/sam-mcp-python/examples/gemini_agent.py b/sam-mcp-python/examples/gemini_agent.py index abac6c3..7a2bb43 100644 --- a/sam-mcp-python/examples/gemini_agent.py +++ b/sam-mcp-python/examples/gemini_agent.py @@ -26,7 +26,7 @@ def to_gemini_schema(mcp_schema: dict) -> dict: async def run_agent(): # 1. Connect to local SAM Node - # By default, connects to http://localhost:8080/mcp/events + # By default, connects to http://localhost:8080/mcp print("Connecting to local SAM Node...") client = SamClient() try: diff --git a/sam-mcp-python/src/sam_mcp/client.py b/sam-mcp-python/src/sam_mcp/client.py index 09ad8ed..f210ec5 100644 --- a/sam-mcp-python/src/sam_mcp/client.py +++ b/sam-mcp-python/src/sam_mcp/client.py @@ -9,7 +9,7 @@ class SamClient: def __init__(self, server_url: Optional[str] = None, token: Optional[str] = None): if server_url is None: - server_url = os.environ.get("SAM_MCP_URL", "http://localhost:8080/mcp/events") + server_url = os.environ.get("SAM_MCP_URL", "http://localhost:8080/mcp") if token is None: token = os.environ.get("SAM_API_TOKEN") self.server_url = server_url diff --git a/sam-mcp-python/test_client.py b/sam-mcp-python/test_client.py index 0944457..d8eeb97 100644 --- a/sam-mcp-python/test_client.py +++ b/sam-mcp-python/test_client.py @@ -4,7 +4,7 @@ from sam_mcp.client import SamClient async def main(): - url = os.environ.get("SAM_MCP_URL", "http://sam-node-1:8080/mcp/events") + url = os.environ.get("SAM_MCP_URL", "http://sam-node-1:8080/mcp") print(f"Connecting to {url}") try: async with SamClient(server_url=url) as client: diff --git a/site/content/docs/development/testnet-validation.md b/site/content/docs/development/testnet-validation.md index 620de7a..53e995f 100644 --- a/site/content/docs/development/testnet-validation.md +++ b/site/content/docs/development/testnet-validation.md @@ -66,23 +66,23 @@ You can invoke control plane tools on your local `sam-node` to locate, query, an ### List Local Control Plane Tools ```bash -bin/mcp-client -url http://localhost:8080/mcp/events -token secret-token -list +bin/mcp-client -url http://localhost:8080/mcp -token secret-token -list ``` ### Find Remote Tools on a Specific Peer ```bash -bin/mcp-client -url http://localhost:8080/mcp/events -token secret-token -tool find_remote_tools -args '{"peer_id":"12D3KooWKquLDsMiFc5BXsaHmVoJLDDGddqEWbVYYCpUnHR9u6RR"}' +bin/mcp-client -url http://localhost:8080/mcp -token secret-token -tool find_remote_tools -args '{"peer_id":"12D3KooWKquLDsMiFc5BXsaHmVoJLDDGddqEWbVYYCpUnHR9u6RR"}' ``` This returns the list of available tools (e.g., `everything.get-sum`, `everything.echo`). ### Inspect Tool Schemas ```bash -bin/mcp-client -url http://localhost:8080/mcp/events -token secret-token -tool describe_remote_tool -args '{"peer_id":"12D3KooWKquLDsMiFc5BXsaHmVoJLDDGddqEWbVYYCpUnHR9u6RR", "tool_name":"everything.get-sum"}' +bin/mcp-client -url http://localhost:8080/mcp -token secret-token -tool describe_remote_tool -args '{"peer_id":"12D3KooWKquLDsMiFc5BXsaHmVoJLDDGddqEWbVYYCpUnHR9u6RR", "tool_name":"everything.get-sum"}' ``` ### Call the Remote Tool ```bash -bin/mcp-client -url http://localhost:8080/mcp/events -token secret-token -tool call_remote_tool -args '{"peer_id":"12D3KooWKquLDsMiFc5BXsaHmVoJLDDGddqEWbVYYCpUnHR9u6RR", "tool_name":"everything.get-sum", "arguments":{"a":37.5, "b":5.2}}' +bin/mcp-client -url http://localhost:8080/mcp -token secret-token -tool call_remote_tool -args '{"peer_id":"12D3KooWKquLDsMiFc5BXsaHmVoJLDDGddqEWbVYYCpUnHR9u6RR", "tool_name":"everything.get-sum", "arguments":{"a":37.5, "b":5.2}}' ``` Response: ```text diff --git a/site/content/docs/integrations/_index.md b/site/content/docs/integrations/_index.md index c0c5215..2b6ade2 100644 --- a/site/content/docs/integrations/_index.md +++ b/site/content/docs/integrations/_index.md @@ -47,7 +47,7 @@ from sam_mcp.client import SamClient async def main(): # Connect to the local SAM node's MCP SSE endpoint # By default, sam-node listens at 127.0.0.1:8080 - url = os.environ.get("SAM_MCP_URL", "http://127.0.0.1:8080/mcp/events") + url = os.environ.get("SAM_MCP_URL", "http://127.0.0.1:8080/mcp") print(f"Connecting to SAM Node at {url}") try: @@ -79,7 +79,7 @@ if __name__ == "__main__": When you run the demo while a `sam-node` is running locally, you'll see output similar to this: ``` -Connecting to SAM Node at http://127.0.0.1:8080/mcp/events +Connecting to SAM Node at http://127.0.0.1:8080/mcp Discovered tools: - get_mesh_info: Get information about the mesh network - call_remote_tool: Call an MCP tool on a remote agent diff --git a/site/content/docs/integrations/claude-code.md b/site/content/docs/integrations/claude-code.md index c22fea1..d13d27d 100644 --- a/site/content/docs/integrations/claude-code.md +++ b/site/content/docs/integrations/claude-code.md @@ -20,7 +20,7 @@ Register the node as an SSE MCP server. Replace `` with your `--api- ```bash claude mcp add --transport sse p2p-mesh-node \ - http://localhost:8080/mcp/events \ + http://localhost:8080/mcp \ --header "Authorization: Bearer " ``` @@ -33,7 +33,7 @@ Alternatively, add it to a project `.mcp.json` directly: "mcpServers": { "p2p-mesh-node": { "type": "sse", - "url": "http://localhost:8080/mcp/events", + "url": "http://localhost:8080/mcp", "headers": { "Authorization": "Bearer " } diff --git a/site/content/docs/integrations/claude-desktop.md b/site/content/docs/integrations/claude-desktop.md index f86520d..79c1ab2 100644 --- a/site/content/docs/integrations/claude-desktop.md +++ b/site/content/docs/integrations/claude-desktop.md @@ -33,7 +33,7 @@ Add the node through the `mcp-remote` bridge (replace `` with your ` "args": [ "mcp-remote", "--sse", - "http://localhost:8080/mcp/events", + "http://localhost:8080/mcp", "--allow-http", "--header", "Authorization: Bearer " diff --git a/site/content/docs/integrations/gemini.md b/site/content/docs/integrations/gemini.md index bc3a764..3a06181 100644 --- a/site/content/docs/integrations/gemini.md +++ b/site/content/docs/integrations/gemini.md @@ -49,7 +49,7 @@ python3 examples/gemini_agent.py ``` Upon starting, the script will: -1. Connect to the local SAM node's MCP server at `http://localhost:8080/mcp/events`. +1. Connect to the local SAM node's MCP server at `http://localhost:8080/mcp`. 2. Discover all tools currently available in the mesh. 3. Map the mesh tools to Gemini-compatible OpenAPI schemas. 4. Spin up a chat loop with Gemini (`gemini-2.5-flash`). diff --git a/site/content/docs/integrations/openclaw.md b/site/content/docs/integrations/openclaw.md index 8a752ba..e8603d0 100644 --- a/site/content/docs/integrations/openclaw.md +++ b/site/content/docs/integrations/openclaw.md @@ -16,7 +16,7 @@ To bridge your local `sam-node` into your OpenClaw agent runtime, use the `openc # Add your local sam-node as an MCP server # Replace with the token used in --api-token openclaw mcp set p2p-mesh-node '{ - "url": "http://localhost:8080/mcp/events", + "url": "http://localhost:8080/mcp", "transport": "sse", "headers": { "Authorization": "Bearer " @@ -54,6 +54,6 @@ Because these tools are surfaced automatically, no remote tool needs to be regis ## Troubleshooting -* Connection Issues: Ensure `sam-node` is reachable at the configured URL (default `http://localhost:8080/mcp/events`). +* Connection Issues: Ensure `sam-node` is reachable at the configured URL (default `http://localhost:8080/mcp`). * Authentication: Double-check that the `Authorization` header matches the `--api-token` provided to your `sam-node`. * Gateway Status: Use `openclaw status` to confirm the gateway is running and the MCP bridge is active. diff --git a/site/content/docs/quickstart.md b/site/content/docs/quickstart.md index 128344e..c12e28f 100644 --- a/site/content/docs/quickstart.md +++ b/site/content/docs/quickstart.md @@ -95,14 +95,14 @@ Your SAM node exposes a standard Model Context Protocol (MCP) server. The easies Query the list of tools available on your local node (e.g. peer discovery, message broadcast, and remote tool execution): ```bash -mcp-client -url http://localhost:8080/mcp/events -token my-secret-token -list +mcp-client -url http://localhost:8080/mcp -token my-secret-token -list ``` ### Discover Remote Services in the Mesh List active MCP services currently registered across the public mesh network: ```bash -mcp-client -url http://localhost:8080/mcp/events \ +mcp-client -url http://localhost:8080/mcp \ -tool discover_remote_services \ -args '{"type":"mcp"}' ``` @@ -111,7 +111,7 @@ mcp-client -url http://localhost:8080/mcp/events \ Using a `peer_id` returned from the service discovery, find the tools available on that peer: ```bash -mcp-client -url http://localhost:8080/mcp/events \ +mcp-client -url http://localhost:8080/mcp \ -tool find_remote_tools \ -args '{"peer_id":""}' ``` @@ -120,7 +120,7 @@ mcp-client -url http://localhost:8080/mcp/events \ Call a tool hosted on a remote peer through your local node's P2P stream reverse proxy: ```bash -mcp-client -url http://localhost:8080/mcp/events \ +mcp-client -url http://localhost:8080/mcp \ -tool call_remote_tool \ -args '{"peer_id":"","tool_name":"everything.get-sum","arguments":{"a":12.5,"b":7.5}}' ``` diff --git a/site/content/docs/snippets/agent_demo.py b/site/content/docs/snippets/agent_demo.py index 83bf128..260038c 100644 --- a/site/content/docs/snippets/agent_demo.py +++ b/site/content/docs/snippets/agent_demo.py @@ -6,7 +6,7 @@ async def main(): # Connect to the local SAM node's MCP SSE endpoint # By default, sam-node listens at 127.0.0.1:8080 - url = os.environ.get("SAM_MCP_URL", "http://127.0.0.1:8080/mcp/events") + url = os.environ.get("SAM_MCP_URL", "http://127.0.0.1:8080/mcp") print(f"Connecting to SAM Node at {url}") try: diff --git a/site/content/docs/snippets/banana_bot_playground.py b/site/content/docs/snippets/banana_bot_playground.py index aea7296..039d5d7 100644 --- a/site/content/docs/snippets/banana_bot_playground.py +++ b/site/content/docs/snippets/banana_bot_playground.py @@ -12,7 +12,7 @@ class SamClient: """Inlined SAM Client for self-contained execution.""" def __init__(self, server_url: Optional[str] = None, token: Optional[str] = None): if server_url is None: - server_url = os.environ.get("SAM_MCP_URL", "http://localhost:8080/mcp/events") + server_url = os.environ.get("SAM_MCP_URL", "http://localhost:8080/mcp") if token is None: token = os.environ.get("SAM_API_TOKEN") self.server_url = server_url @@ -323,7 +323,7 @@ async def poll_chat_messages(client: SamClient): async def run_banana_bot(): api_key = os.environ.get("GEMINI_API_KEY") - url = os.environ.get("SAM_MCP_URL", "http://127.0.0.1:8080/mcp/events") + url = os.environ.get("SAM_MCP_URL", "http://127.0.0.1:8080/mcp") token = os.environ.get("SAM_API_TOKEN", "secret-token") if not api_key: diff --git a/site/content/docs/user/agent-usage.md b/site/content/docs/user/agent-usage.md index 6bbc34d..fdf2af0 100644 --- a/site/content/docs/user/agent-usage.md +++ b/site/content/docs/user/agent-usage.md @@ -88,7 +88,7 @@ Your AI agent connects to the node's local MCP server. The local server translat ### Exposing the API The local MCP endpoint is served via **HTTP Server-Sent Events (SSE)** at: -`http://127.0.0.1:8080/mcp/events` +`http://127.0.0.1:8080/mcp` ### Authentication When configuring your agent client, you must pass the API token in the headers: diff --git a/tests/e2e/datapath.bats b/tests/e2e/datapath.bats index fa31fd3..6aefab6 100644 --- a/tests/e2e/datapath.bats +++ b/tests/e2e/datapath.bats @@ -47,7 +47,7 @@ teardown() { # Explicitly connect Node 1 to Node 2 (DHT auto-discovery is slow/unreliable in this E2E setup) echo "[$(date +%T)] Explicitly connecting Node 1 to Node 2" local node2_addr="/dns4/sam-node-2/tcp/5002/p2p/${node2_peer_id}" - run docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-1:8080/mcp/events" -tool "connect_peer" -args "{\"peer_addr\":\"${node2_addr}\"}" + run docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-1:8080/mcp" -tool "connect_peer" -args "{\"peer_addr\":\"${node2_addr}\"}" [[ "$status" -eq 0 ]] # Verify connection diff --git a/tests/e2e/docs_snippets.bats b/tests/e2e/docs_snippets.bats index 6896ced..405a999 100644 --- a/tests/e2e/docs_snippets.bats +++ b/tests/e2e/docs_snippets.bats @@ -34,7 +34,7 @@ teardown() { -v "$(pwd)/sam-mcp-python:/sam-mcp-python" \ -v "$(pwd)/site/content/docs/snippets:/snippets" \ -e PYTHONPATH=/sam-mcp-python/src \ - -e SAM_MCP_URL="http://sam-node-1:8080/mcp/events" \ + -e SAM_MCP_URL="http://sam-node-1:8080/mcp" \ python:3.12 \ bash -c 'pip install mcp httpx && python3 /snippets/agent_demo.py' diff --git a/tests/e2e/find_remote_tools.bats b/tests/e2e/find_remote_tools.bats index 9710e70..19eb0ea 100644 --- a/tests/e2e/find_remote_tools.bats +++ b/tests/e2e/find_remote_tools.bats @@ -60,14 +60,14 @@ teardown() { echo "[$(date +%T)] Connecting Node 1 to Node 2" local node2_addr="/dns4/sam-node-2/tcp/5002/p2p/${node2_peer_id}" - run docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-1:8080/mcp/events" -tool "connect_peer" -args "{\"peer_addr\":\"${node2_addr}\"}" + run docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-1:8080/mcp" -tool "connect_peer" -args "{\"peer_addr\":\"${node2_addr}\"}" [[ "$status" -eq 0 ]] mesh_wait_for_peer_connection 1 "${node2_peer_id}" 20 echo "[$(date +%T)] Calling find_remote_tools from Node 1, targeting Node 2" run docker run --rm --network "${MESH_NETWORK}" \ "${MESH_RUNTIME_IMAGE}" mcp-client \ - -url "http://sam-node-1:8080/mcp/events" \ + -url "http://sam-node-1:8080/mcp" \ -tool "find_remote_tools" \ -args "{\"peer_id\":\"${node2_peer_id}\"}" echo "find_remote_tools output: $output" @@ -116,7 +116,7 @@ teardown() { local node2_addr="/dns4/sam-node-2/tcp/5002/p2p/${node2_peer_id}" run docker run --rm --network "${MESH_NETWORK}" \ "${MESH_RUNTIME_IMAGE}" mcp-client \ - -url "http://sam-node-1:8080/mcp/events" \ + -url "http://sam-node-1:8080/mcp" \ -tool "connect_peer" \ -args "{\"peer_addr\":\"${node2_addr}\"}" [[ "$status" -eq 0 ]] @@ -127,7 +127,7 @@ teardown() { call_args="{\"peer_id\":\"${node2_peer_id}\",\"tool_name\":\"calculator.add\",\"arguments\":{\"a\":2,\"b\":3}}" run docker run --rm --network "${MESH_NETWORK}" \ "${MESH_RUNTIME_IMAGE}" mcp-client \ - -url "http://sam-node-1:8080/mcp/events" \ + -url "http://sam-node-1:8080/mcp" \ -tool "call_remote_tool" \ -args "${call_args}" echo "call_remote_tool output: $output" @@ -167,7 +167,7 @@ teardown() { local node2_addr="/dns4/sam-node-2/tcp/5002/p2p/${node2_peer_id}" run docker run --rm --network "${MESH_NETWORK}" \ "${MESH_RUNTIME_IMAGE}" mcp-client \ - -url "http://sam-node-1:8080/mcp/events" \ + -url "http://sam-node-1:8080/mcp" \ -tool "connect_peer" \ -args "{\"peer_addr\":\"${node2_addr}\"}" [[ "$status" -eq 0 ]] @@ -178,7 +178,7 @@ teardown() { describe_args="{\"peer_id\":\"${node2_peer_id}\",\"tool_name\":\"calculator.add\"}" run docker run --rm --network "${MESH_NETWORK}" \ "${MESH_RUNTIME_IMAGE}" mcp-client \ - -url "http://sam-node-1:8080/mcp/events" \ + -url "http://sam-node-1:8080/mcp" \ -tool "describe_remote_tool" \ -args "${describe_args}" echo "describe_remote_tool output: $output" @@ -232,7 +232,7 @@ teardown() { local node2_addr="/dns4/sam-node-2/tcp/5002/p2p/${node2_peer_id}" run docker run --rm --network "${MESH_NETWORK}" \ "${MESH_RUNTIME_IMAGE}" mcp-client \ - -url "http://sam-node-1:8080/mcp/events" \ + -url "http://sam-node-1:8080/mcp" \ -tool "connect_peer" \ -args "{\"peer_addr\":\"${node2_addr}\"}" [[ "$status" -eq 0 ]] @@ -243,7 +243,7 @@ teardown() { describe_args="{\"peer_id\":\"${node2_peer_id}\",\"tool_name\":\"calculator.does-not-exist\"}" run docker run --rm --network "${MESH_NETWORK}" \ "${MESH_RUNTIME_IMAGE}" mcp-client \ - -url "http://sam-node-1:8080/mcp/events" \ + -url "http://sam-node-1:8080/mcp" \ -tool "describe_remote_tool" \ -args "${describe_args}" echo "describe_remote_tool output: $output" diff --git a/tests/e2e/lib/container_mesh.bash b/tests/e2e/lib/container_mesh.bash index 79c7dab..c03cf6f 100644 --- a/tests/e2e/lib/container_mesh.bash +++ b/tests/e2e/lib/container_mesh.bash @@ -115,7 +115,7 @@ if [[ -z "${MESH_HELPERS_LOADED:-}" ]]; then local timeout_s="${2:-20}" local i for ((i=0; i/dev/null)" + output="$(timeout 15s docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-${idx}:8080/mcp" -tool "get_mesh_info" 2>/dev/null)" echo "${output}" | jq 'if .connected_peers then (.connected_peers | length) - 1 else 0 end' } @@ -137,7 +137,7 @@ if [[ -z "${MESH_HELPERS_LOADED:-}" ]]; then local i for ((i=0; i/dev/null)" + output="$(timeout 15s docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-${idx}:8080/mcp" -tool "get_mesh_info" 2>/dev/null)" echo "Node ${idx} get_mesh_info raw output: ${output}" local count count="$(echo "${output}" | jq 'if .connected_peers then (.connected_peers | length) - 1 else 0 end')" @@ -157,7 +157,7 @@ if [[ -z "${MESH_HELPERS_LOADED:-}" ]]; then local i for ((i=0; i/dev/null)" + output="$(timeout 15s docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-${idx}:8080/mcp" -tool "get_mesh_info" 2>/dev/null)" echo "[$(date +%T)] Node ${idx} get_mesh_info raw output: ${output}" local connected connected="$(echo "${output}" | jq -r --arg peer "$target_peer" '.connected_peers | index($peer) != null')" @@ -177,7 +177,7 @@ if [[ -z "${MESH_HELPERS_LOADED:-}" ]]; then local i for ((i=0; i/dev/null)" + output="$(timeout 15s docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-${idx}:8080/mcp" -tool "get_mesh_info" 2>/dev/null)" echo "[$(date +%T)] Node ${idx} get_mesh_info raw output: ${output}" local connected connected="$(echo "${output}" | jq -r --arg peer "$target_peer" '.connected_peers | index($peer) != null')" diff --git a/tests/e2e/policy.bats b/tests/e2e/policy.bats index a29c252..7e0c0d1 100644 --- a/tests/e2e/policy.bats +++ b/tests/e2e/policy.bats @@ -131,7 +131,7 @@ mesh_call_remote_tool() { local args="{\"peer_id\":\"${target_peer_id}\",\"tool_name\":\"${tool_name}\",\"arguments\":{}}" - docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-${caller_idx}:8080/mcp/events" -tool "call_remote_tool" -args "${args}" + docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-${caller_idx}:8080/mcp" -tool "call_remote_tool" -args "${args}" } setup() { @@ -221,9 +221,9 @@ EOF" # Start Node 2 (Caller) without specific local policy mesh_start_node 2 + mesh_wait_for_log "${MESH_PREFIX}-node-2" "SAM Node Online" 20 mesh_wait_for_mcp_ready 2 - mesh_wait_for_log "${MESH_PREFIX}-node-2" "SAM Node Online" 20 local node2_id node2_id=$(docker logs "${MESH_PREFIX}-node-2" 2>&1 | grep "PeerID:" | grep -oE '12D3Koo[a-zA-Z0-9]+' | head -n 1) @@ -235,7 +235,7 @@ EOF" for ((i=0; i<40; i++)); do local output - output="$(docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-2:8080/mcp/events" -tool "get_mesh_info" 2>/dev/null)" + output="$(docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-2:8080/mcp" -tool "get_mesh_info" 2>/dev/null)" TARGET_PEER_ID=$(echo "${output}" | grep -oE '12D3Koo[a-zA-Z0-9]+' | grep -v "${hub_id}" | grep -v "${node2_id}" | head -n 1) if [[ -n "${TARGET_PEER_ID}" ]]; then break @@ -253,7 +253,7 @@ EOF" # Explicitly connect Node 2 to Node 1 to avoid "no addresses" error local node1_addr="/dns4/sam-node-1/tcp/5002/p2p/${TARGET_PEER_ID}" - docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-2:8080/mcp/events" -tool "connect_peer" -args "{\"peer_addr\":\"${node1_addr}\"}" >/dev/null + docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-2:8080/mcp" -tool "connect_peer" -args "{\"peer_addr\":\"${node1_addr}\"}" >/dev/null } teardown() { diff --git a/tests/e2e/revocation_test.bats b/tests/e2e/revocation_test.bats index b6939f3..3d9a900 100644 --- a/tests/e2e/revocation_test.bats +++ b/tests/e2e/revocation_test.bats @@ -67,7 +67,7 @@ teardown() { # Explicitly connect Node 1 to Node 2 echo "[$(date +%T)] Explicitly connecting Node 1 to Node 2" local node2_addr="/dns4/sam-node-2/tcp/5002/p2p/${node2_peer_id}" - run docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-1:8080/mcp/events" -tool "connect_peer" -args "{\"peer_addr\":\"${node2_addr}\"}" + run docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-1:8080/mcp" -tool "connect_peer" -args "{\"peer_addr\":\"${node2_addr}\"}" [[ "$status" -eq 0 ]] # Verify Node 1 connects to Node 2 @@ -109,7 +109,7 @@ teardown() { # Verify Node 1 cannot reconnect to Node 2 echo "[$(date +%T)] Attempting to reconnect (should fail)" local node2_addr="/dns4/sam-node-2/tcp/5002/p2p/${node2_peer_id}" - run docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-1:8080/mcp/events" -tool "connect_peer" -args "{\"peer_addr\":\"${node2_addr}\"}" + run docker run --rm --network "${MESH_NETWORK}" "${MESH_RUNTIME_IMAGE}" mcp-client -url "http://sam-node-1:8080/mcp" -tool "connect_peer" -args "{\"peer_addr\":\"${node2_addr}\"}" echo "Reconnect output: $output" [[ "$output" == *"gater disallows connection"* ]] } diff --git a/tests/e2e/sam.bats b/tests/e2e/sam.bats index 2b7af2f..955d089 100644 --- a/tests/e2e/sam.bats +++ b/tests/e2e/sam.bats @@ -3,6 +3,7 @@ setup() { export SAM_NODE_BINARY="${SAM_NODE_BINARY:-./bin/sam-node}" export SAM_HUB_BINARY="${SAM_HUB_BINARY:-./bin/sam-hub}" + export MCP_CLIENT_BINARY="${MCP_CLIENT_BINARY:-./bin/mcp-client}" export TEST_TMPDIR TEST_TMPDIR="$(mktemp -d)" @@ -26,10 +27,30 @@ teardown() { -@test "sam-node run --trust-hub-rbac without identity fails" { - run "$SAM_NODE_BINARY" run - [[ "$status" -ne 0 ]] - [[ "$output" == *"No JWT or stored identity found"* ]] +@test "sam-node run without identity starts unauthenticated sidecar and serves mcp" { + # Start sam-node in background with a specific port to avoid conflicts + "$SAM_NODE_BINARY" run --bind-addr "127.0.0.1:8085" > "$TEST_TMPDIR/unauth-node.log" 2>&1 & + local node_pid=$! + + # Give it a moment to start + sleep 2 + + # Call get_login_instructions using mcp-client + run "$MCP_CLIENT_BINARY" -url "http://127.0.0.1:8085/mcp" -tool "get_login_instructions" -args "{}" + + # Clean up + kill "$node_pid" || true + wait "$node_pid" || true + + # Check the output of mcp-client + [[ "$status" -eq 0 ]] + [[ "$output" == *"The node is unauthenticated. Please open a regular terminal and run:"* ]] + [[ "$output" == *"sam-node join"* ]] + + # Check node logs for the initial message + local log_output + log_output=$(cat "$TEST_TMPDIR/unauth-node.log") + [[ "$log_output" == *"No identity found. Starting unauthenticated sidecar for enrollment over MCP"* ]] } diff --git a/tests/e2e/services.bats b/tests/e2e/services.bats index b250b8b..c811786 100644 --- a/tests/e2e/services.bats +++ b/tests/e2e/services.bats @@ -63,7 +63,7 @@ teardown() { local node2_addr="/dns4/sam-node-2/tcp/5002/p2p/${node2_peer_id}" run docker run --rm --network "${MESH_NETWORK}" \ "${MESH_RUNTIME_IMAGE}" mcp-client \ - -url "http://sam-node-1:8080/mcp/events" \ + -url "http://sam-node-1:8080/mcp" \ -tool "connect_peer" \ -args "{\"peer_addr\":\"${node2_addr}\"}" [[ "$status" -eq 0 ]] @@ -75,7 +75,7 @@ teardown() { echo "[$(date +%T)] Discovering MCP services from Node 1 (type-only)" run docker run --rm --network "${MESH_NETWORK}" \ "${MESH_RUNTIME_IMAGE}" mcp-client \ - -url "http://sam-node-1:8080/mcp/events" \ + -url "http://sam-node-1:8080/mcp" \ -tool "discover_remote_services" \ -args '{"type":"mcp"}' echo "Discovery output: $output" diff --git a/tests/integration/catalog_test.go b/tests/integration/catalog_test.go index 38a2229..4796df8 100644 --- a/tests/integration/catalog_test.go +++ b/tests/integration/catalog_test.go @@ -75,7 +75,7 @@ func callMCP(t *testing.T, mcpAddr string, toolName string, params map[string]an Version: "0.1.0", }, nil) - session, err := client.Connect(ctx, &mcp.SSEClientTransport{Endpoint: "http://" + mcpAddr + "/mcp/events"}, nil) + session, err := client.Connect(ctx, &mcp.SSEClientTransport{Endpoint: "http://" + mcpAddr + "/mcp"}, nil) if err != nil { t.Fatalf("Failed to connect: %v", err) } @@ -172,7 +172,7 @@ func TestCatalogRoutingAndFailover(t *testing.T) { for time.Now().Before(deadline) { client := mcp.NewClient(&mcp.Implementation{Name: "test-client", Version: "0.1.0"}, nil) - session, err := client.Connect(context.Background(), &mcp.SSEClientTransport{Endpoint: "http://" + mcpAddrA + "/mcp/events"}, nil) + session, err := client.Connect(context.Background(), &mcp.SSEClientTransport{Endpoint: "http://" + mcpAddrA + "/mcp"}, nil) if err != nil { t.Logf("Poll: failed to connect: %v", err) time.Sleep(500 * time.Millisecond) diff --git a/tests/integration/federation_test.go b/tests/integration/federation_test.go index 9fede8c..8ef36ff 100644 --- a/tests/integration/federation_test.go +++ b/tests/integration/federation_test.go @@ -179,7 +179,7 @@ roles: // Search for the service from Node B t.Log("Searching for tool from Node B...") searchCmd := exec.Command(clientBin, - "-url", fmt.Sprintf("http://127.0.0.1:%d/mcp/events", apiPortB), + "-url", fmt.Sprintf("http://127.0.0.1:%d/mcp", apiPortB), "-token", "tokenB", "-tool", "discover_remote_services", "-args", `{"type": "mcp"}`, @@ -302,7 +302,7 @@ func waitForDHTReady(t *testing.T, clientBin string, apiPort int, token string) t.Helper() deadline := time.Now().Add(10 * time.Second) cmdArgs := []string{ - "-url", fmt.Sprintf("http://127.0.0.1:%d/mcp/events", apiPort), + "-url", fmt.Sprintf("http://127.0.0.1:%d/mcp", apiPort), "-token", token, "-tool", "get_mesh_info", "-args", `{}`, diff --git a/tests/integration/p2p_test.go b/tests/integration/p2p_test.go index 19c86b2..cfb3011 100644 --- a/tests/integration/p2p_test.go +++ b/tests/integration/p2p_test.go @@ -34,7 +34,7 @@ func TestSamNodeRunWithoutIdentity(t *testing.T) { t.Fatalf("expected sam-node run without identity to fail, but it succeeded\nstdout:\n%s\nstderr:\n%s", stdout, stderr) } out := stdout + stderr - if !strings.Contains(out, "No JWT or stored identity found") { + if !strings.Contains(out, "No identity found. Starting unauthenticated sidecar for enrollment over MCP") { t.Fatalf("expected missing identity message, got:\n%s", out) } } diff --git a/tests/integration/pubsub_test.go b/tests/integration/pubsub_test.go index a4afb27..20dd826 100644 --- a/tests/integration/pubsub_test.go +++ b/tests/integration/pubsub_test.go @@ -105,7 +105,7 @@ func TestPubSubTools(t *testing.T) { Version: "0.1.0", }, nil) - session, err := client.Connect(context.Background(), &mcp.SSEClientTransport{Endpoint: "http://" + mcpAddr + "/mcp/events"}, nil) + session, err := client.Connect(context.Background(), &mcp.SSEClientTransport{Endpoint: "http://" + mcpAddr + "/mcp"}, nil) if err != nil { t.Fatalf("Failed to connect: %v", err) } From 2a949035f034b690d1a56a991c2e705ae5b4dd07 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 09:39:09 +0000 Subject: [PATCH 08/17] fix kind mesh e2e permissions --- .github/workflows/kind-mesh-e2e.yml | 66 +-------------- Makefile | 4 + .../examples/calc-mcp/sam-node-config.yaml | 2 +- .../examples/greeter-mcp/sam-node-config.yaml | 2 +- development/kind/test-mesh-e2e.sh | 83 +++++++++++++++++++ 5 files changed, 91 insertions(+), 66 deletions(-) create mode 100755 development/kind/test-mesh-e2e.sh diff --git a/.github/workflows/kind-mesh-e2e.yml b/.github/workflows/kind-mesh-e2e.yml index 320a3a5..5919bc7 100644 --- a/.github/workflows/kind-mesh-e2e.yml +++ b/.github/workflows/kind-mesh-e2e.yml @@ -55,70 +55,8 @@ jobs: - name: Bring up the kind mesh run: make kind-up ARGS="-s" - - name: Enroll a local sam-node - run: | - ./development/kind/run-local-node.sh > "$RUNNER_TEMP/local-node.log" 2>&1 & - PID=$! - for _ in $(seq 1 60); do - grep -q "SAM Node Online" "$RUNNER_TEMP/local-node.log" && break - kill -0 "$PID" 2>/dev/null || { echo "local node exited early:"; cat "$RUNNER_TEMP/local-node.log"; exit 1; } - sleep 1 - done - grep -q "SAM Node Online" "$RUNNER_TEMP/local-node.log" \ - || { echo "local node did not come online:"; cat "$RUNNER_TEMP/local-node.log"; exit 1; } - echo "local node online" - - - name: Mesh info, discover, find + call calculator - run: | - set -euo pipefail - URL=http://127.0.0.1:9099/mcp - mcp() { ./bin/mcp-client -url "$URL" -timeout 20 "$@" 2>/dev/null; } - - echo "== get_mesh_info ==" - info=$(mcp -tool get_mesh_info -args '{}') - echo "$info" - echo "$info" | jq -e '.hub_peer_id != "" and (.dht_size > 0)' >/dev/null \ - || { echo "get_mesh_info assertion failed"; exit 1; } - - echo "== discover services / find calculator (poll) ==" - peer="" - for i in $(seq 1 90); do - tools="$(mcp -tool discover_services -args '{}' 2>/dev/null || true)" - echo "discover attempt $i: $tools" - - peer="$(printf '%s' "$tools" \ - | jq -r '.[]? | select(.tool_name=="calculator.add") | .peer_id' 2>/dev/null \ - | head -n1)" - - if [ -n "$peer" ]; then - echo "calculator.add discovered on peer: $peer (attempt $i)" - break - fi - - # periodic mesh debug - if [ $((i % 10)) -eq 0 ]; then - echo "mesh info (attempt $i):" - mcp -tool get_mesh_info -args '{}' || true - fi - - sleep 1 - done - - [ -n "$peer" ] || { - echo "calculator.add not discovered after 90s" - echo "final discover_services:" - mcp -tool discover_services -args '{}' || true - echo "final mesh info:" - mcp -tool get_mesh_info -args '{}' || true - exit 1 - } - - echo "== call calculator.add(2,3) ==" - result=$(mcp -tool call_remote_tool \ - -args "{\"peer_id\":\"$peer\",\"tool_name\":\"calculator.add\",\"arguments\":{\"a\":2,\"b\":3}}") - echo "result: $result" - printf '%s' "$result" | grep -q 5 || { echo "calculator.add did not return 5"; exit 1; } - echo "OK: calculator.add(2,3) == 5" + - name: Run e2e mesh tests + run: make kind-e2e-mesh - name: Collect logs on failure if: failure() diff --git a/Makefile b/Makefile index 49a5b0a..e7ed784 100644 --- a/Makefile +++ b/Makefile @@ -31,6 +31,10 @@ kind-down: kind-local-node: ./development/kind/run-local-node.sh $(ARGS) +.PHONY: kind-e2e-mesh +kind-e2e-mesh: build + ./development/kind/test-mesh-e2e.sh + test: CGO_ENABLED=1 go test -v -race -count 1 $(if $(WHAT),-run $(WHAT)) ./... diff --git a/development/examples/calc-mcp/sam-node-config.yaml b/development/examples/calc-mcp/sam-node-config.yaml index c53bdbb..12a306b 100644 --- a/development/examples/calc-mcp/sam-node-config.yaml +++ b/development/examples/calc-mcp/sam-node-config.yaml @@ -3,7 +3,7 @@ attenuation: # Allow cross-node MCP only when the caller's biscuit carries a # node($peer) fact — i.e. it was issued by our hub upon enrollment. policies: - - 'allow if operation("/sam/mcp/1.0.0"), node($peer)' + - 'allow if operation("calculator"), node($peer)' services: - type: "mcp" name: "calculator" diff --git a/development/examples/greeter-mcp/sam-node-config.yaml b/development/examples/greeter-mcp/sam-node-config.yaml index 465fba0..8ccaa8e 100644 --- a/development/examples/greeter-mcp/sam-node-config.yaml +++ b/development/examples/greeter-mcp/sam-node-config.yaml @@ -1,7 +1,7 @@ version: "v1alpha1" attenuation: policies: - - 'allow if operation("/sam/mcp/1.0.0"), node($peer)' + - 'allow if operation("greeter"), node($peer)' services: - type: "mcp" name: "greeter" diff --git a/development/kind/test-mesh-e2e.sh b/development/kind/test-mesh-e2e.sh new file mode 100755 index 0000000..41613a5 --- /dev/null +++ b/development/kind/test-mesh-e2e.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +# If running locally, we might want to store logs in a temp dir +LOG_DIR="${RUNNER_TEMP:-$(mktemp -d)}" + +echo "== Enrolling a local sam-node ==" +./development/kind/run-local-node.sh > "$LOG_DIR/local-node.log" 2>&1 & +PID=$! +for _ in $(seq 1 60); do + grep -q "SAM Node Online" "$LOG_DIR/local-node.log" && break + kill -0 "$PID" 2>/dev/null || { echo "local node exited early:"; cat "$LOG_DIR/local-node.log"; exit 1; } + sleep 1 +done +grep -q "SAM Node Online" "$LOG_DIR/local-node.log" \ + || { echo "local node did not come online:"; cat "$LOG_DIR/local-node.log"; exit 1; } +echo "local node online" + +# Make sure we clean up the local node when the script exits +trap 'kill $PID 2>/dev/null || true' EXIT + +URL=http://127.0.0.1:9099/mcp +mcp() { ./bin/mcp-client -url "$URL" -timeout 20 "$@" 2>/dev/null; } + +echo "== get_mesh_info ==" +info=$(mcp -tool get_mesh_info -args '{}') +echo "$info" +echo "$info" | jq -e '.hub_peer_id != "" and (.dht_size > 0)' >/dev/null \ + || { echo "get_mesh_info assertion failed"; exit 1; } + +echo "== discover services / find calculator (poll) ==" +peer="" +for i in $(seq 1 90); do + tools="$(mcp -tool find_remote_tools -args '{}' 2>/dev/null || true)" + echo "discover attempt $i: $tools" + + peer="$(printf '%s' "$tools" \ + | jq -r '.items[]? | select(.tool_name=="calculator.add") | .peer_id' 2>/dev/null \ + | head -n1 || true)" + + if [ -n "$peer" ]; then + echo "calculator.add discovered on peer: $peer (attempt $i)" + break + fi + + # periodic mesh debug + if [ $((i % 10)) -eq 0 ]; then + echo "mesh info (attempt $i):" + mcp -tool get_mesh_info -args '{}' || true + fi + + sleep 1 +done + +[ -n "$peer" ] || { + echo "calculator.add not discovered after 90s" + echo "final find_remote_tools:" + mcp -tool find_remote_tools -args '{}' || true + echo "final mesh info:" + mcp -tool get_mesh_info -args '{}' || true + exit 1 +} + +echo "== call calculator.add(2,3) ==" +result=$(mcp -tool call_remote_tool \ + -args "{\"peer_id\":\"$peer\",\"tool_name\":\"calculator.add\",\"arguments\":{\"a\":2,\"b\":3}}") +echo "result: $result" +printf '%s' "$result" | grep -q 5 || { echo "calculator.add did not return 5"; exit 1; } +echo "OK: calculator.add(2,3) == 5" From 94b96527beab8dc8f805d6937139aee3894af16f Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 09:40:12 +0000 Subject: [PATCH 09/17] add mcp prompts and pagination --- cmd/sam-node/mcp.go | 81 +++++ cmd/sam-node/mcp_handlers.go | 473 +++++++++++++++++++++++++++++- cmd/sam-node/mcp_handlers_test.go | 22 +- tests/e2e/find_remote_tools.bats | 2 +- 4 files changed, 567 insertions(+), 11 deletions(-) diff --git a/cmd/sam-node/mcp.go b/cmd/sam-node/mcp.go index 6790efa..78e3e57 100644 --- a/cmd/sam-node/mcp.go +++ b/cmd/sam-node/mcp.go @@ -143,6 +143,87 @@ func NewMCPServer(node *SamNode) *mcp.Server { Description: "Returns the last few lines of the node's log output.", }, node.handleGetRecentLogs) + // Add the find_remote_resources tool. + mcp.AddTool(mcpServer, &mcp.Tool{ + Name: "find_remote_resources", + Description: "Discover MCP resources available on hosted services across the mesh. Returns name + description per resource.", + }, node.handleFindRemoteResources) + + // Add the read_remote_resource tool. + mcp.AddTool(mcpServer, &mcp.Tool{ + Name: "read_remote_resource", + Description: "Read a specific resource from a remote peer.", + }, node.handleReadRemoteResource) + + // Add the find_remote_prompts tool. + mcp.AddTool(mcpServer, &mcp.Tool{ + Name: "find_remote_prompts", + Description: "Discover MCP prompts available on hosted services across the mesh.", + }, node.handleFindRemotePrompts) + + // Add the get_remote_prompt tool. + mcp.AddTool(mcpServer, &mcp.Tool{ + Name: "get_remote_prompt", + Description: "Get a specific prompt from a remote peer.", + }, node.handleGetRemotePrompt) + + // Add a local resource for mesh state + mcpServer.AddResource(&mcp.Resource{ + URI: "mesh://state", + Name: "Mesh State", + Description: "Current state of the SAM node and mesh connections", + MIMEType: "application/json", + }, func(ctx context.Context, req *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) { + peers := node.Host.Network().Peers() + connectedPeers := make([]string, 0, len(peers)) + for _, p := range peers { + connectedPeers = append(connectedPeers, p.String()) + } + resData := map[string]any{ + "connected_peers": connectedPeers, + "dht_size": node.DHT.RoutingTable().Size(), + "hub_peer_id": node.HubPeerID.String(), + "peer_id": node.Host.ID().String(), + } + data, _ := json.MarshalIndent(resData, "", " ") + return &mcp.ReadResourceResult{ + Contents: []*mcp.ResourceContents{ + { + URI: "mesh://state", + MIMEType: "application/json", + Text: string(data), + }, + }, + }, nil + }) + + // Add a local prompt + mcpServer.AddPrompt(&mcp.Prompt{ + Name: "mesh_debugging", + Description: "A standard prompt for debugging mesh connectivity issues", + Arguments: []*mcp.PromptArgument{ + { + Name: "peer_id", + Description: "The ID of the peer you are trying to reach", + Required: true, + }, + }, + }, func(ctx context.Context, req *mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { + peerID, _ := req.Params.Arguments["peer_id"] + text := fmt.Sprintf("I am trying to debug connectivity to peer %s. Please check the local mesh state using the mesh://state resource, and then use the check_connectivity tool to diagnose the network path. Finally, use discover_remote_services to see if they are advertising the expected service.", peerID) + return &mcp.GetPromptResult{ + Description: "Debug mesh connectivity", + Messages: []*mcp.PromptMessage{ + { + Role: "user", + Content: &mcp.TextContent{ + Text: text, + }, + }, + }, + }, nil + }) + return mcpServer } diff --git a/cmd/sam-node/mcp_handlers.go b/cmd/sam-node/mcp_handlers.go index e6dd5a7..49ede54 100644 --- a/cmd/sam-node/mcp_handlers.go +++ b/cmd/sam-node/mcp_handlers.go @@ -1,9 +1,24 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package main import ( "context" "encoding/json" "fmt" + "strconv" "strings" "sync" "time" @@ -242,6 +257,7 @@ type FindRemoteToolsParams struct { Intent string `json:"intent,omitempty" jsonschema:"Natural-language description of what the caller is looking for. Reserved for future semantic ranking; currently accepted but ignored."` PeerID string `json:"peer_id,omitempty" jsonschema:"Restrict the search to a single peer. Empty means search the whole mesh."` ServiceName string `json:"service_name,omitempty" jsonschema:"Restrict results to tools whose name starts with this service prefix (e.g. 'code-reviewer'). Empty means no service filter."` + Cursor string `json:"cursor,omitempty" jsonschema:"Optional pagination cursor. Pass the nextCursor from a previous response to get the next page."` } // remoteToolRow is one entry in the find_remote_tools response. @@ -311,7 +327,20 @@ func (n *SamNode) handleFindRemoteTools(ctx context.Context, req *mcp.CallToolRe if rows == nil { rows = []remoteToolRow{} } - respData, err := json.Marshal(rows) + + paginatedRows, nextCursor, err := PaginateSlice(rows, params.Cursor, 50) + if err != nil { + return nil, nil, err + } + + respObj := map[string]any{ + "items": paginatedRows, + } + if nextCursor != "" { + respObj["nextCursor"] = nextCursor + } + + respData, err := json.Marshal(respObj) if err != nil { return nil, nil, err } @@ -609,3 +638,445 @@ func (n *SamNode) handleGetRecentLogs(ctx context.Context, req *mcp.CallToolRequ Content: []mcp.Content{&mcp.TextContent{Text: string(data)}}, }, nil, nil } + +// FindRemoteResourcesParams defines the parameters for find_remote_resources. +type FindRemoteResourcesParams struct { + PeerID string `json:"peer_id,omitempty" jsonschema:"Restrict the search to a single peer. Empty means search the whole mesh."` + ServiceName string `json:"service_name,omitempty" jsonschema:"Restrict results to resources whose name starts with this service prefix."` + Cursor string `json:"cursor,omitempty" jsonschema:"Optional pagination cursor."` +} + +// remoteResourceRow is one entry in the find_remote_resources response. +type remoteResourceRow struct { + PeerID string `json:"peer_id"` + ResourceURI string `json:"resource_uri"` + Name string `json:"name"` + Description string `json:"description"` +} + +func (n *SamNode) handleFindRemoteResources(ctx context.Context, req *mcp.CallToolRequest, params FindRemoteResourcesParams) (*mcp.CallToolResult, any, error) { + selfID := n.Host.ID().String() + if params.PeerID != "" && params.PeerID == selfID { + return nil, nil, fmt.Errorf("peer_id %q is this node; cross-mesh discovery cannot target self", params.PeerID) + } + + var rows []remoteResourceRow + + if params.PeerID != "" { + pid, err := peer.Decode(params.PeerID) + if err != nil { + return nil, nil, fmt.Errorf("invalid peer_id %q: %w", params.PeerID, err) + } + resources, err := n.fetchRemoteResourceCatalogue(ctx, pid) + if err != nil { + return nil, nil, err + } + rows = appendFilteredResourceRows(rows, params.PeerID, resources, params.ServiceName) + } else { + providers, err := n.DiscoverRemoteServices(ctx, api.ServiceType_SERVICE_TYPE_MCP, "") + if err != nil { + return nil, nil, fmt.Errorf("discover providers: %w", err) + } + seen := map[string]bool{} + var peerIDs []peer.ID + for _, p := range providers { + if p.PeerId == selfID || seen[p.PeerId] { + continue + } + seen[p.PeerId] = true + pid, err := peer.Decode(p.PeerId) + if err != nil { + continue + } + peerIDs = append(peerIDs, pid) + } + + rows = n.fanOutFetchResources(ctx, peerIDs, params.ServiceName) + } + + if rows == nil { + rows = []remoteResourceRow{} + } + + paginatedRows, nextCursor, err := PaginateSlice(rows, params.Cursor, 50) + if err != nil { + return nil, nil, err + } + + respObj := map[string]any{ + "items": paginatedRows, + } + if nextCursor != "" { + respObj["nextCursor"] = nextCursor + } + + respData, err := json.Marshal(respObj) + if err != nil { + return nil, nil, err + } + return &mcp.CallToolResult{ + Content: []mcp.Content{&mcp.TextContent{Text: string(respData)}}, + }, nil, nil +} + +func (n *SamNode) fetchRemoteResourceCatalogue(ctx context.Context, targetPeer peer.ID) ([]*mcp.Resource, error) { + services, err := n.fetchRemoteServiceCatalog(ctx, targetPeer, "MCP") + if err != nil { + return nil, fmt.Errorf("fetch remote service catalog: %w", err) + } + + var allResources []*mcp.Resource + + for _, svc := range services { + if svc.Type != api.ServiceType_SERVICE_TYPE_MCP { + continue + } + + n.preparePeerAddrs(ctx, targetPeer) + session, cleanup, err := n.ConnectMCPSession(ctx, targetPeer, svc.Name) + if err != nil { + continue + } + + listRes, err := session.ListResources(ctx, &mcp.ListResourcesParams{}) + if err == nil && listRes != nil { + for _, r := range listRes.Resources { + // Namespace the URI scheme or name if we want, but resources are URIs. + // We'll prefix the Name to indicate origin. + r.Name = svc.Name + "." + r.Name + allResources = append(allResources, r) + } + } + cleanup() + } + + return allResources, nil +} + +func appendFilteredResourceRows(rows []remoteResourceRow, peerID string, resources []*mcp.Resource, serviceName string) []remoteResourceRow { + for _, r := range resources { + if serviceName != "" && !strings.HasPrefix(r.Name, serviceName+".") { + continue + } + rows = append(rows, remoteResourceRow{ + PeerID: peerID, + ResourceURI: r.URI, + Name: r.Name, + Description: r.Description, + }) + } + return rows +} + +func (n *SamNode) fanOutFetchResources(ctx context.Context, peers []peer.ID, serviceName string) []remoteResourceRow { + const maxConcurrent = 8 + sem := make(chan struct{}, maxConcurrent) + + var ( + mu sync.Mutex + rows []remoteResourceRow + ) + + var wg sync.WaitGroup + for _, pid := range peers { + pid := pid + wg.Add(1) + go func() { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + + peerCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + resources, err := n.fetchRemoteResourceCatalogue(peerCtx, pid) + if err != nil { + return + } + mu.Lock() + rows = appendFilteredResourceRows(rows, pid.String(), resources, serviceName) + mu.Unlock() + }() + } + wg.Wait() + return rows +} + +// ReadRemoteResourceParams defines the parameters for read_remote_resource. +type ReadRemoteResourceParams struct { + PeerID string `json:"peer_id" jsonschema:"The Peer ID of the target agent"` + URI string `json:"uri" jsonschema:"The URI of the remote resource"` +} + +func (n *SamNode) handleReadRemoteResource(ctx context.Context, req *mcp.CallToolRequest, params ReadRemoteResourceParams) (*mcp.CallToolResult, any, error) { + targetPeer, err := peer.Decode(params.PeerID) + if err != nil { + return nil, nil, err + } + + // Try all MCP services on the remote peer to see which one has the resource + services, err := n.fetchRemoteServiceCatalog(ctx, targetPeer, "MCP") + if err != nil { + return nil, nil, err + } + + for _, svc := range services { + if svc.Type != api.ServiceType_SERVICE_TYPE_MCP { + continue + } + + session, cleanup, err := n.ConnectMCPSession(ctx, targetPeer, svc.Name) + if err != nil { + continue + } + + res, err := session.ReadResource(ctx, &mcp.ReadResourceParams{ + URI: params.URI, + }) + + if err == nil && res != nil && len(res.Contents) > 0 { + defer cleanup() + + // Marshal the contents + data, _ := json.Marshal(res.Contents) + return &mcp.CallToolResult{ + Content: []mcp.Content{ + &mcp.TextContent{Text: string(data)}, + }, + }, nil, nil + } + cleanup() + } + + return nil, nil, fmt.Errorf("resource %s not found on peer %s", params.URI, params.PeerID) +} + +// FindRemotePromptsParams defines the parameters for find_remote_prompts. +type FindRemotePromptsParams struct { + PeerID string `json:"peer_id,omitempty" jsonschema:"Restrict the search to a single peer. Empty means search the whole mesh."` + ServiceName string `json:"service_name,omitempty" jsonschema:"Restrict results to prompts whose name starts with this service prefix."` + Cursor string `json:"cursor,omitempty" jsonschema:"Optional pagination cursor."` +} + +// remotePromptRow is one entry in the find_remote_prompts response. +type remotePromptRow struct { + PeerID string `json:"peer_id"` + Name string `json:"name"` + Description string `json:"description"` + Arguments []*mcp.PromptArgument `json:"arguments"` +} + +func (n *SamNode) handleFindRemotePrompts(ctx context.Context, req *mcp.CallToolRequest, params FindRemotePromptsParams) (*mcp.CallToolResult, any, error) { + selfID := n.Host.ID().String() + if params.PeerID != "" && params.PeerID == selfID { + return nil, nil, fmt.Errorf("peer_id %q is this node; cross-mesh discovery cannot target self", params.PeerID) + } + + var rows []remotePromptRow + + if params.PeerID != "" { + pid, err := peer.Decode(params.PeerID) + if err != nil { + return nil, nil, fmt.Errorf("invalid peer_id %q: %w", params.PeerID, err) + } + prompts, err := n.fetchRemotePromptCatalogue(ctx, pid) + if err != nil { + return nil, nil, err + } + rows = appendFilteredPromptRows(rows, params.PeerID, prompts, params.ServiceName) + } else { + providers, err := n.DiscoverRemoteServices(ctx, api.ServiceType_SERVICE_TYPE_MCP, "") + if err != nil { + return nil, nil, fmt.Errorf("discover providers: %w", err) + } + seen := map[string]bool{} + var peerIDs []peer.ID + for _, p := range providers { + if p.PeerId == selfID || seen[p.PeerId] { + continue + } + seen[p.PeerId] = true + pid, err := peer.Decode(p.PeerId) + if err != nil { + continue + } + peerIDs = append(peerIDs, pid) + } + + rows = n.fanOutFetchPrompts(ctx, peerIDs, params.ServiceName) + } + + if rows == nil { + rows = []remotePromptRow{} + } + + paginatedRows, nextCursor, err := PaginateSlice(rows, params.Cursor, 50) + if err != nil { + return nil, nil, err + } + + respObj := map[string]any{ + "items": paginatedRows, + } + if nextCursor != "" { + respObj["nextCursor"] = nextCursor + } + + respData, err := json.Marshal(respObj) + if err != nil { + return nil, nil, err + } + return &mcp.CallToolResult{ + Content: []mcp.Content{&mcp.TextContent{Text: string(respData)}}, + }, nil, nil +} + +func (n *SamNode) fetchRemotePromptCatalogue(ctx context.Context, targetPeer peer.ID) ([]*mcp.Prompt, error) { + services, err := n.fetchRemoteServiceCatalog(ctx, targetPeer, "MCP") + if err != nil { + return nil, fmt.Errorf("fetch remote service catalog: %w", err) + } + + var allPrompts []*mcp.Prompt + + for _, svc := range services { + if svc.Type != api.ServiceType_SERVICE_TYPE_MCP { + continue + } + + n.preparePeerAddrs(ctx, targetPeer) + session, cleanup, err := n.ConnectMCPSession(ctx, targetPeer, svc.Name) + if err != nil { + continue + } + + listRes, err := session.ListPrompts(ctx, &mcp.ListPromptsParams{}) + if err == nil && listRes != nil { + for _, p := range listRes.Prompts { + p.Name = svc.Name + "." + p.Name + allPrompts = append(allPrompts, p) + } + } + cleanup() + } + + return allPrompts, nil +} + +func appendFilteredPromptRows(rows []remotePromptRow, peerID string, prompts []*mcp.Prompt, serviceName string) []remotePromptRow { + for _, p := range prompts { + if serviceName != "" && !strings.HasPrefix(p.Name, serviceName+".") { + continue + } + rows = append(rows, remotePromptRow{ + PeerID: peerID, + Name: p.Name, + Description: p.Description, + Arguments: p.Arguments, + }) + } + return rows +} + +func (n *SamNode) fanOutFetchPrompts(ctx context.Context, peers []peer.ID, serviceName string) []remotePromptRow { + const maxConcurrent = 8 + sem := make(chan struct{}, maxConcurrent) + + var ( + mu sync.Mutex + rows []remotePromptRow + ) + + var wg sync.WaitGroup + for _, pid := range peers { + pid := pid + wg.Add(1) + go func() { + defer wg.Done() + sem <- struct{}{} + defer func() { <-sem }() + + peerCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + prompts, err := n.fetchRemotePromptCatalogue(peerCtx, pid) + if err != nil { + return + } + mu.Lock() + rows = appendFilteredPromptRows(rows, pid.String(), prompts, serviceName) + mu.Unlock() + }() + } + wg.Wait() + return rows +} + +// GetRemotePromptParams defines the parameters for get_remote_prompt. +type GetRemotePromptParams struct { + PeerID string `json:"peer_id" jsonschema:"The Peer ID of the target agent"` + Name string `json:"name" jsonschema:"The namespaced name of the remote prompt (e.g. 'service.prompt')"` + Arguments map[string]string `json:"arguments,omitempty" jsonschema:"Arguments to pass to the prompt"` +} + +func (n *SamNode) handleGetRemotePrompt(ctx context.Context, req *mcp.CallToolRequest, params GetRemotePromptParams) (*mcp.CallToolResult, any, error) { + targetPeer, err := peer.Decode(params.PeerID) + if err != nil { + return nil, nil, err + } + + targetService := api.CatalogTarget + originalPromptName := params.Name + if parts := strings.SplitN(params.Name, ".", 2); len(parts) == 2 { + targetService = parts[0] + originalPromptName = parts[1] + } + + session, cleanup, err := n.ConnectMCPSession(ctx, targetPeer, targetService) + if err != nil { + return nil, nil, err + } + defer cleanup() + + res, err := session.GetPrompt(ctx, &mcp.GetPromptParams{ + Name: originalPromptName, + Arguments: params.Arguments, + }) + + if err != nil { + return nil, nil, fmt.Errorf("failed to get prompt %s: %w", params.Name, err) + } + + data, _ := json.Marshal(res) + return &mcp.CallToolResult{ + Content: []mcp.Content{ + &mcp.TextContent{Text: string(data)}, + }, + }, nil, nil +} + +// PaginateSlice ... +func PaginateSlice[T any](items []T, cursor string, limit int) ([]T, string, error) { + if limit <= 0 { + limit = 50 + } + startIdx := 0 + if cursor != "" { + idx, err := strconv.Atoi(cursor) + if err != nil { + return nil, "", err + } + startIdx = idx + } + if startIdx >= len(items) { + return []T{}, "", nil + } + endIdx := startIdx + limit + nextCursor := "" + if endIdx < len(items) { + nextCursor = strconv.Itoa(endIdx) + } else { + endIdx = len(items) + } + return items[startIdx:endIdx], nextCursor, nil +} diff --git a/cmd/sam-node/mcp_handlers_test.go b/cmd/sam-node/mcp_handlers_test.go index 05db489..8131244 100644 --- a/cmd/sam-node/mcp_handlers_test.go +++ b/cmd/sam-node/mcp_handlers_test.go @@ -72,12 +72,14 @@ func TestHandleFindRemoteTools_EmptyMesh_ReturnsEmptyArray(t *testing.T) { if !ok { t.Fatalf("expected TextContent, got %T", res.Content[0]) } - var rows []map[string]any - if err := json.Unmarshal([]byte(tc.Text), &rows); err != nil { - t.Fatalf("response not JSON array: %v (text: %q)", err, tc.Text) + var resp struct { + Items []map[string]any `json:"items"` + } + if err := json.Unmarshal([]byte(tc.Text), &resp); err != nil { + t.Fatalf("response not JSON object: %v (text: %q)", err, tc.Text) } - if len(rows) != 0 { - t.Errorf("expected empty results for empty mesh, got %d rows", len(rows)) + if len(resp.Items) != 0 { + t.Errorf("expected empty results for empty mesh, got %d rows", len(resp.Items)) } } @@ -154,8 +156,10 @@ func TestHandleFindRemoteTools_SinglePeer(t *testing.T) { if !ok { t.Fatalf("expected TextContent, got %T", res.Content[0]) } - var rows []remoteToolRow - if err := json.Unmarshal([]byte(tc.Text), &rows); err != nil { + var resp struct { + Items []remoteToolRow `json:"items"` + } + if err := json.Unmarshal([]byte(tc.Text), &resp); err != nil { t.Fatalf("unmarshal: %v (text: %q)", err, tc.Text) } @@ -163,7 +167,7 @@ func TestHandleFindRemoteTools_SinglePeer(t *testing.T) { "code-reviewer.review_pr": false, "code-reviewer.add_comment": false, } - for _, row := range rows { + for _, row := range resp.Items { if row.PeerID != nodeB.Host.ID().String() { t.Errorf("row has peer_id %q, want %q", row.PeerID, nodeB.Host.ID().String()) } @@ -173,7 +177,7 @@ func TestHandleFindRemoteTools_SinglePeer(t *testing.T) { } for name, found := range wantNames { if !found { - t.Errorf("expected tool %q in response, not found; rows=%+v", name, rows) + t.Errorf("expected tool %q in response, not found; rows=%+v", name, resp.Items) } } } diff --git a/tests/e2e/find_remote_tools.bats b/tests/e2e/find_remote_tools.bats index 19eb0ea..35a3173 100644 --- a/tests/e2e/find_remote_tools.bats +++ b/tests/e2e/find_remote_tools.bats @@ -79,7 +79,7 @@ teardown() { local match_count match_count=$(echo "$catalog" | jq --arg pid "${node2_peer_id}" ' - [.[] | select(.peer_id == $pid + [.items[] | select(.peer_id == $pid and (.tool_name | startswith("calculator.")))] | length ') echo "Matching calculator tool entries: ${match_count}" From f637ad9edc094ace538115d81244012af405fc25 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 11:46:31 +0200 Subject: [PATCH 10/17] Update cmd/sam-node/mcp_handlers.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- cmd/sam-node/mcp_handlers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/sam-node/mcp_handlers.go b/cmd/sam-node/mcp_handlers.go index 49ede54..19f32b8 100644 --- a/cmd/sam-node/mcp_handlers.go +++ b/cmd/sam-node/mcp_handlers.go @@ -1063,8 +1063,8 @@ func PaginateSlice[T any](items []T, cursor string, limit int) ([]T, string, err startIdx := 0 if cursor != "" { idx, err := strconv.Atoi(cursor) - if err != nil { - return nil, "", err + if err != nil || idx < 0 { + return nil, "", fmt.Errorf("invalid cursor: %q", cursor) } startIdx = idx } From e196445efcc80ad08e3b1ce11c97a407f22d74e4 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 11:46:38 +0200 Subject: [PATCH 11/17] Update development/kind/test-mesh-e2e.sh Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- development/kind/test-mesh-e2e.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/development/kind/test-mesh-e2e.sh b/development/kind/test-mesh-e2e.sh index 41613a5..20c69ab 100755 --- a/development/kind/test-mesh-e2e.sh +++ b/development/kind/test-mesh-e2e.sh @@ -79,5 +79,5 @@ echo "== call calculator.add(2,3) ==" result=$(mcp -tool call_remote_tool \ -args "{\"peer_id\":\"$peer\",\"tool_name\":\"calculator.add\",\"arguments\":{\"a\":2,\"b\":3}}") echo "result: $result" -printf '%s' "$result" | grep -q 5 || { echo "calculator.add did not return 5"; exit 1; } +[ "$result" = "5" ] || { echo "calculator.add did not return 5"; exit 1; } echo "OK: calculator.add(2,3) == 5" From c3f56bd2d75e68bf1231dc0e28cc93d68fd291d0 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 10:28:52 +0000 Subject: [PATCH 12/17] fix lint --- cmd/sam-node/mcp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/sam-node/mcp.go b/cmd/sam-node/mcp.go index 78e3e57..63a7888 100644 --- a/cmd/sam-node/mcp.go +++ b/cmd/sam-node/mcp.go @@ -209,7 +209,7 @@ func NewMCPServer(node *SamNode) *mcp.Server { }, }, }, func(ctx context.Context, req *mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { - peerID, _ := req.Params.Arguments["peer_id"] + peerID := req.Params.Arguments["peer_id"] text := fmt.Sprintf("I am trying to debug connectivity to peer %s. Please check the local mesh state using the mesh://state resource, and then use the check_connectivity tool to diagnose the network path. Finally, use discover_remote_services to see if they are advertising the expected service.", peerID) return &mcp.GetPromptResult{ Description: "Debug mesh connectivity", From f09cc87db46784b849d091145c7badb79bda8b97 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 10:44:03 +0000 Subject: [PATCH 13/17] assert kind mesh e2e test --- development/kind/test-mesh-e2e.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/development/kind/test-mesh-e2e.sh b/development/kind/test-mesh-e2e.sh index 20c69ab..fefeedf 100755 --- a/development/kind/test-mesh-e2e.sh +++ b/development/kind/test-mesh-e2e.sh @@ -79,5 +79,8 @@ echo "== call calculator.add(2,3) ==" result=$(mcp -tool call_remote_tool \ -args "{\"peer_id\":\"$peer\",\"tool_name\":\"calculator.add\",\"arguments\":{\"a\":2,\"b\":3}}") echo "result: $result" -[ "$result" = "5" ] || { echo "calculator.add did not return 5"; exit 1; } +if [[ "$result" != *"5"* ]]; then + echo "calculator.add did not return 5" + exit 1 +fi echo "OK: calculator.add(2,3) == 5" From 68b74e909c680f030ca4b54611e0ff00faa0b95d Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 11:02:25 +0000 Subject: [PATCH 14/17] address reviews --- cmd/sam-node/mcp_handlers.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/cmd/sam-node/mcp_handlers.go b/cmd/sam-node/mcp_handlers.go index 19f32b8..5711663 100644 --- a/cmd/sam-node/mcp_handlers.go +++ b/cmd/sam-node/mcp_handlers.go @@ -741,6 +741,9 @@ func (n *SamNode) fetchRemoteResourceCatalogue(ctx context.Context, targetPeer p listRes, err := session.ListResources(ctx, &mcp.ListResourcesParams{}) if err == nil && listRes != nil { for _, r := range listRes.Resources { + if r == nil { + continue + } // Namespace the URI scheme or name if we want, but resources are URIs. // We'll prefix the Name to indicate origin. r.Name = svc.Name + "." + r.Name @@ -755,6 +758,9 @@ func (n *SamNode) fetchRemoteResourceCatalogue(ctx context.Context, targetPeer p func appendFilteredResourceRows(rows []remoteResourceRow, peerID string, resources []*mcp.Resource, serviceName string) []remoteResourceRow { for _, r := range resources { + if r == nil { + continue + } if serviceName != "" && !strings.HasPrefix(r.Name, serviceName+".") { continue } @@ -835,7 +841,7 @@ func (n *SamNode) handleReadRemoteResource(ctx context.Context, req *mcp.CallToo }) if err == nil && res != nil && len(res.Contents) > 0 { - defer cleanup() + cleanup() // Marshal the contents data, _ := json.Marshal(res.Contents) @@ -953,6 +959,9 @@ func (n *SamNode) fetchRemotePromptCatalogue(ctx context.Context, targetPeer pee listRes, err := session.ListPrompts(ctx, &mcp.ListPromptsParams{}) if err == nil && listRes != nil { for _, p := range listRes.Prompts { + if p == nil { + continue + } p.Name = svc.Name + "." + p.Name allPrompts = append(allPrompts, p) } @@ -965,6 +974,9 @@ func (n *SamNode) fetchRemotePromptCatalogue(ctx context.Context, targetPeer pee func appendFilteredPromptRows(rows []remotePromptRow, peerID string, prompts []*mcp.Prompt, serviceName string) []remotePromptRow { for _, p := range prompts { + if p == nil { + continue + } if serviceName != "" && !strings.HasPrefix(p.Name, serviceName+".") { continue } From 972be2e671199dbae01e9107b624a7a81e4d7726 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 11:14:51 +0000 Subject: [PATCH 15/17] revert: remove mcp prompts and pagination --- cmd/sam-node/mcp.go | 81 ----- cmd/sam-node/mcp_handlers.go | 485 +----------------------------- cmd/sam-node/mcp_handlers_test.go | 22 +- tests/e2e/find_remote_tools.bats | 2 +- 4 files changed, 11 insertions(+), 579 deletions(-) diff --git a/cmd/sam-node/mcp.go b/cmd/sam-node/mcp.go index 63a7888..6790efa 100644 --- a/cmd/sam-node/mcp.go +++ b/cmd/sam-node/mcp.go @@ -143,87 +143,6 @@ func NewMCPServer(node *SamNode) *mcp.Server { Description: "Returns the last few lines of the node's log output.", }, node.handleGetRecentLogs) - // Add the find_remote_resources tool. - mcp.AddTool(mcpServer, &mcp.Tool{ - Name: "find_remote_resources", - Description: "Discover MCP resources available on hosted services across the mesh. Returns name + description per resource.", - }, node.handleFindRemoteResources) - - // Add the read_remote_resource tool. - mcp.AddTool(mcpServer, &mcp.Tool{ - Name: "read_remote_resource", - Description: "Read a specific resource from a remote peer.", - }, node.handleReadRemoteResource) - - // Add the find_remote_prompts tool. - mcp.AddTool(mcpServer, &mcp.Tool{ - Name: "find_remote_prompts", - Description: "Discover MCP prompts available on hosted services across the mesh.", - }, node.handleFindRemotePrompts) - - // Add the get_remote_prompt tool. - mcp.AddTool(mcpServer, &mcp.Tool{ - Name: "get_remote_prompt", - Description: "Get a specific prompt from a remote peer.", - }, node.handleGetRemotePrompt) - - // Add a local resource for mesh state - mcpServer.AddResource(&mcp.Resource{ - URI: "mesh://state", - Name: "Mesh State", - Description: "Current state of the SAM node and mesh connections", - MIMEType: "application/json", - }, func(ctx context.Context, req *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) { - peers := node.Host.Network().Peers() - connectedPeers := make([]string, 0, len(peers)) - for _, p := range peers { - connectedPeers = append(connectedPeers, p.String()) - } - resData := map[string]any{ - "connected_peers": connectedPeers, - "dht_size": node.DHT.RoutingTable().Size(), - "hub_peer_id": node.HubPeerID.String(), - "peer_id": node.Host.ID().String(), - } - data, _ := json.MarshalIndent(resData, "", " ") - return &mcp.ReadResourceResult{ - Contents: []*mcp.ResourceContents{ - { - URI: "mesh://state", - MIMEType: "application/json", - Text: string(data), - }, - }, - }, nil - }) - - // Add a local prompt - mcpServer.AddPrompt(&mcp.Prompt{ - Name: "mesh_debugging", - Description: "A standard prompt for debugging mesh connectivity issues", - Arguments: []*mcp.PromptArgument{ - { - Name: "peer_id", - Description: "The ID of the peer you are trying to reach", - Required: true, - }, - }, - }, func(ctx context.Context, req *mcp.GetPromptRequest) (*mcp.GetPromptResult, error) { - peerID := req.Params.Arguments["peer_id"] - text := fmt.Sprintf("I am trying to debug connectivity to peer %s. Please check the local mesh state using the mesh://state resource, and then use the check_connectivity tool to diagnose the network path. Finally, use discover_remote_services to see if they are advertising the expected service.", peerID) - return &mcp.GetPromptResult{ - Description: "Debug mesh connectivity", - Messages: []*mcp.PromptMessage{ - { - Role: "user", - Content: &mcp.TextContent{ - Text: text, - }, - }, - }, - }, nil - }) - return mcpServer } diff --git a/cmd/sam-node/mcp_handlers.go b/cmd/sam-node/mcp_handlers.go index 5711663..e6dd5a7 100644 --- a/cmd/sam-node/mcp_handlers.go +++ b/cmd/sam-node/mcp_handlers.go @@ -1,24 +1,9 @@ -// Copyright 2026 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package main import ( "context" "encoding/json" "fmt" - "strconv" "strings" "sync" "time" @@ -257,7 +242,6 @@ type FindRemoteToolsParams struct { Intent string `json:"intent,omitempty" jsonschema:"Natural-language description of what the caller is looking for. Reserved for future semantic ranking; currently accepted but ignored."` PeerID string `json:"peer_id,omitempty" jsonschema:"Restrict the search to a single peer. Empty means search the whole mesh."` ServiceName string `json:"service_name,omitempty" jsonschema:"Restrict results to tools whose name starts with this service prefix (e.g. 'code-reviewer'). Empty means no service filter."` - Cursor string `json:"cursor,omitempty" jsonschema:"Optional pagination cursor. Pass the nextCursor from a previous response to get the next page."` } // remoteToolRow is one entry in the find_remote_tools response. @@ -327,20 +311,7 @@ func (n *SamNode) handleFindRemoteTools(ctx context.Context, req *mcp.CallToolRe if rows == nil { rows = []remoteToolRow{} } - - paginatedRows, nextCursor, err := PaginateSlice(rows, params.Cursor, 50) - if err != nil { - return nil, nil, err - } - - respObj := map[string]any{ - "items": paginatedRows, - } - if nextCursor != "" { - respObj["nextCursor"] = nextCursor - } - - respData, err := json.Marshal(respObj) + respData, err := json.Marshal(rows) if err != nil { return nil, nil, err } @@ -638,457 +609,3 @@ func (n *SamNode) handleGetRecentLogs(ctx context.Context, req *mcp.CallToolRequ Content: []mcp.Content{&mcp.TextContent{Text: string(data)}}, }, nil, nil } - -// FindRemoteResourcesParams defines the parameters for find_remote_resources. -type FindRemoteResourcesParams struct { - PeerID string `json:"peer_id,omitempty" jsonschema:"Restrict the search to a single peer. Empty means search the whole mesh."` - ServiceName string `json:"service_name,omitempty" jsonschema:"Restrict results to resources whose name starts with this service prefix."` - Cursor string `json:"cursor,omitempty" jsonschema:"Optional pagination cursor."` -} - -// remoteResourceRow is one entry in the find_remote_resources response. -type remoteResourceRow struct { - PeerID string `json:"peer_id"` - ResourceURI string `json:"resource_uri"` - Name string `json:"name"` - Description string `json:"description"` -} - -func (n *SamNode) handleFindRemoteResources(ctx context.Context, req *mcp.CallToolRequest, params FindRemoteResourcesParams) (*mcp.CallToolResult, any, error) { - selfID := n.Host.ID().String() - if params.PeerID != "" && params.PeerID == selfID { - return nil, nil, fmt.Errorf("peer_id %q is this node; cross-mesh discovery cannot target self", params.PeerID) - } - - var rows []remoteResourceRow - - if params.PeerID != "" { - pid, err := peer.Decode(params.PeerID) - if err != nil { - return nil, nil, fmt.Errorf("invalid peer_id %q: %w", params.PeerID, err) - } - resources, err := n.fetchRemoteResourceCatalogue(ctx, pid) - if err != nil { - return nil, nil, err - } - rows = appendFilteredResourceRows(rows, params.PeerID, resources, params.ServiceName) - } else { - providers, err := n.DiscoverRemoteServices(ctx, api.ServiceType_SERVICE_TYPE_MCP, "") - if err != nil { - return nil, nil, fmt.Errorf("discover providers: %w", err) - } - seen := map[string]bool{} - var peerIDs []peer.ID - for _, p := range providers { - if p.PeerId == selfID || seen[p.PeerId] { - continue - } - seen[p.PeerId] = true - pid, err := peer.Decode(p.PeerId) - if err != nil { - continue - } - peerIDs = append(peerIDs, pid) - } - - rows = n.fanOutFetchResources(ctx, peerIDs, params.ServiceName) - } - - if rows == nil { - rows = []remoteResourceRow{} - } - - paginatedRows, nextCursor, err := PaginateSlice(rows, params.Cursor, 50) - if err != nil { - return nil, nil, err - } - - respObj := map[string]any{ - "items": paginatedRows, - } - if nextCursor != "" { - respObj["nextCursor"] = nextCursor - } - - respData, err := json.Marshal(respObj) - if err != nil { - return nil, nil, err - } - return &mcp.CallToolResult{ - Content: []mcp.Content{&mcp.TextContent{Text: string(respData)}}, - }, nil, nil -} - -func (n *SamNode) fetchRemoteResourceCatalogue(ctx context.Context, targetPeer peer.ID) ([]*mcp.Resource, error) { - services, err := n.fetchRemoteServiceCatalog(ctx, targetPeer, "MCP") - if err != nil { - return nil, fmt.Errorf("fetch remote service catalog: %w", err) - } - - var allResources []*mcp.Resource - - for _, svc := range services { - if svc.Type != api.ServiceType_SERVICE_TYPE_MCP { - continue - } - - n.preparePeerAddrs(ctx, targetPeer) - session, cleanup, err := n.ConnectMCPSession(ctx, targetPeer, svc.Name) - if err != nil { - continue - } - - listRes, err := session.ListResources(ctx, &mcp.ListResourcesParams{}) - if err == nil && listRes != nil { - for _, r := range listRes.Resources { - if r == nil { - continue - } - // Namespace the URI scheme or name if we want, but resources are URIs. - // We'll prefix the Name to indicate origin. - r.Name = svc.Name + "." + r.Name - allResources = append(allResources, r) - } - } - cleanup() - } - - return allResources, nil -} - -func appendFilteredResourceRows(rows []remoteResourceRow, peerID string, resources []*mcp.Resource, serviceName string) []remoteResourceRow { - for _, r := range resources { - if r == nil { - continue - } - if serviceName != "" && !strings.HasPrefix(r.Name, serviceName+".") { - continue - } - rows = append(rows, remoteResourceRow{ - PeerID: peerID, - ResourceURI: r.URI, - Name: r.Name, - Description: r.Description, - }) - } - return rows -} - -func (n *SamNode) fanOutFetchResources(ctx context.Context, peers []peer.ID, serviceName string) []remoteResourceRow { - const maxConcurrent = 8 - sem := make(chan struct{}, maxConcurrent) - - var ( - mu sync.Mutex - rows []remoteResourceRow - ) - - var wg sync.WaitGroup - for _, pid := range peers { - pid := pid - wg.Add(1) - go func() { - defer wg.Done() - sem <- struct{}{} - defer func() { <-sem }() - - peerCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - resources, err := n.fetchRemoteResourceCatalogue(peerCtx, pid) - if err != nil { - return - } - mu.Lock() - rows = appendFilteredResourceRows(rows, pid.String(), resources, serviceName) - mu.Unlock() - }() - } - wg.Wait() - return rows -} - -// ReadRemoteResourceParams defines the parameters for read_remote_resource. -type ReadRemoteResourceParams struct { - PeerID string `json:"peer_id" jsonschema:"The Peer ID of the target agent"` - URI string `json:"uri" jsonschema:"The URI of the remote resource"` -} - -func (n *SamNode) handleReadRemoteResource(ctx context.Context, req *mcp.CallToolRequest, params ReadRemoteResourceParams) (*mcp.CallToolResult, any, error) { - targetPeer, err := peer.Decode(params.PeerID) - if err != nil { - return nil, nil, err - } - - // Try all MCP services on the remote peer to see which one has the resource - services, err := n.fetchRemoteServiceCatalog(ctx, targetPeer, "MCP") - if err != nil { - return nil, nil, err - } - - for _, svc := range services { - if svc.Type != api.ServiceType_SERVICE_TYPE_MCP { - continue - } - - session, cleanup, err := n.ConnectMCPSession(ctx, targetPeer, svc.Name) - if err != nil { - continue - } - - res, err := session.ReadResource(ctx, &mcp.ReadResourceParams{ - URI: params.URI, - }) - - if err == nil && res != nil && len(res.Contents) > 0 { - cleanup() - - // Marshal the contents - data, _ := json.Marshal(res.Contents) - return &mcp.CallToolResult{ - Content: []mcp.Content{ - &mcp.TextContent{Text: string(data)}, - }, - }, nil, nil - } - cleanup() - } - - return nil, nil, fmt.Errorf("resource %s not found on peer %s", params.URI, params.PeerID) -} - -// FindRemotePromptsParams defines the parameters for find_remote_prompts. -type FindRemotePromptsParams struct { - PeerID string `json:"peer_id,omitempty" jsonschema:"Restrict the search to a single peer. Empty means search the whole mesh."` - ServiceName string `json:"service_name,omitempty" jsonschema:"Restrict results to prompts whose name starts with this service prefix."` - Cursor string `json:"cursor,omitempty" jsonschema:"Optional pagination cursor."` -} - -// remotePromptRow is one entry in the find_remote_prompts response. -type remotePromptRow struct { - PeerID string `json:"peer_id"` - Name string `json:"name"` - Description string `json:"description"` - Arguments []*mcp.PromptArgument `json:"arguments"` -} - -func (n *SamNode) handleFindRemotePrompts(ctx context.Context, req *mcp.CallToolRequest, params FindRemotePromptsParams) (*mcp.CallToolResult, any, error) { - selfID := n.Host.ID().String() - if params.PeerID != "" && params.PeerID == selfID { - return nil, nil, fmt.Errorf("peer_id %q is this node; cross-mesh discovery cannot target self", params.PeerID) - } - - var rows []remotePromptRow - - if params.PeerID != "" { - pid, err := peer.Decode(params.PeerID) - if err != nil { - return nil, nil, fmt.Errorf("invalid peer_id %q: %w", params.PeerID, err) - } - prompts, err := n.fetchRemotePromptCatalogue(ctx, pid) - if err != nil { - return nil, nil, err - } - rows = appendFilteredPromptRows(rows, params.PeerID, prompts, params.ServiceName) - } else { - providers, err := n.DiscoverRemoteServices(ctx, api.ServiceType_SERVICE_TYPE_MCP, "") - if err != nil { - return nil, nil, fmt.Errorf("discover providers: %w", err) - } - seen := map[string]bool{} - var peerIDs []peer.ID - for _, p := range providers { - if p.PeerId == selfID || seen[p.PeerId] { - continue - } - seen[p.PeerId] = true - pid, err := peer.Decode(p.PeerId) - if err != nil { - continue - } - peerIDs = append(peerIDs, pid) - } - - rows = n.fanOutFetchPrompts(ctx, peerIDs, params.ServiceName) - } - - if rows == nil { - rows = []remotePromptRow{} - } - - paginatedRows, nextCursor, err := PaginateSlice(rows, params.Cursor, 50) - if err != nil { - return nil, nil, err - } - - respObj := map[string]any{ - "items": paginatedRows, - } - if nextCursor != "" { - respObj["nextCursor"] = nextCursor - } - - respData, err := json.Marshal(respObj) - if err != nil { - return nil, nil, err - } - return &mcp.CallToolResult{ - Content: []mcp.Content{&mcp.TextContent{Text: string(respData)}}, - }, nil, nil -} - -func (n *SamNode) fetchRemotePromptCatalogue(ctx context.Context, targetPeer peer.ID) ([]*mcp.Prompt, error) { - services, err := n.fetchRemoteServiceCatalog(ctx, targetPeer, "MCP") - if err != nil { - return nil, fmt.Errorf("fetch remote service catalog: %w", err) - } - - var allPrompts []*mcp.Prompt - - for _, svc := range services { - if svc.Type != api.ServiceType_SERVICE_TYPE_MCP { - continue - } - - n.preparePeerAddrs(ctx, targetPeer) - session, cleanup, err := n.ConnectMCPSession(ctx, targetPeer, svc.Name) - if err != nil { - continue - } - - listRes, err := session.ListPrompts(ctx, &mcp.ListPromptsParams{}) - if err == nil && listRes != nil { - for _, p := range listRes.Prompts { - if p == nil { - continue - } - p.Name = svc.Name + "." + p.Name - allPrompts = append(allPrompts, p) - } - } - cleanup() - } - - return allPrompts, nil -} - -func appendFilteredPromptRows(rows []remotePromptRow, peerID string, prompts []*mcp.Prompt, serviceName string) []remotePromptRow { - for _, p := range prompts { - if p == nil { - continue - } - if serviceName != "" && !strings.HasPrefix(p.Name, serviceName+".") { - continue - } - rows = append(rows, remotePromptRow{ - PeerID: peerID, - Name: p.Name, - Description: p.Description, - Arguments: p.Arguments, - }) - } - return rows -} - -func (n *SamNode) fanOutFetchPrompts(ctx context.Context, peers []peer.ID, serviceName string) []remotePromptRow { - const maxConcurrent = 8 - sem := make(chan struct{}, maxConcurrent) - - var ( - mu sync.Mutex - rows []remotePromptRow - ) - - var wg sync.WaitGroup - for _, pid := range peers { - pid := pid - wg.Add(1) - go func() { - defer wg.Done() - sem <- struct{}{} - defer func() { <-sem }() - - peerCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - prompts, err := n.fetchRemotePromptCatalogue(peerCtx, pid) - if err != nil { - return - } - mu.Lock() - rows = appendFilteredPromptRows(rows, pid.String(), prompts, serviceName) - mu.Unlock() - }() - } - wg.Wait() - return rows -} - -// GetRemotePromptParams defines the parameters for get_remote_prompt. -type GetRemotePromptParams struct { - PeerID string `json:"peer_id" jsonschema:"The Peer ID of the target agent"` - Name string `json:"name" jsonschema:"The namespaced name of the remote prompt (e.g. 'service.prompt')"` - Arguments map[string]string `json:"arguments,omitempty" jsonschema:"Arguments to pass to the prompt"` -} - -func (n *SamNode) handleGetRemotePrompt(ctx context.Context, req *mcp.CallToolRequest, params GetRemotePromptParams) (*mcp.CallToolResult, any, error) { - targetPeer, err := peer.Decode(params.PeerID) - if err != nil { - return nil, nil, err - } - - targetService := api.CatalogTarget - originalPromptName := params.Name - if parts := strings.SplitN(params.Name, ".", 2); len(parts) == 2 { - targetService = parts[0] - originalPromptName = parts[1] - } - - session, cleanup, err := n.ConnectMCPSession(ctx, targetPeer, targetService) - if err != nil { - return nil, nil, err - } - defer cleanup() - - res, err := session.GetPrompt(ctx, &mcp.GetPromptParams{ - Name: originalPromptName, - Arguments: params.Arguments, - }) - - if err != nil { - return nil, nil, fmt.Errorf("failed to get prompt %s: %w", params.Name, err) - } - - data, _ := json.Marshal(res) - return &mcp.CallToolResult{ - Content: []mcp.Content{ - &mcp.TextContent{Text: string(data)}, - }, - }, nil, nil -} - -// PaginateSlice ... -func PaginateSlice[T any](items []T, cursor string, limit int) ([]T, string, error) { - if limit <= 0 { - limit = 50 - } - startIdx := 0 - if cursor != "" { - idx, err := strconv.Atoi(cursor) - if err != nil || idx < 0 { - return nil, "", fmt.Errorf("invalid cursor: %q", cursor) - } - startIdx = idx - } - if startIdx >= len(items) { - return []T{}, "", nil - } - endIdx := startIdx + limit - nextCursor := "" - if endIdx < len(items) { - nextCursor = strconv.Itoa(endIdx) - } else { - endIdx = len(items) - } - return items[startIdx:endIdx], nextCursor, nil -} diff --git a/cmd/sam-node/mcp_handlers_test.go b/cmd/sam-node/mcp_handlers_test.go index 8131244..05db489 100644 --- a/cmd/sam-node/mcp_handlers_test.go +++ b/cmd/sam-node/mcp_handlers_test.go @@ -72,14 +72,12 @@ func TestHandleFindRemoteTools_EmptyMesh_ReturnsEmptyArray(t *testing.T) { if !ok { t.Fatalf("expected TextContent, got %T", res.Content[0]) } - var resp struct { - Items []map[string]any `json:"items"` - } - if err := json.Unmarshal([]byte(tc.Text), &resp); err != nil { - t.Fatalf("response not JSON object: %v (text: %q)", err, tc.Text) + var rows []map[string]any + if err := json.Unmarshal([]byte(tc.Text), &rows); err != nil { + t.Fatalf("response not JSON array: %v (text: %q)", err, tc.Text) } - if len(resp.Items) != 0 { - t.Errorf("expected empty results for empty mesh, got %d rows", len(resp.Items)) + if len(rows) != 0 { + t.Errorf("expected empty results for empty mesh, got %d rows", len(rows)) } } @@ -156,10 +154,8 @@ func TestHandleFindRemoteTools_SinglePeer(t *testing.T) { if !ok { t.Fatalf("expected TextContent, got %T", res.Content[0]) } - var resp struct { - Items []remoteToolRow `json:"items"` - } - if err := json.Unmarshal([]byte(tc.Text), &resp); err != nil { + var rows []remoteToolRow + if err := json.Unmarshal([]byte(tc.Text), &rows); err != nil { t.Fatalf("unmarshal: %v (text: %q)", err, tc.Text) } @@ -167,7 +163,7 @@ func TestHandleFindRemoteTools_SinglePeer(t *testing.T) { "code-reviewer.review_pr": false, "code-reviewer.add_comment": false, } - for _, row := range resp.Items { + for _, row := range rows { if row.PeerID != nodeB.Host.ID().String() { t.Errorf("row has peer_id %q, want %q", row.PeerID, nodeB.Host.ID().String()) } @@ -177,7 +173,7 @@ func TestHandleFindRemoteTools_SinglePeer(t *testing.T) { } for name, found := range wantNames { if !found { - t.Errorf("expected tool %q in response, not found; rows=%+v", name, resp.Items) + t.Errorf("expected tool %q in response, not found; rows=%+v", name, rows) } } } diff --git a/tests/e2e/find_remote_tools.bats b/tests/e2e/find_remote_tools.bats index 35a3173..19eb0ea 100644 --- a/tests/e2e/find_remote_tools.bats +++ b/tests/e2e/find_remote_tools.bats @@ -79,7 +79,7 @@ teardown() { local match_count match_count=$(echo "$catalog" | jq --arg pid "${node2_peer_id}" ' - [.items[] | select(.peer_id == $pid + [.[] | select(.peer_id == $pid and (.tool_name | startswith("calculator.")))] | length ') echo "Matching calculator tool entries: ${match_count}" From 53b15af29c3fc764bd2c567e42cb0c53a7e37f5e Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 11:32:52 +0000 Subject: [PATCH 16/17] Fix flake due to biscuit timeout --- cmd/sam-node/gate_test.go | 5 +++-- cmd/sam-node/mcp_test.go | 10 +++++++--- cmd/sam-node/middleware_test.go | 18 +++++++++++------- cmd/sam-node/node_test.go | 8 +++++--- cmd/sam-node/oidc_test.go | 8 ++++---- cmd/sam-node/relay_acl_test.go | 5 +++-- cmd/sam-node/sidecar_test.go | 14 +++++++------- 7 files changed, 40 insertions(+), 28 deletions(-) diff --git a/cmd/sam-node/gate_test.go b/cmd/sam-node/gate_test.go index 4220a2c..c14ca7b 100644 --- a/cmd/sam-node/gate_test.go +++ b/cmd/sam-node/gate_test.go @@ -48,8 +48,9 @@ func TestConnectionGater(t *testing.T) { } node := &SamNode{ - Store: store, - revokedPeers: cache, + Store: store, + revokedPeers: cache, + BiscuitTimeout: 500 * time.Millisecond, } gater := &nodeConnGate{node: node} diff --git a/cmd/sam-node/mcp_test.go b/cmd/sam-node/mcp_test.go index 0fe4464..fdb5f18 100644 --- a/cmd/sam-node/mcp_test.go +++ b/cmd/sam-node/mcp_test.go @@ -20,6 +20,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/peerstore" @@ -29,7 +30,9 @@ import ( func TestMCPHandler_HTTP(t *testing.T) { // Setup a dummy node - node := &SamNode{} + node := &SamNode{ + BiscuitTimeout: 500 * time.Millisecond, + } handler := NewMCPHandler(node) ts := httptest.NewServer(handler) @@ -88,8 +91,9 @@ func TestResolveRelayAddresses(t *testing.T) { defer func() { _ = kdht.Close() }() node := &SamNode{ - Host: localHost, - DHT: kdht, + Host: localHost, + DHT: kdht, + BiscuitTimeout: 500 * time.Millisecond, } // Create relay host diff --git a/cmd/sam-node/middleware_test.go b/cmd/sam-node/middleware_test.go index a2f0f51..050daf8 100644 --- a/cmd/sam-node/middleware_test.go +++ b/cmd/sam-node/middleware_test.go @@ -146,9 +146,10 @@ func TestAuthorize(t *testing.T) { } node := &SamNode{ - Store: store, - trustedKeys: []TrustedKey{{Key: pub, ReceivedAt: time.Now()}}, - TrustHubRBAC: true, + Store: store, + trustedKeys: []TrustedKey{{Key: pub, ReceivedAt: time.Now()}}, + TrustHubRBAC: true, + BiscuitTimeout: 500 * time.Millisecond, } req := RequestContext{ @@ -357,9 +358,10 @@ func TestRevocation(t *testing.T) { cache, _ := lru.New[string, int64](10000) rl, _ := NewPeerRateLimiter(100) node := &SamNode{ - trustedKeys: []TrustedKey{{Key: pub, ReceivedAt: time.Now()}}, - revokedPeers: cache, - rateLimiter: rl, + trustedKeys: []TrustedKey{{Key: pub, ReceivedAt: time.Now()}}, + revokedPeers: cache, + rateLimiter: rl, + BiscuitTimeout: 500 * time.Millisecond, } // Mark as revoked @@ -413,7 +415,8 @@ func TestVerifyEvent(t *testing.T) { } node := &SamNode{ - trustedKeys: []TrustedKey{{Key: pub, ReceivedAt: time.Now()}}, + trustedKeys: []TrustedKey{{Key: pub, ReceivedAt: time.Now()}}, + BiscuitTimeout: 500 * time.Millisecond, } event := &api.MeshEvent{ @@ -480,6 +483,7 @@ func TestVerifyBiscuitCache(t *testing.T) { node := &SamNode{ trustedKeys: []TrustedKey{{Key: pub, ReceivedAt: time.Now()}}, verificationCache: cache, + BiscuitTimeout: 500 * time.Millisecond, } // Case 1: Fresh verification (uncached) diff --git a/cmd/sam-node/node_test.go b/cmd/sam-node/node_test.go index 15aa901..9d797ea 100644 --- a/cmd/sam-node/node_test.go +++ b/cmd/sam-node/node_test.go @@ -30,7 +30,8 @@ import ( func TestHandleBannedEvent(t *testing.T) { revokedCache, _ := lru.New[string, int64](10) node := &SamNode{ - revokedPeers: revokedCache, + revokedPeers: revokedCache, + BiscuitTimeout: 500 * time.Millisecond, } event := &api.MeshEvent{ @@ -47,7 +48,7 @@ func TestHandleBannedEvent(t *testing.T) { } func TestHandleKeyRotationEvent(t *testing.T) { - node := &SamNode{} + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond} pub, _, err := ed25519.GenerateKey(nil) if err != nil { @@ -74,7 +75,8 @@ func TestStartRenewalLoop_ExpiredAndFails(t *testing.T) { _ = store.SaveIdentityExpiration(time.Now().Add(-1 * time.Hour).Unix()) node := &SamNode{ - Store: store, + BiscuitTimeout: 500 * time.Millisecond, + Store: store, } // Run the renewal loop. Since there's no JWT/Issuer provided, it fails to renew. diff --git a/cmd/sam-node/oidc_test.go b/cmd/sam-node/oidc_test.go index af0e9a1..a72ad5a 100644 --- a/cmd/sam-node/oidc_test.go +++ b/cmd/sam-node/oidc_test.go @@ -48,7 +48,7 @@ func TestInteractiveLogin(t *testing.T) { server := httptest.NewServer(mux) defer server.Close() - node := &SamNode{} + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond} ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -93,7 +93,7 @@ func TestDiscoverEndpoints(t *testing.T) { server := httptest.NewServer(mux) defer server.Close() - node := &SamNode{} + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond} ctx := context.Background() tokenURL, authURL, err := node.DiscoverEndpoints(ctx, server.URL) @@ -146,7 +146,7 @@ func TestInteractiveLoginWithRefresh(t *testing.T) { } }() - node := &SamNode{Store: store} + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond, Store: store} ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -250,7 +250,7 @@ func TestRenewWithRefreshToken(t *testing.T) { t.Fatalf("Failed to save Refresh Token: %v", err) } - node := &SamNode{Store: store} + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond, Store: store} ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/cmd/sam-node/relay_acl_test.go b/cmd/sam-node/relay_acl_test.go index abfdc6a..0b33e55 100644 --- a/cmd/sam-node/relay_acl_test.go +++ b/cmd/sam-node/relay_acl_test.go @@ -16,13 +16,14 @@ package main import ( "testing" + "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" ) func TestNodeRelayACL_AllowConnect(t *testing.T) { - node := &SamNode{} + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond} acl := &nodeRelayACL{node: node} srcPeer := peer.ID("src-peer") @@ -49,7 +50,7 @@ func TestNodeRelayACL_AllowConnect(t *testing.T) { } func TestNodeRelayACL_AllowReserve(t *testing.T) { - node := &SamNode{} + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond} acl := &nodeRelayACL{node: node} peerID := peer.ID("some-peer") diff --git a/cmd/sam-node/sidecar_test.go b/cmd/sam-node/sidecar_test.go index 12d26d3..574d585 100644 --- a/cmd/sam-node/sidecar_test.go +++ b/cmd/sam-node/sidecar_test.go @@ -146,7 +146,7 @@ func TestHandleRegisterService(t *testing.T) { // Wait for DHT to recognize the peer time.Sleep(100 * time.Millisecond) - node := &SamNode{ + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond, services: NewServiceRegistry(d), DHT: d, } @@ -185,7 +185,7 @@ func TestHandleRegisterService(t *testing.T) { } func TestHandleUnregisterService(t *testing.T) { - node := &SamNode{ + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond, services: NewServiceRegistry(&fakeDHT{}), } node.services.insertService(&testService{info: &api.ServiceInfo{Name: "test-service"}}) @@ -222,7 +222,7 @@ func TestHandleDiscoverService(t *testing.T) { } defer func() { _ = d.Close() }() - node := &SamNode{ + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond, services: NewServiceRegistry(d), DHT: d, Host: h, @@ -281,7 +281,7 @@ func TestHandleDiscoverService(t *testing.T) { } func TestListLocalServices(t *testing.T) { - node := &SamNode{ + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond, services: NewServiceRegistry(&fakeDHT{}), } @@ -299,7 +299,7 @@ func TestListLocalServices(t *testing.T) { } func TestListLocalServices_TypeFilter(t *testing.T) { - node := &SamNode{ + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond, services: NewServiceRegistry(&fakeDHT{}), } mcpA := &api.ServiceInfo{Type: api.ServiceType_SERVICE_TYPE_MCP, Name: "mcp-a"} @@ -398,7 +398,7 @@ func TestServiceKeyToCID_Equivalence(t *testing.T) { } func TestHandleRegisterService_Validation(t *testing.T) { - node := &SamNode{ + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond, services: NewServiceRegistry(&fakeDHT{}), } @@ -458,7 +458,7 @@ func TestHandleRegisterService_Validation(t *testing.T) { } func TestStartSidecarServer_TokenMandatory(t *testing.T) { - node := &SamNode{} + node := &SamNode{BiscuitTimeout: 500 * time.Millisecond} // Test case: No token, no TLS err := startSidecarServer(node, "127.0.0.1:0", "", "", "", "") From f2235a2faeff1a0d93e3cac7ae3f585a85edfd1d Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Mon, 29 Jun 2026 11:45:41 +0000 Subject: [PATCH 17/17] fix kind ee2e mesh --- development/kind/test-mesh-e2e.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/development/kind/test-mesh-e2e.sh b/development/kind/test-mesh-e2e.sh index fefeedf..a1f9582 100755 --- a/development/kind/test-mesh-e2e.sh +++ b/development/kind/test-mesh-e2e.sh @@ -49,7 +49,7 @@ for i in $(seq 1 90); do echo "discover attempt $i: $tools" peer="$(printf '%s' "$tools" \ - | jq -r '.items[]? | select(.tool_name=="calculator.add") | .peer_id' 2>/dev/null \ + | jq -r '.[]? | select(.tool_name=="calculator.add") | .peer_id' 2>/dev/null \ | head -n1 || true)" if [ -n "$peer" ]; then