Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 89 additions & 24 deletions cmd/thv-operator/api/v1alpha1/mcptelemetryconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/stacklok/toolhive/pkg/telemetry"
)

// SensitiveHeader represents a header whose value is stored in a Kubernetes Secret.
Expand All @@ -25,24 +23,76 @@ type SensitiveHeader struct {
SecretKeyRef SecretKeyRef `json:"secretKeyRef"`
}

// MCPTelemetryConfigSpec defines the desired state of MCPTelemetryConfig.
// It embeds telemetry.Config from pkg/telemetry to eliminate the conversion
// layer between CRD and application types. The environmentVariables field is
// CLI-only and rejected by CEL validation; customAttributes is allowed for
// setting shared OTel resource attributes (e.g., deployment.environment).
// MCPTelemetryOTelConfig defines OpenTelemetry configuration for shared MCPTelemetryConfig resources.
// Unlike OpenTelemetryConfig (used by inline MCPServer telemetry), this type:
// - Omits ServiceName (per-server field set via MCPTelemetryConfigReference)
// - Uses map[string]string for Headers (not []string)
// - Adds SensitiveHeaders for Kubernetes Secret-backed credentials
// - Adds ResourceAttributes for shared OTel resource attributes
//
// +kubebuilder:validation:XValidation:rule="!has(self.environmentVariables)",message="environmentVariables is a CLI-only field and cannot be set in MCPTelemetryConfig; use customAttributes for resource attributes"
// +kubebuilder:validation:XValidation:rule="!has(self.headers) || !has(self.sensitiveHeaders) || self.sensitiveHeaders.all(sh, !(sh.name in self.headers))",message="a header name cannot appear in both headers and sensitiveHeaders"
//
//nolint:lll // CEL validation rules exceed line length limit
type MCPTelemetryConfigSpec struct {
telemetry.Config `json:",inline"` // nolint:revive
type MCPTelemetryOTelConfig struct {
// Enabled controls whether OpenTelemetry is enabled. Defaults to false.
// +kubebuilder:default=false
// +optional
Enabled bool `json:"enabled,omitempty"`

// Endpoint is the OTLP endpoint URL for tracing and metrics.
// When set, at least one of tracing or metrics must be enabled; otherwise
// validation rejects the config.
// +optional
Endpoint string `json:"endpoint,omitempty"`

// Insecure indicates whether to use HTTP instead of HTTPS for the OTLP endpoint.
// Defaults to false.
// +kubebuilder:default=false
// +optional
Insecure bool `json:"insecure,omitempty"`

// Headers contains authentication headers for the OTLP endpoint.
// For secret-backed credentials, use sensitiveHeaders instead.
// A header name must not also appear in sensitiveHeaders.
// +optional
Headers map[string]string `json:"headers,omitempty"`

// SensitiveHeaders contains headers whose values are stored in Kubernetes Secrets.
// Use this for credential headers (e.g., API keys, bearer tokens) instead of
// embedding secrets in the headers field.
// Each entry requires a non-empty name, secretKeyRef.name and secretKeyRef.key,
// and its name must not also appear in headers.
// +optional
SensitiveHeaders []SensitiveHeader `json:"sensitiveHeaders,omitempty"`

// ResourceAttributes contains custom resource attributes to be added to all telemetry signals.
// These become OTel resource attributes (e.g., deployment.environment, service.namespace).
// Note: service.name is intentionally excluded — it is set per-server via
// MCPTelemetryConfigReference.ServiceName.
// +optional
ResourceAttributes map[string]string `json:"resourceAttributes,omitempty"`

// Metrics defines OpenTelemetry metrics-specific configuration.
// A nil value is treated as metrics disabled by the endpoint validation.
// +optional
Metrics *OpenTelemetryMetricsConfig `json:"metrics,omitempty"`

// Tracing defines OpenTelemetry tracing configuration.
// A nil value is treated as tracing disabled by the endpoint validation.
// +optional
Tracing *OpenTelemetryTracingConfig `json:"tracing,omitempty"`

// NOTE(review): unlike the other fields, the JSON tag below has no omitempty,
// so an explicit false is always serialized. Presumably this keeps a
// user-supplied false from being replaced by the true default — confirm.

// UseLegacyAttributes controls whether legacy attribute names are emitted alongside
// the new MCP OTEL semantic convention names. Defaults to true for backward compatibility.
// This will change to false in a future release and eventually be removed.
// +kubebuilder:default=true
// +optional
UseLegacyAttributes bool `json:"useLegacyAttributes"`
}

// MCPTelemetryConfigSpec defines the desired state of MCPTelemetryConfig.
// The spec uses a nested structure with openTelemetry and prometheus sub-objects
// for clear separation of concerns. Both sub-objects are optional pointers, so
// an omitted section is nil rather than a zero-valued struct.
type MCPTelemetryConfigSpec struct {
// OpenTelemetry defines OpenTelemetry configuration (OTLP endpoint, tracing, metrics).
// When omitted, the OpenTelemetry-specific validation checks are skipped.
// +optional
OpenTelemetry *MCPTelemetryOTelConfig `json:"openTelemetry,omitempty"`

// Prometheus defines Prometheus-specific configuration.
// +optional
Prometheus *PrometheusConfig `json:"prometheus,omitempty"`
}

// MCPTelemetryConfigStatus defines the observed state of MCPTelemetryConfig
Expand All @@ -59,17 +109,18 @@ type MCPTelemetryConfigStatus struct {
// +optional
ConfigHash string `json:"configHash,omitempty"`

// ReferencingServers is a list of MCPServer resources that reference this MCPTelemetryConfig
// ReferencingWorkloads lists workloads that reference this MCPTelemetryConfig
// +optional
ReferencingServers []string `json:"referencingServers,omitempty"`
ReferencingWorkloads []WorkloadReference `json:"referencingWorkloads,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=mcpotel,categories=toolhive
// +kubebuilder:printcolumn:name="Endpoint",type=string,JSONPath=`.spec.endpoint`
// +kubebuilder:printcolumn:name="Endpoint",type=string,JSONPath=`.spec.openTelemetry.endpoint`
// +kubebuilder:printcolumn:name="Ready",type=string,JSONPath=`.status.conditions[?(@.type=='Valid')].status`
// +kubebuilder:printcolumn:name="References",type=string,JSONPath=`.status.referencingServers`
// +kubebuilder:printcolumn:name="Tracing",type=boolean,JSONPath=`.spec.openTelemetry.tracing.enabled`
// +kubebuilder:printcolumn:name="Metrics",type=boolean,JSONPath=`.spec.openTelemetry.metrics.enabled`
// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp`

// MCPTelemetryConfig is the Schema for the mcptelemetryconfigs API.
Expand Down Expand Up @@ -115,33 +166,47 @@ type MCPTelemetryConfigReference struct {
// CEL catches issues at API admission time, but this method also validates
// stored objects to catch any that bypassed CEL or were stored before CEL rules were added.
func (r *MCPTelemetryConfig) Validate() error {
if err := r.validateCLIOnlyFields(); err != nil {
if err := r.validateEndpointRequiresSignals(); err != nil {
return err
}
return r.validateSensitiveHeaders()
}

// validateCLIOnlyFields rejects CLI-only fields that are not applicable to CRD-managed telemetry.
func (r *MCPTelemetryConfig) validateCLIOnlyFields() error {
if len(r.Spec.EnvironmentVariables) > 0 {
return fmt.Errorf("environmentVariables is a CLI-only field and cannot be set in MCPTelemetryConfig")
// validateEndpointRequiresSignals returns an error when an OTLP endpoint is
// configured but neither tracing nor metrics is enabled. A missing
// openTelemetry section or an empty endpoint is accepted as-is.
// Without this check the config would pass CRD validation but fail at runtime
// in telemetry.NewProvider.
func (r *MCPTelemetryConfig) validateEndpointRequiresSignals() error {
	cfg := r.Spec.OpenTelemetry
	// Nothing to validate unless an endpoint has actually been configured.
	if cfg == nil || cfg.Endpoint == "" {
		return nil
	}
	// Either signal being enabled is enough to make the endpoint meaningful.
	if cfg.Tracing != nil && cfg.Tracing.Enabled {
		return nil
	}
	if cfg.Metrics != nil && cfg.Metrics.Enabled {
		return nil
	}
	return fmt.Errorf("endpoint requires at least one of tracing or metrics to be enabled")
}

// validateSensitiveHeaders validates sensitive header entries and checks for overlap with plaintext headers.
func (r *MCPTelemetryConfig) validateSensitiveHeaders() error {
for i, sh := range r.Spec.SensitiveHeaders {
if r.Spec.OpenTelemetry == nil {
return nil
}
otel := r.Spec.OpenTelemetry
for i, sh := range otel.SensitiveHeaders {
if sh.Name == "" {
return fmt.Errorf("sensitiveHeaders[%d].name must not be empty", i)
return fmt.Errorf("openTelemetry.sensitiveHeaders[%d].name must not be empty", i)
}
if sh.SecretKeyRef.Name == "" {
return fmt.Errorf("sensitiveHeaders[%d].secretKeyRef.name must not be empty", i)
return fmt.Errorf("openTelemetry.sensitiveHeaders[%d].secretKeyRef.name must not be empty", i)
}
if sh.SecretKeyRef.Key == "" {
return fmt.Errorf("sensitiveHeaders[%d].secretKeyRef.key must not be empty", i)
return fmt.Errorf("openTelemetry.sensitiveHeaders[%d].secretKeyRef.key must not be empty", i)
}
if _, exists := r.Spec.Headers[sh.Name]; exists {
if _, exists := otel.Headers[sh.Name]; exists {
return fmt.Errorf("header %q appears in both headers and sensitiveHeaders", sh.Name)
}
}
Expand Down
64 changes: 56 additions & 8 deletions cmd/thv-operator/api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 39 additions & 18 deletions cmd/thv-operator/controllers/mcptelemetryconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"slices"
"sort"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -108,16 +107,16 @@ func (r *MCPTelemetryConfigReconciler) Reconcile(ctx context.Context, req ctrl.R
// Calculate the hash of the current configuration
configHash := r.calculateConfigHash(telemetryConfig.Spec)

// Track referencing MCPServers
referencingServers, err := r.findReferencingServers(ctx, telemetryConfig)
// Track referencing workloads
referencingWorkloads, err := r.findReferencingWorkloads(ctx, telemetryConfig)
if err != nil {
logger.Error(err, "Failed to find referencing MCPServers")
logger.Error(err, "Failed to find referencing workloads")
return ctrl.Result{}, err
}

// Check what changed
hashChanged := telemetryConfig.Status.ConfigHash != configHash
refsChanged := !slices.Equal(telemetryConfig.Status.ReferencingServers, referencingServers)
refsChanged := !workloadRefsEqual(telemetryConfig.Status.ReferencingWorkloads, referencingWorkloads)
needsUpdate := hashChanged || refsChanged || conditionChanged

if hashChanged {
Expand All @@ -129,7 +128,7 @@ func (r *MCPTelemetryConfigReconciler) Reconcile(ctx context.Context, req ctrl.R
if needsUpdate {
telemetryConfig.Status.ConfigHash = configHash
telemetryConfig.Status.ObservedGeneration = telemetryConfig.Generation
telemetryConfig.Status.ReferencingServers = referencingServers
telemetryConfig.Status.ReferencingWorkloads = referencingWorkloads

if err := r.Status().Update(ctx, telemetryConfig); err != nil {
logger.Error(err, "Failed to update MCPTelemetryConfig status")
Expand All @@ -142,7 +141,7 @@ func (r *MCPTelemetryConfigReconciler) Reconcile(ctx context.Context, req ctrl.R

// SetupWithManager sets up the controller with the Manager.
func (r *MCPTelemetryConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Watch MCPServer changes to update ReferencingServers status
// Watch MCPServer changes to update ReferencingWorkloads status
mcpServerHandler := handler.EnqueueRequestsFromMapFunc(
func(_ context.Context, obj client.Object) []reconcile.Request {
mcpServer, ok := obj.(*mcpv1alpha1.MCPServer)
Expand Down Expand Up @@ -187,15 +186,19 @@ func (r *MCPTelemetryConfigReconciler) handleDeletion(
return ctrl.Result{}, nil
}

// Check for referencing servers before allowing deletion
referencingServers, err := r.findReferencingServers(ctx, telemetryConfig)
// Check for referencing workloads before allowing deletion
referencingWorkloads, err := r.findReferencingWorkloads(ctx, telemetryConfig)
if err != nil {
logger.Error(err, "Failed to check referencing servers during deletion")
logger.Error(err, "Failed to check referencing workloads during deletion")
return ctrl.Result{}, err
}

if len(referencingServers) > 0 {
msg := fmt.Sprintf("cannot delete: still referenced by MCPServer(s): %v", referencingServers)
if len(referencingWorkloads) > 0 {
names := make([]string, 0, len(referencingWorkloads))
for _, ref := range referencingWorkloads {
names = append(names, fmt.Sprintf("%s/%s", ref.Kind, ref.Name))
}
msg := fmt.Sprintf("cannot delete: still referenced by MCPServer(s): %v", names)
logger.Info(msg, "telemetryConfig", telemetryConfig.Name)
meta.SetStatusCondition(&telemetryConfig.Status.Conditions, metav1.Condition{
Type: "DeletionBlocked",
Expand All @@ -220,24 +223,42 @@ func (r *MCPTelemetryConfigReconciler) handleDeletion(
return ctrl.Result{}, nil
}

// findReferencingServers returns a sorted list of MCPServer names in the same namespace
// findReferencingWorkloads returns a sorted list of workload references in the same namespace
// that reference this MCPTelemetryConfig via TelemetryConfigRef.
func (r *MCPTelemetryConfigReconciler) findReferencingServers(
func (r *MCPTelemetryConfigReconciler) findReferencingWorkloads(
ctx context.Context,
telemetryConfig *mcpv1alpha1.MCPTelemetryConfig,
) ([]string, error) {
) ([]mcpv1alpha1.WorkloadReference, error) {
mcpServerList := &mcpv1alpha1.MCPServerList{}
if err := r.List(ctx, mcpServerList, client.InNamespace(telemetryConfig.Namespace)); err != nil {
return nil, fmt.Errorf("failed to list MCPServers: %w", err)
}

var refs []string
var refs []mcpv1alpha1.WorkloadReference
for _, server := range mcpServerList.Items {
if server.Spec.TelemetryConfigRef != nil &&
server.Spec.TelemetryConfigRef.Name == telemetryConfig.Name {
refs = append(refs, server.Name)
refs = append(refs, mcpv1alpha1.WorkloadReference{
Kind: "MCPServer",
Name: server.Name,
})
}
}
sort.Strings(refs)
slices.SortFunc(refs, func(a, b mcpv1alpha1.WorkloadReference) int {
if a.Name < b.Name {
return -1
}
if a.Name > b.Name {
return 1
}
return 0
})
return refs, nil
}

// workloadRefsEqual reports whether a and b hold the same workload references
// in the same order. Two references match when both Kind and Name are equal;
// slices of different length are never equal, and a nil slice equals an empty one.
func workloadRefsEqual(a, b []mcpv1alpha1.WorkloadReference) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i].Kind != b[i].Kind || a[i].Name != b[i].Name {
			return false
		}
	}
	return true
}
Loading
Loading