Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/k8s/sam-node-cop-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ spec:
python -u /app/banana_bot_playground.py
env:
- name: SAM_MCP_URL
value: "http://127.0.0.1:8080/mcp/events"
value: "http://127.0.0.1:8080/mcp"
- name: SAM_API_TOKEN
value: "secret-token"
- name: GEMINI_API_KEY
Expand Down
4 changes: 2 additions & 2 deletions .github/k8s/sam-node-openclaw-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ data:
"servers": {
"sam-mesh": {
"enabled": true,
"url": "http://127.0.0.1:8080/mcp/events"
"url": "http://127.0.0.1:8080/mcp"
}
}
}
Expand Down Expand Up @@ -74,7 +74,7 @@ spec:
) >/dev/null 2>&1 </dev/null &
env:
- name: SAM_MCP_URL
value: "http://127.0.0.1:8080/mcp/events"
value: "http://127.0.0.1:8080/mcp"
- name: OPENCLAW_STATE_DIR
value: "/data"
- name: OPENCLAW_GATEWAY_TOKEN
Expand Down
44 changes: 2 additions & 42 deletions .github/workflows/kind-mesh-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,48 +55,8 @@ jobs:
- name: Bring up the kind mesh
run: make kind-up ARGS="-s"

- name: Enroll a local sam-node
run: |
./development/kind/run-local-node.sh > "$RUNNER_TEMP/local-node.log" 2>&1 &
PID=$!
for _ in $(seq 1 60); do
grep -q "SAM Node Online" "$RUNNER_TEMP/local-node.log" && break
kill -0 "$PID" 2>/dev/null || { echo "local node exited early:"; cat "$RUNNER_TEMP/local-node.log"; exit 1; }
sleep 1
done
grep -q "SAM Node Online" "$RUNNER_TEMP/local-node.log" \
|| { echo "local node did not come online:"; cat "$RUNNER_TEMP/local-node.log"; exit 1; }
echo "local node online"

- name: Mesh info, discover, find + call calculator
run: |
set -euo pipefail
URL=http://127.0.0.1:9099/mcp/events
mcp() { ./bin/mcp-client -url "$URL" -timeout 20 "$@" 2>/dev/null; }

echo "== get_mesh_info =="
info=$(mcp -tool get_mesh_info -args '{}')
echo "$info"
echo "$info" | jq -e '.hub_peer_id != "" and (.dht_size > 0)' >/dev/null \
|| { echo "get_mesh_info assertion failed"; exit 1; }

echo "== discover services / find calculator (poll) =="
peer=""
for _ in $(seq 1 30); do
tools=$(mcp -tool find_remote_tools -args '{}' || true)
peer=$(printf '%s' "$tools" | jq -r '.[]? | select(.tool_name=="calculator.add") | .peer_id' 2>/dev/null | head -n1)
[ -n "$peer" ] && break
sleep 1
done
[ -n "$peer" ] || { echo "calculator.add not discovered"; exit 1; }
echo "calculator host: $peer"

echo "== call calculator.add(2,3) =="
result=$(mcp -tool call_remote_tool \
-args "{\"peer_id\":\"$peer\",\"tool_name\":\"calculator.add\",\"arguments\":{\"a\":2,\"b\":3}}")
echo "result: $result"
printf '%s' "$result" | grep -q 5 || { echo "calculator.add did not return 5"; exit 1; }
echo "OK: calculator.add(2,3) == 5"
- name: Run e2e mesh tests
run: make kind-e2e-mesh

- name: Collect logs on failure
if: failure()
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ kind-down:
kind-local-node:
./development/kind/run-local-node.sh $(ARGS)

.PHONY: kind-e2e-mesh
kind-e2e-mesh: build
./development/kind/test-mesh-e2e.sh

test:
CGO_ENABLED=1 go test -v -race -count 1 $(if $(WHAT),-run $(WHAT)) ./...

Expand Down
2 changes: 1 addition & 1 deletion cmd/mcp-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func main() {
}

// Construct URL
baseURL := strings.TrimSuffix(*serverURL, "/mcp/events")
baseURL := strings.TrimSuffix(*serverURL, "/mcp")
baseURL = strings.TrimSuffix(baseURL, "/")
if !strings.Contains(baseURL, "/sam/service/discover") {
baseURL = baseURL + "/sam/service/discover"
Expand Down
19 changes: 16 additions & 3 deletions cmd/sam-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,20 @@ func main() {
if jwtStr == "" {
token, _ := store.LoadIdentity()
if len(token) == 0 {
logger.Fatal("No JWT or stored identity found. Cannot authenticate.")
displayHub := hubAddr
if displayHub == "" {
if h, err := store.LoadHubURL(); err == nil && h != "" {
displayHub = h
} else {
displayHub = "https://bananas.sam-mesh.dev"
}
}
logger.Infof("No identity found. Starting unauthenticated sidecar for enrollment over MCP...")
if err := startUnauthSidecarServer(displayHub, bindAddrFlag, tlsCertFlag, tlsKeyFlag); err != nil {
logger.Fatalf("Failed to start unauthenticated sidecar: %v", err)
}
<-ctx.Done()
return
}
logger.Infoln("Using stored identity.")

Expand Down Expand Up @@ -554,7 +567,7 @@ func resolveDataDir() string {
func getOrGenerateKey(s *Store) crypto.PrivKey {
kb, _ := s.LoadKey()
if len(kb) == 0 {
fmt.Println("[Store] Generating new Peer Identity...")
logger.Info("[Store] Generating new Peer Identity...")
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1)
if err != nil {
logger.Fatalf("Failed to generate key: %v", err)
Expand Down Expand Up @@ -667,6 +680,6 @@ func (n *SamNode) Enroll(ctx context.Context, jwt string) error {
return fmt.Errorf("failed to connect and authenticate with any hub after HTTP enrollment (last error: %v)", lastAuthErr)
}

fmt.Println("Successfully enrolled via HTTP and stored identity and hub config.")
logger.Info("Successfully enrolled via HTTP and stored identity and hub config.")
return nil
}
152 changes: 147 additions & 5 deletions cmd/sam-node/mcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@

Remote tool names are namespaced as '<service>.<tool>' (e.g. 'code-reviewer.review_pr'). Prefer discovering and describing a tool before calling it rather than guessing arguments.`

// NewMCPHandler creates a new HTTP handler for the MCP server using the official SDK.
func NewMCPHandler(node *SamNode) http.Handler {
// Create an MCP server.
// NewMCPServer creates a new MCP server instance with all tools registered.
func NewMCPServer(node *SamNode) *mcp.Server {
mcpServer := mcp.NewServer(&mcp.Implementation{
Name: "sam-node-mcp",
Version: "0.1.0",
Expand Down Expand Up @@ -144,14 +143,157 @@
Description: "Returns the last few lines of the node's log output.",
}, node.handleGetRecentLogs)

// Add the find_remote_resources tool.
mcp.AddTool(mcpServer, &mcp.Tool{
Name: "find_remote_resources",
Description: "Discover MCP resources available on hosted services across the mesh. Returns name + description per resource.",
}, node.handleFindRemoteResources)

// Add the read_remote_resource tool.
mcp.AddTool(mcpServer, &mcp.Tool{
Name: "read_remote_resource",
Description: "Read a specific resource from a remote peer.",
}, node.handleReadRemoteResource)

// Add the find_remote_prompts tool.
mcp.AddTool(mcpServer, &mcp.Tool{
Name: "find_remote_prompts",
Description: "Discover MCP prompts available on hosted services across the mesh.",
}, node.handleFindRemotePrompts)

// Add the get_remote_prompt tool.
mcp.AddTool(mcpServer, &mcp.Tool{
Name: "get_remote_prompt",
Description: "Get a specific prompt from a remote peer.",
}, node.handleGetRemotePrompt)

// Add a local resource for mesh state
mcpServer.AddResource(&mcp.Resource{
URI: "mesh://state",
Name: "Mesh State",
Description: "Current state of the SAM node and mesh connections",
MIMEType: "application/json",
}, func(ctx context.Context, req *mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) {
peers := node.Host.Network().Peers()
connectedPeers := make([]string, 0, len(peers))
for _, p := range peers {
connectedPeers = append(connectedPeers, p.String())
}
resData := map[string]any{
"connected_peers": connectedPeers,
"dht_size": node.DHT.RoutingTable().Size(),
"hub_peer_id": node.HubPeerID.String(),
"peer_id": node.Host.ID().String(),
}
data, _ := json.MarshalIndent(resData, "", " ")
return &mcp.ReadResourceResult{
Contents: []*mcp.ResourceContents{
{
URI: "mesh://state",
MIMEType: "application/json",
Text: string(data),
},
},
}, nil
})

// Add a local prompt
mcpServer.AddPrompt(&mcp.Prompt{
Name: "mesh_debugging",
Description: "A standard prompt for debugging mesh connectivity issues",
Arguments: []*mcp.PromptArgument{
{
Name: "peer_id",
Description: "The ID of the peer you are trying to reach",
Required: true,
},
},
}, func(ctx context.Context, req *mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
peerID, _ := req.Params.Arguments["peer_id"]

Check failure on line 212 in cmd/sam-node/mcp.go

View workflow job for this annotation

GitHub Actions / lint

S1005: unnecessary assignment to the blank identifier (staticcheck)
text := fmt.Sprintf("I am trying to debug connectivity to peer %s. Please check the local mesh state using the mesh://state resource, and then use the check_connectivity tool to diagnose the network path. Finally, use discover_remote_services to see if they are advertising the expected service.", peerID)
return &mcp.GetPromptResult{
Description: "Debug mesh connectivity",
Messages: []*mcp.PromptMessage{
{
Role: "user",
Content: &mcp.TextContent{
Text: text,
},
},
},
}, nil
})

return mcpServer
}

// NewUnauthenticatedMCPServer creates a minimal MCP server that instructs the client on how to authenticate.
func NewUnauthenticatedMCPServer(hubURL string) *mcp.Server {
mcpServer := mcp.NewServer(&mcp.Implementation{
Name: "sam-node-mcp-unauth",
Version: "0.1.0",
}, &mcp.ServerOptions{Instructions: "This node is unauthenticated. Use the get_login_instructions tool or help_user_login prompt."})

mcp.AddTool(mcpServer, &mcp.Tool{
Name: "get_login_instructions",
Description: "Get instructions on how to authenticate this node so it can join the SAM mesh.",
}, func(ctx context.Context, req *mcp.CallToolRequest, params map[string]any) (*mcp.CallToolResult, any, error) {
msg := fmt.Sprintf("The node is unauthenticated. Please open a regular terminal and run:\n\n sam-node join %s\n\nOnce complete, restart this MCP client.", hubURL)
return &mcp.CallToolResult{
Content: []mcp.Content{
&mcp.TextContent{Text: msg},
},
}, nil, nil
})

mcpServer.AddPrompt(&mcp.Prompt{
Name: "help_user_login",
Description: "Provides the user with login instructions when the node is unauthenticated.",
}, func(ctx context.Context, req *mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
return &mcp.GetPromptResult{
Description: "Instructions for joining the SAM mesh.",
Messages: []*mcp.PromptMessage{
{
Role: "user",
Content: &mcp.TextContent{
Text: fmt.Sprintf("Please open a terminal and run:\n\n`sam-node join %s`\n\nAfter you finish, please restart this MCP connection.", hubURL),
},
},
},
}, nil
})

return mcpServer
}

// NewUnauthenticatedMCPHandler creates an HTTP handler for the unauthenticated MCP server.
func NewUnauthenticatedMCPHandler(hubURL string) http.Handler {
mcpServer := NewUnauthenticatedMCPServer(hubURL)

sseHandler := mcp.NewSSEHandler(func(request *http.Request) *mcp.Server {
return mcpServer
}, nil)

mux := http.NewServeMux()
mux.Handle("/mcp", sseHandler)

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger.Debugf("Unauth MCP Request: %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
mux.ServeHTTP(w, r)
})
}

// NewMCPHandler creates a new HTTP handler for the MCP server using the official SDK.
func NewMCPHandler(node *SamNode) http.Handler {
mcpServer := NewMCPServer(node)

// Create the SSE handler using the SDK
sseHandler := mcp.NewSSEHandler(func(request *http.Request) *mcp.Server {
return mcpServer
}, nil)

mux := http.NewServeMux()
mux.Handle("/mcp/events", sseHandler)
mux.Handle("/mcp/message", sseHandler)
mux.Handle("/mcp", sseHandler)

// Wrap in logging middleware to debug incoming requests
wrappedHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading
Loading