diff --git a/pkg/kubernetes/cephblockpool.go b/pkg/kubernetes/cephblockpool.go
new file mode 100644
index 0000000..1d112e5
--- /dev/null
+++ b/pkg/kubernetes/cephblockpool.go
@@ -0,0 +1,218 @@
+/*
+Copyright 2025 Flant JSC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubernetes
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/rest"
+
+	"github.com/deckhouse/storage-e2e/internal/logger"
+)
+
+// CephBlockPoolGVR is the GroupVersionResource of Rook's CephBlockPool.
+var CephBlockPoolGVR = schema.GroupVersionResource{
+	Group:    "ceph.rook.io",
+	Version:  "v1",
+	Resource: "cephblockpools",
+}
+
+// CephBlockPoolConfig describes a minimal replicated or erasure-coded Ceph
+// RBD pool managed by Rook. At most one of ReplicaSize or ErasureCoded
+// should be set; leaving both unset defaults to a single-replica pool
+// suitable for single-node test clusters.
+type CephBlockPoolConfig struct {
+	// Name of the CephBlockPool CR (also becomes the Ceph pool name).
+	Name string
+
+	// Namespace the Rook operator watches (typically "d8-sds-elastic").
+	Namespace string
+
+	// FailureDomain is the CRUSH failure domain: "host" or "osd" (default: "host").
+	FailureDomain string
+
+	// --- Replicated pool knobs (used when ErasureCoded is nil) ---
+
+	// ReplicaSize is the number of object copies. Default: 1.
+	ReplicaSize int
+
+	// RequireSafeReplicaSize toggles Ceph's safeguard against single-replica
+	// pools. When nil, it is set to `false` for ReplicaSize==1 (unsafe single
+	// replica, accepted for e2e test clusters) and left unset otherwise.
+	RequireSafeReplicaSize *bool
+
+	// --- Erasure-coded pool knobs ---
+
+	// ErasureCoded, when non-nil, produces an EC pool instead of a replicated
+	// one. Its fields map to `spec.erasureCoded.{dataChunks,codingChunks}`.
+	ErasureCoded *CephBlockPoolErasureCoded
+}
+
+// CephBlockPoolErasureCoded configures a Ceph erasure-coded RBD pool.
+type CephBlockPoolErasureCoded struct {
+	DataChunks   int
+	CodingChunks int
+}
+
+// CreateCephBlockPool creates (or updates, if already present) a CephBlockPool
+// in the given namespace from the provided configuration. It is idempotent and
+// safe to call on every test run.
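+//
+// A minimal usage sketch (pool name and error handling are illustrative,
+// not prescribed by this package):
+//
+//	cfg := CephBlockPoolConfig{
+//		Name:      "ceph-rbd-r1",
+//		Namespace: "d8-sds-elastic",
+//	}
+//	if err := CreateCephBlockPool(ctx, kubeconfig, cfg); err != nil {
+//		return err
+//	}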
+func CreateCephBlockPool(ctx context.Context, kubeconfig *rest.Config, cfg CephBlockPoolConfig) error { + if cfg.Name == "" { + return fmt.Errorf("CephBlockPool name is required") + } + if cfg.Namespace == "" { + return fmt.Errorf("CephBlockPool namespace is required") + } + if cfg.ErasureCoded == nil && cfg.ReplicaSize <= 0 { + cfg.ReplicaSize = 1 + } + if cfg.FailureDomain == "" { + cfg.FailureDomain = "host" + } + + spec := map[string]interface{}{ + "failureDomain": cfg.FailureDomain, + } + + if cfg.ErasureCoded != nil { + if cfg.ErasureCoded.DataChunks <= 0 || cfg.ErasureCoded.CodingChunks <= 0 { + return fmt.Errorf("ErasureCoded pool requires positive dataChunks and codingChunks") + } + spec["erasureCoded"] = map[string]interface{}{ + "dataChunks": int64(cfg.ErasureCoded.DataChunks), + "codingChunks": int64(cfg.ErasureCoded.CodingChunks), + } + } else { + replicated := map[string]interface{}{ + "size": int64(cfg.ReplicaSize), + } + requireSafe := cfg.RequireSafeReplicaSize + if requireSafe == nil && cfg.ReplicaSize == 1 { + f := false + requireSafe = &f + } + if requireSafe != nil { + replicated["requireSafeReplicaSize"] = *requireSafe + } + spec["replicated"] = replicated + } + + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "ceph.rook.io/v1", + "kind": "CephBlockPool", + "metadata": map[string]interface{}{ + "name": cfg.Name, + "namespace": cfg.Namespace, + }, + "spec": spec, + }, + } + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + logger.Info("Creating CephBlockPool %s/%s", cfg.Namespace, cfg.Name) + _, err = dynamicClient.Resource(CephBlockPoolGVR).Namespace(cfg.Namespace).Create(ctx, obj, metav1.CreateOptions{}) + if err == nil { + logger.Success("CephBlockPool %s/%s created", cfg.Namespace, cfg.Name) + return nil + } + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create CephBlockPool %s/%s: %w", cfg.Namespace, cfg.Name, err) + } + + logger.Info("CephBlockPool %s/%s already exists, updating spec", cfg.Namespace, cfg.Name) + existing, err := dynamicClient.Resource(CephBlockPoolGVR).Namespace(cfg.Namespace).Get(ctx, cfg.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch existing CephBlockPool %s/%s: %w", cfg.Namespace, cfg.Name, err) + } + existing.Object["spec"] = spec + if _, err := dynamicClient.Resource(CephBlockPoolGVR).Namespace(cfg.Namespace).Update(ctx, existing, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update CephBlockPool %s/%s: %w", cfg.Namespace, cfg.Name, err) + } + return nil +} + +// WaitForCephBlockPoolReady blocks until the CephBlockPool reports +// `status.phase == "Ready"`. Rook transitions the pool from Progressing to +// Ready once the Ceph OSDs have accepted the new pool and its CRUSH rule. 
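+//
+// Typically called right after CreateCephBlockPool; the timeout below is an
+// illustrative value, not a recommendation:
+//
+//	err := WaitForCephBlockPoolReady(ctx, kubeconfig, "d8-sds-elastic", "ceph-rbd-r1", 5*time.Minute)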
+func WaitForCephBlockPoolReady(ctx context.Context, kubeconfig *rest.Config, namespace, name string, timeout time.Duration) error { + if namespace == "" || name == "" { + return fmt.Errorf("namespace and name are required") + } + + logger.Debug("Waiting for CephBlockPool %s/%s to become Ready (timeout: %v)", namespace, name, timeout) + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + obj, err := dynamicClient.Resource(CephBlockPoolGVR).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) + if err == nil { + phase, _, _ := unstructured.NestedString(obj.Object, "status", "phase") + if phase == "Ready" { + logger.Success("CephBlockPool %s/%s is Ready", namespace, name) + return nil + } + logger.Debug("CephBlockPool %s/%s phase: %q, waiting...", namespace, name, phase) + } else if !apierrors.IsNotFound(err) { + logger.Debug("Error getting CephBlockPool %s/%s: %v", namespace, name, err) + } + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for CephBlockPool %s/%s: %w", namespace, name, ctx.Err()) + case <-ticker.C: + } + } +} + +// DeleteCephBlockPool deletes a CephBlockPool. Safe to call if the pool does +// not exist. +func DeleteCephBlockPool(ctx context.Context, kubeconfig *rest.Config, namespace, name string) error { + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + if err := dynamicClient.Resource(CephBlockPoolGVR).Namespace(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to delete CephBlockPool %s/%s: %w", namespace, name, err) + } + logger.Info("Deleted CephBlockPool %s/%s", namespace, name) + return nil +} diff --git a/pkg/kubernetes/cephcluster.go b/pkg/kubernetes/cephcluster.go new file mode 100644 index 0000000..d6a1ad8 --- /dev/null +++ b/pkg/kubernetes/cephcluster.go @@ -0,0 +1,395 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + "fmt" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + + "github.com/deckhouse/storage-e2e/internal/logger" +) + +// CephClusterGVR is the GroupVersionResource of Rook's CephCluster. +var CephClusterGVR = schema.GroupVersionResource{ + Group: "ceph.rook.io", + Version: "v1", + Resource: "cephclusters", +} + +// Defaults shared between CephClusterConfig and the testkit-level helper. 
+const ( + DefaultRookNamespace = "d8-sds-elastic" + DefaultCephClusterName = "ceph-cluster" + DefaultCephImage = "quay.io/ceph/ceph:v18.2.7" + DefaultDataDirHostPath = "/var/lib/rook" + DefaultOSDStorageClassSize = "20Gi" +) + +// CephClusterConfig describes a Rook-managed Ceph cluster suitable for e2e +// testing. It is intentionally narrower than Rook's native CephCluster CRD: +// knobs that don't matter for our scenarios are hidden behind hard-coded +// defaults (mirroring the values from the internal Flant wiki instruction +// on deploying sds-elastic + Rook + Ceph on LVM). +type CephClusterConfig struct { + // Name of the CephCluster (default: "ceph-cluster"). + Name string + + // Namespace where Rook watches (default: "d8-sds-elastic"). + Namespace string + + // CephImage is the Ceph container image tag. + // Default: "quay.io/ceph/ceph:v18.2.7". + CephImage string + + // AllowUnsupportedCephVersion flips spec.cephVersion.allowUnsupported. + // Default: true (e2e clusters are allowed to run any version Ceph ships). + AllowUnsupportedCephVersion *bool + + // MonCount / MgrCount are the Rook mon/mgr replica counts. Defaults: + // 1 / 1, which is appropriate for single-node / tiny test clusters. + MonCount int + MgrCount int + + // AllowMultipleMonPerNode allows multiple mons on the same node + // (required for single-node clusters). Default: true. + AllowMultipleMonPerNode *bool + + // DataDirHostPath is where Rook persists mon/OSD data on each node. + // Default: "/var/lib/rook". + DataDirHostPath string + + // NetworkProvider selects the Rook networking mode. Supported values: + // "" — default CNI pod network (suitable for in-cluster e2e); + // "host" — host networking (matches the Flant wiki production layout). + NetworkProvider string + + // PublicNetworkCIDRs / ClusterNetworkCIDRs are the public/cluster CIDRs + // plumbed into `spec.network.addressRanges` when NetworkProvider is + // non-empty. They are ignored for the default (CNI) mode. + PublicNetworkCIDRs []string + ClusterNetworkCIDRs []string + + // --- OSD backing --- + + // OSDStorageClass is the name of a k8s StorageClass able to hand out + // block-mode PVCs. Those PVCs are used by Rook's + // `storage.storageClassDeviceSets` to back OSDs. + OSDStorageClass string + + // OSDCount is the number of OSDs to provision (default: 1). + OSDCount int + + // OSDSize is the size of each OSD PVC (default: "20Gi"). + OSDSize string + + // OSDDeviceSetName is the `storageClassDeviceSets[].name` (default: + // "set1"). Changing it is useful mostly for debugging. + OSDDeviceSetName string +} + +func (c *CephClusterConfig) applyDefaults() { + if c.Name == "" { + c.Name = DefaultCephClusterName + } + if c.Namespace == "" { + c.Namespace = DefaultRookNamespace + } + if c.CephImage == "" { + c.CephImage = DefaultCephImage + } + if c.AllowUnsupportedCephVersion == nil { + t := true + c.AllowUnsupportedCephVersion = &t + } + if c.MonCount <= 0 { + c.MonCount = 1 + } + if c.MgrCount <= 0 { + c.MgrCount = 1 + } + if c.AllowMultipleMonPerNode == nil { + t := true + c.AllowMultipleMonPerNode = &t + } + if c.DataDirHostPath == "" { + c.DataDirHostPath = DefaultDataDirHostPath + } + if c.OSDCount <= 0 { + c.OSDCount = 1 + } + if c.OSDSize == "" { + c.OSDSize = DefaultOSDStorageClassSize + } + if c.OSDDeviceSetName == "" { + c.OSDDeviceSetName = "set1" + } +} + +// CreateCephCluster creates (or updates) a CephCluster in the given namespace. 
+// It is idempotent: if the resource already exists, its spec is overwritten +// with the freshly-rendered one so callers can tweak `CephClusterConfig` and +// re-apply without manual cleanup. +func CreateCephCluster(ctx context.Context, kubeconfig *rest.Config, cfg CephClusterConfig) error { + cfg.applyDefaults() + + if cfg.OSDStorageClass == "" { + return fmt.Errorf("CephCluster requires OSDStorageClass (backing StorageClass for OSD PVCs)") + } + + spec := buildCephClusterSpec(cfg) + + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "ceph.rook.io/v1", + "kind": "CephCluster", + "metadata": map[string]interface{}{ + "name": cfg.Name, + "namespace": cfg.Namespace, + }, + "spec": spec, + }, + } + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + logger.Info("Creating CephCluster %s/%s (image=%s, mon=%d, mgr=%d, osd=%d x %s on SC %s)", + cfg.Namespace, cfg.Name, cfg.CephImage, cfg.MonCount, cfg.MgrCount, cfg.OSDCount, cfg.OSDSize, cfg.OSDStorageClass) + + _, err = dynamicClient.Resource(CephClusterGVR).Namespace(cfg.Namespace).Create(ctx, obj, metav1.CreateOptions{}) + if err == nil { + logger.Success("CephCluster %s/%s created", cfg.Namespace, cfg.Name) + return nil + } + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create CephCluster %s/%s: %w", cfg.Namespace, cfg.Name, err) + } + + logger.Info("CephCluster %s/%s already exists, updating spec", cfg.Namespace, cfg.Name) + existing, err := dynamicClient.Resource(CephClusterGVR).Namespace(cfg.Namespace).Get(ctx, cfg.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch existing CephCluster %s/%s: %w", cfg.Namespace, cfg.Name, err) + } + existing.Object["spec"] = spec + if _, err := dynamicClient.Resource(CephClusterGVR).Namespace(cfg.Namespace).Update(ctx, existing, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update CephCluster %s/%s: %w", cfg.Namespace, cfg.Name, err) + } + return nil +} + +// buildCephClusterSpec renders the spec portion of a CephCluster object. The +// choice of fields follows the Flant internal wiki instruction for +// sds-elastic + Rook + Ceph, stripped down to the parts that matter in e2e: +// - mon/mgr counts come from the config (1/1 by default for single-node); +// - network.provider=host is opt-in via NetworkProvider; +// - OSDs are backed by one `storageClassDeviceSets[0]` entry that points +// to a user-supplied StorageClass capable of issuing block-mode PVCs. 
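+//
+// With the defaults, the rendered spec corresponds roughly to this YAML
+// (abridged sketch; see the function body for the full field set):
+//
+//	cephVersion:
+//	  image: quay.io/ceph/ceph:v18.2.7
+//	  allowUnsupported: true
+//	mon:
+//	  count: 1
+//	  allowMultiplePerNode: true
+//	storage:
+//	  useAllNodes: true
+//	  useAllDevices: false
+//	  storageClassDeviceSets:
+//	    - name: set1
+//	      count: 1
+//	      volumeClaimTemplates:
+//	        - spec:
+//	            storageClassName: <OSDStorageClass>
+//	            volumeMode: Block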
+func buildCephClusterSpec(cfg CephClusterConfig) map[string]interface{} { + spec := map[string]interface{}{ + "cephVersion": map[string]interface{}{ + "image": cfg.CephImage, + "allowUnsupported": *cfg.AllowUnsupportedCephVersion, + }, + "dataDirHostPath": cfg.DataDirHostPath, + "skipUpgradeChecks": false, + "continueUpgradeAfterChecksEvenIfNotHealthy": false, + "mon": map[string]interface{}{ + "count": int64(cfg.MonCount), + "allowMultiplePerNode": *cfg.AllowMultipleMonPerNode, + }, + "mgr": map[string]interface{}{ + "count": int64(cfg.MgrCount), + "allowMultiplePerNode": *cfg.AllowMultipleMonPerNode, + "modules": []interface{}{ + map[string]interface{}{ + "name": "pg_autoscaler", + "enabled": true, + }, + }, + }, + "dashboard": map[string]interface{}{ + "enabled": false, + "ssl": false, + }, + "crashCollector": map[string]interface{}{ + "disable": false, + }, + "logCollector": map[string]interface{}{ + "enabled": true, + "periodicity": "daily", + "maxLogSize": "100M", + }, + "priorityClassNames": map[string]interface{}{ + "mon": "system-node-critical", + "osd": "system-node-critical", + "mgr": "system-cluster-critical", + }, + "disruptionManagement": map[string]interface{}{ + "managePodBudgets": true, + "osdMaintenanceTimeout": int64(30), + "pgHealthCheckTimeout": int64(0), + }, + "storage": map[string]interface{}{ + "useAllNodes": true, + "useAllDevices": false, + "storageClassDeviceSets": []interface{}{ + map[string]interface{}{ + "name": cfg.OSDDeviceSetName, + "count": int64(cfg.OSDCount), + "portable": false, + "tuneDeviceClass": true, + "volumeClaimTemplates": []interface{}{ + map[string]interface{}{ + "metadata": map[string]interface{}{ + "name": "data", + }, + "spec": map[string]interface{}{ + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "storage": cfg.OSDSize, + }, + }, + "storageClassName": cfg.OSDStorageClass, + "volumeMode": "Block", + "accessModes": []interface{}{"ReadWriteOnce"}, + }, + }, + }, + }, + }, + }, + } + + if cfg.NetworkProvider != "" { + network := map[string]interface{}{ + "provider": cfg.NetworkProvider, + "connections": map[string]interface{}{ + "encryption": map[string]interface{}{"enabled": false}, + "compression": map[string]interface{}{"enabled": false}, + "requireMsgr2": false, + }, + } + + addrs := map[string]interface{}{} + if len(cfg.PublicNetworkCIDRs) > 0 { + addrs["public"] = toInterfaceSlice(cfg.PublicNetworkCIDRs) + } + if len(cfg.ClusterNetworkCIDRs) > 0 { + addrs["cluster"] = toInterfaceSlice(cfg.ClusterNetworkCIDRs) + } + if len(addrs) > 0 { + network["addressRanges"] = addrs + } + spec["network"] = network + } + + return spec +} + +// toInterfaceSlice converts a []string to a []interface{} so it can be +// embedded into an `unstructured.Unstructured`'s object tree. +func toInterfaceSlice(in []string) []interface{} { + out := make([]interface{}, len(in)) + for i, v := range in { + out[i] = v + } + return out +} + +// WaitForCephClusterReady blocks until the CephCluster status reports that +// Ceph is up and healthy. Rook exposes the cluster state through two status +// fields: +// - `status.state` — overall lifecycle phase ("Creating", "Created", +// "Updating", "Error"); +// - `status.ceph.health` — the Ceph health summary ("HEALTH_OK", +// "HEALTH_WARN", "HEALTH_ERR"). On a single-OSD test cluster Ceph often +// sits in HEALTH_WARN (PGs undersized, no replicas), which we still treat +// as "good enough" as long as `status.state == "Created"`. +// +// We return success once `state == "Created"`. 
HEALTH_ERR is reported in the +// log and does not short-circuit (Rook may recover). +func WaitForCephClusterReady(ctx context.Context, kubeconfig *rest.Config, namespace, name string, timeout time.Duration) error { + if namespace == "" || name == "" { + return fmt.Errorf("namespace and name are required") + } + + logger.Debug("Waiting for CephCluster %s/%s to reach Created state (timeout: %v)", namespace, name, timeout) + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + obj, err := dynamicClient.Resource(CephClusterGVR).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) + if err == nil { + state, _, _ := unstructured.NestedString(obj.Object, "status", "state") + health, _, _ := unstructured.NestedString(obj.Object, "status", "ceph", "health") + phase, _, _ := unstructured.NestedString(obj.Object, "status", "phase") + + if state == "Created" || phase == "Ready" { + logger.Success("CephCluster %s/%s is Created (ceph health: %s)", namespace, name, health) + return nil + } + logger.Debug("CephCluster %s/%s state=%q phase=%q health=%q", namespace, name, state, phase, health) + } else if !apierrors.IsNotFound(err) { + logger.Debug("Error getting CephCluster %s/%s: %v", namespace, name, err) + } + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for CephCluster %s/%s: %w", namespace, name, ctx.Err()) + case <-ticker.C: + } + } +} + +// DeleteCephCluster removes a CephCluster. Tearing down the cluster this way +// is a *destructive* operation — Rook will leave OSD data on host disks under +// `dataDirHostPath` and operator-managed PVCs will not be garbage-collected +// automatically. The operation is still idempotent: a NotFound error is +// swallowed. +func DeleteCephCluster(ctx context.Context, kubeconfig *rest.Config, namespace, name string) error { + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + if err := dynamicClient.Resource(CephClusterGVR).Namespace(namespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to delete CephCluster %s/%s: %w", namespace, name, err) + } + logger.Info("Deleted CephCluster %s/%s", namespace, name) + return nil +} diff --git a/pkg/kubernetes/cephclusterconnection.go b/pkg/kubernetes/cephclusterconnection.go new file mode 100644 index 0000000..3110cfb --- /dev/null +++ b/pkg/kubernetes/cephclusterconnection.go @@ -0,0 +1,273 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubernetes + +import ( + "context" + "fmt" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + + "github.com/deckhouse/storage-e2e/internal/logger" +) + +// GVRs of the csi-ceph cluster-scoped CRs. We use unstructured to avoid +// pulling github.com/deckhouse/csi-ceph/api into go.mod just for these +// tiny types. +var ( + CephClusterConnectionGVR = schema.GroupVersionResource{ + Group: "storage.deckhouse.io", + Version: "v1alpha1", + Resource: "cephclusterconnections", + } + CephClusterAuthenticationGVR = schema.GroupVersionResource{ + Group: "storage.deckhouse.io", + Version: "v1alpha1", + Resource: "cephclusterauthentications", + } +) + +// CephClusterAuthenticationConfig describes CephX credentials that csi-ceph +// reuses for every StorageClass that references the authentication. +type CephClusterAuthenticationConfig struct { + // Name of the CephClusterAuthentication CR. + Name string + // UserID is the Ceph user (typically "admin"). + UserID string + // UserKey is the CephX key of UserID. + UserKey string +} + +// CreateCephClusterAuthentication creates (or updates) a +// CephClusterAuthentication CR with the given CephX credentials. +func CreateCephClusterAuthentication(ctx context.Context, kubeconfig *rest.Config, cfg CephClusterAuthenticationConfig) error { + if cfg.Name == "" { + return fmt.Errorf("CephClusterAuthentication name is required") + } + if cfg.UserID == "" { + return fmt.Errorf("CephClusterAuthentication UserID is required") + } + if cfg.UserKey == "" { + return fmt.Errorf("CephClusterAuthentication UserKey is required") + } + + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "storage.deckhouse.io/v1alpha1", + "kind": "CephClusterAuthentication", + "metadata": map[string]interface{}{ + "name": cfg.Name, + }, + "spec": map[string]interface{}{ + "userID": cfg.UserID, + "userKey": cfg.UserKey, + }, + }, + } + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + logger.Info("Creating CephClusterAuthentication %s (userID=%s)", cfg.Name, cfg.UserID) + _, err = dynamicClient.Resource(CephClusterAuthenticationGVR).Create(ctx, obj, metav1.CreateOptions{}) + if err == nil { + return nil + } + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create CephClusterAuthentication %s: %w", cfg.Name, err) + } + + logger.Info("CephClusterAuthentication %s already exists, updating spec", cfg.Name) + existing, err := dynamicClient.Resource(CephClusterAuthenticationGVR).Get(ctx, cfg.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch CephClusterAuthentication %s: %w", cfg.Name, err) + } + existing.Object["spec"] = obj.Object["spec"] + if _, err := dynamicClient.Resource(CephClusterAuthenticationGVR).Update(ctx, existing, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update CephClusterAuthentication %s: %w", cfg.Name, err) + } + return nil +} + +// DeleteCephClusterAuthentication removes a CephClusterAuthentication. +// NotFound is treated as success. 
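+//
+// Handy in deferred test cleanup (the CR name is hypothetical):
+//
+//	defer func() {
+//		_ = DeleteCephClusterAuthentication(ctx, kubeconfig, "my-sc-conn")
+//	}()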
+func DeleteCephClusterAuthentication(ctx context.Context, kubeconfig *rest.Config, name string) error { + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + if err := dynamicClient.Resource(CephClusterAuthenticationGVR).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to delete CephClusterAuthentication %s: %w", name, err) + } + logger.Info("Deleted CephClusterAuthentication %s", name) + return nil +} + +// CephClusterConnectionConfig describes a csi-ceph CephClusterConnection CR. +// Its spec.clusterID (== Ceph fsid) is immutable once created. +type CephClusterConnectionConfig struct { + // Name of the CephClusterConnection CR. + Name string + // ClusterID is the Ceph fsid. Immutable after creation. + ClusterID string + // Monitors is the list of `ip:port` monitor endpoints. + Monitors []string + // UserID is the Ceph user (typically "admin"). + UserID string + // UserKey is the CephX key of UserID. + UserKey string +} + +// CreateCephClusterConnection creates (or updates) a CephClusterConnection CR. +// If the resource already exists we do *not* attempt to update spec.clusterID +// (which the CRD marks immutable) — only Monitors/UserID/UserKey are synced. +func CreateCephClusterConnection(ctx context.Context, kubeconfig *rest.Config, cfg CephClusterConnectionConfig) error { + if cfg.Name == "" { + return fmt.Errorf("CephClusterConnection name is required") + } + if cfg.ClusterID == "" { + return fmt.Errorf("CephClusterConnection ClusterID (fsid) is required") + } + if len(cfg.Monitors) == 0 { + return fmt.Errorf("CephClusterConnection Monitors is required") + } + + monitors := make([]interface{}, len(cfg.Monitors)) + for i, m := range cfg.Monitors { + monitors[i] = m + } + + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "storage.deckhouse.io/v1alpha1", + "kind": "CephClusterConnection", + "metadata": map[string]interface{}{ + "name": cfg.Name, + }, + "spec": map[string]interface{}{ + "clusterID": cfg.ClusterID, + "monitors": monitors, + "userID": cfg.UserID, + "userKey": cfg.UserKey, + }, + }, + } + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + logger.Info("Creating CephClusterConnection %s (clusterID=%s, mons=%d)", cfg.Name, cfg.ClusterID, len(cfg.Monitors)) + _, err = dynamicClient.Resource(CephClusterConnectionGVR).Create(ctx, obj, metav1.CreateOptions{}) + if err == nil { + return nil + } + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create CephClusterConnection %s: %w", cfg.Name, err) + } + + logger.Info("CephClusterConnection %s already exists, syncing monitors/userID/userKey", cfg.Name) + existing, err := dynamicClient.Resource(CephClusterConnectionGVR).Get(ctx, cfg.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch CephClusterConnection %s: %w", cfg.Name, err) + } + if err := unstructured.SetNestedSlice(existing.Object, monitors, "spec", "monitors"); err != nil { + return fmt.Errorf("set monitors: %w", err) + } + if err := unstructured.SetNestedField(existing.Object, cfg.UserID, "spec", "userID"); err != nil { + return fmt.Errorf("set userID: %w", err) + } + if err := unstructured.SetNestedField(existing.Object, cfg.UserKey, "spec", "userKey"); err != nil { + return fmt.Errorf("set userKey: 
%w", err) + } + if _, err := dynamicClient.Resource(CephClusterConnectionGVR).Update(ctx, existing, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update CephClusterConnection %s: %w", cfg.Name, err) + } + return nil +} + +// DeleteCephClusterConnection removes a CephClusterConnection. +// NotFound is treated as success. +func DeleteCephClusterConnection(ctx context.Context, kubeconfig *rest.Config, name string) error { + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + if err := dynamicClient.Resource(CephClusterConnectionGVR).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to delete CephClusterConnection %s: %w", name, err) + } + logger.Info("Deleted CephClusterConnection %s", name) + return nil +} + +// WaitForCephClusterConnectionCreated polls until the CephClusterConnection +// status reports phase=Created. csi-ceph's controller flips the status from +// Pending to Created once it has verified the supplied fsid / monitors / +// CephX credentials against the real Ceph cluster. +func WaitForCephClusterConnectionCreated(ctx context.Context, kubeconfig *rest.Config, name string, timeout time.Duration) error { + if name == "" { + return fmt.Errorf("name is required") + } + + logger.Debug("Waiting for CephClusterConnection %s phase=Created (timeout: %v)", name, timeout) + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + obj, err := dynamicClient.Resource(CephClusterConnectionGVR).Get(ctx, name, metav1.GetOptions{}) + if err == nil { + phase, _, _ := unstructured.NestedString(obj.Object, "status", "phase") + reason, _, _ := unstructured.NestedString(obj.Object, "status", "reason") + if phase == "Created" { + logger.Success("CephClusterConnection %s is Created", name) + return nil + } + logger.Debug("CephClusterConnection %s phase=%q reason=%q", name, phase, reason) + } else if !apierrors.IsNotFound(err) { + logger.Debug("Error getting CephClusterConnection %s: %v", name, err) + } + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for CephClusterConnection %s: %w", name, ctx.Err()) + case <-ticker.C: + } + } +} diff --git a/pkg/kubernetes/cephcredentials.go b/pkg/kubernetes/cephcredentials.go new file mode 100644 index 0000000..11f68ec --- /dev/null +++ b/pkg/kubernetes/cephcredentials.go @@ -0,0 +1,183 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package kubernetes
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+	"time"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/rest"
+
+	"github.com/deckhouse/storage-e2e/internal/logger"
+)
+
+// Well-known Rook resources that hold Ceph connection data.
+const (
+	// RookMonSecretName is the Secret that the Rook operator populates with
+	// admin credentials and cluster fsid once the CephCluster is bootstrapped.
+	RookMonSecretName = "rook-ceph-mon"
+
+	// RookMonEndpointsConfigMapName is the ConfigMap the operator keeps in
+	// sync with the current set of Ceph monitors.
+	RookMonEndpointsConfigMapName = "rook-ceph-mon-endpoints"
+)
+
+// CephCredentials holds the information a Ceph CSI client needs to connect
+// to a cluster bootstrapped by Rook.
+type CephCredentials struct {
+	// FSID is the Ceph cluster unique identifier.
+	FSID string
+
+	// AdminUser is the Ceph user name (typically "admin").
+	AdminUser string
+
+	// AdminKey is the CephX key for AdminUser.
+	AdminKey string
+
+	// Monitors is the list of monitor endpoints in "IP:PORT" form, sorted
+	// alphabetically to make the output stable across runs.
+	Monitors []string
+}
+
+// WaitForCephCredentials blocks until all pieces of information required to
+// connect to the Rook-managed Ceph cluster are populated:
+//   - Secret `rook-ceph-mon` exists and has `fsid` and `ceph-secret`
+//     (`ceph-username` falls back to "client.admin" when absent);
+//   - ConfigMap `rook-ceph-mon-endpoints` exists and lists at least one
+//     monitor endpoint.
+//
+// The returned CephCredentials is suitable for wiring csi-ceph CRs
+// (CephClusterConnection, CephClusterAuthentication).
+func WaitForCephCredentials(ctx context.Context, kubeconfig *rest.Config, namespace string, timeout time.Duration) (*CephCredentials, error) {
+	if namespace == "" {
+		return nil, fmt.Errorf("namespace is required")
+	}
+
+	logger.Debug("Waiting for Ceph credentials in %s (timeout: %v)", namespace, timeout)
+
+	clientset, err := NewClientsetWithRetry(ctx, kubeconfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create clientset: %w", err)
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		secret, err := clientset.CoreV1().Secrets(namespace).Get(ctx, RookMonSecretName, metav1.GetOptions{})
+		if err != nil && !apierrors.IsNotFound(err) {
+			logger.Debug("Failed to get Secret %s/%s: %v", namespace, RookMonSecretName, err)
+		}
+
+		cm, cmErr := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, RookMonEndpointsConfigMapName, metav1.GetOptions{})
+		if cmErr != nil && !apierrors.IsNotFound(cmErr) {
+			logger.Debug("Failed to get ConfigMap %s/%s: %v", namespace, RookMonEndpointsConfigMapName, cmErr)
+		}
+
+		if err == nil && cmErr == nil {
+			creds, extractErr := extractCephCredentials(secret.Data, cm.Data)
+			if extractErr == nil {
+				logger.Success("Ceph credentials ready in %s (fsid=%s, %d monitor(s))", namespace, creds.FSID, len(creds.Monitors))
+				return creds, nil
+			}
+			logger.Debug("Rook credentials not complete yet: %v", extractErr)
+		}
+
+		select {
+		case <-ctx.Done():
+			return nil, fmt.Errorf("timeout waiting for Ceph credentials in %s: %w", namespace, ctx.Err())
+		case <-ticker.C:
+		}
+	}
+}
+
+// extractCephCredentials parses the Rook-managed Secret/ConfigMap payloads
+// into a CephCredentials struct. It returns an error if any required field
+// is missing so the caller can keep polling until the operator has populated
+// everything.
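+//
+// A sketch of the payload shapes it expects (values are placeholders):
+//
+//	secretData["fsid"]          = []byte("9a7e...")
+//	secretData["ceph-username"] = []byte("client.admin")
+//	secretData["ceph-secret"]   = []byte("AQ...")
+//	cmData["data"]              = "a=10.0.0.1:6789,b=10.0.0.2:6789"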
+func extractCephCredentials(secretData map[string][]byte, cmData map[string]string) (*CephCredentials, error) {
+	fsid := strings.TrimSpace(string(secretData["fsid"]))
+	if fsid == "" {
+		return nil, fmt.Errorf("Secret %s is missing `fsid`", RookMonSecretName)
+	}
+
+	adminUser := strings.TrimSpace(string(secretData["ceph-username"]))
+	if adminUser == "" {
+		adminUser = "client.admin"
+	}
+	adminUser = strings.TrimPrefix(adminUser, "client.")
+
+	adminKey := strings.TrimSpace(string(secretData["ceph-secret"]))
+	if adminKey == "" {
+		return nil, fmt.Errorf("Secret %s is missing `ceph-secret`", RookMonSecretName)
+	}
+
+	raw, ok := cmData["data"]
+	if !ok {
+		return nil, fmt.Errorf("ConfigMap %s is missing `data`", RookMonEndpointsConfigMapName)
+	}
+	monitors, err := parseMonEndpoints(raw)
+	if err != nil {
+		return nil, err
+	}
+	if len(monitors) == 0 {
+		return nil, fmt.Errorf("ConfigMap %s has no populated monitor endpoints", RookMonEndpointsConfigMapName)
+	}
+
+	return &CephCredentials{
+		FSID:      fsid,
+		AdminUser: adminUser,
+		AdminKey:  adminKey,
+		Monitors:  monitors,
+	}, nil
+}
+
+// parseMonEndpoints parses the Rook-maintained monitor endpoints string.
+//
+// Rook stores the current mon list in the `data` key of the
+// `rook-ceph-mon-endpoints` ConfigMap as a comma-separated list of
+// `<mon-id>=<ip>:<port>` pairs, for example:
+//
+//	a=10.0.0.1:6789,b=10.0.0.2:6789,c=10.0.0.3:6789
+//
+// This helper returns just the `<ip>:<port>` portion of every entry, sorted
+// alphabetically for stable output.
+func parseMonEndpoints(raw string) ([]string, error) {
+	out := []string{}
+	for _, part := range strings.Split(raw, ",") {
+		part = strings.TrimSpace(part)
+		if part == "" {
+			continue
+		}
+		// Strip the "<mon-id>=" prefix if present.
+		if idx := strings.Index(part, "="); idx >= 0 {
+			part = part[idx+1:]
+		}
+		if part == "" {
+			continue
+		}
+		out = append(out, part)
+	}
+	sort.Strings(out)
+	return out, nil
+}
diff --git a/pkg/kubernetes/cephstorageclass.go b/pkg/kubernetes/cephstorageclass.go
new file mode 100644
index 0000000..6bf256c
--- /dev/null
+++ b/pkg/kubernetes/cephstorageclass.go
@@ -0,0 +1,230 @@
+/*
+Copyright 2025 Flant JSC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubernetes
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/rest"
+
+	"github.com/deckhouse/storage-e2e/internal/logger"
+)
+
+// CephStorageClassGVR points at csi-ceph's CephStorageClass CR (not to be
+// confused with Rook's CephCluster / CephBlockPool).
+var CephStorageClassGVR = schema.GroupVersionResource{
+	Group:    "storage.deckhouse.io",
+	Version:  "v1alpha1",
+	Resource: "cephstorageclasses",
+}
+
+// Supported CephStorageClass types, mirroring csi-ceph's CRD enum.
+const (
+	CephStorageClassTypeRBD    = "RBD"
+	CephStorageClassTypeCephFS = "CephFS"
+)
+
+// CephStorageClassConfig is an intentionally narrow shape tailored for the
+// e2e scenarios we care about today: an RBD StorageClass backed by a single
+// block pool. The CephFS variant is supported but requires CephFSName and
+// CephFSPool to be set by the caller.
+type CephStorageClassConfig struct {
+	// Name of the CephStorageClass CR (becomes the k8s StorageClass name).
+	Name string
+
+	// ClusterConnectionName points at a CephClusterConnection CR.
+	ClusterConnectionName string
+
+	// ClusterAuthenticationName points at a CephClusterAuthentication CR.
+	ClusterAuthenticationName string
+
+	// ReclaimPolicy mirrors StorageClass.ReclaimPolicy ("Delete" / "Retain").
+	// Default: "Delete".
+	ReclaimPolicy string
+
+	// Type is "RBD" (default) or "CephFS".
+	Type string
+
+	// --- RBD options (Type == "RBD") ---
+
+	// RBDPool is the Ceph pool name (e.g. "ceph-rbd-r1").
+	RBDPool string
+
+	// RBDDefaultFSType selects the filesystem created (via mkfs) when a
+	// volume is first attached. Default: "ext4".
+	RBDDefaultFSType string
+
+	// --- CephFS options (Type == "CephFS") ---
+	CephFSName string // Name of the CephFilesystem.
+	CephFSPool string // Pool to use inside that filesystem.
+}
+
+// CreateCephStorageClass creates (or updates) a CephStorageClass CR. On
+// success the csi-ceph controller provisions a corresponding core
+// storage.k8s.io/v1 StorageClass in the cluster.
+func CreateCephStorageClass(ctx context.Context, kubeconfig *rest.Config, cfg CephStorageClassConfig) error {
+	if cfg.Name == "" {
+		return fmt.Errorf("CephStorageClass name is required")
+	}
+	if cfg.ClusterConnectionName == "" {
+		return fmt.Errorf("CephStorageClass ClusterConnectionName is required")
+	}
+	if cfg.ClusterAuthenticationName == "" {
+		return fmt.Errorf("CephStorageClass ClusterAuthenticationName is required")
+	}
+	if cfg.Type == "" {
+		cfg.Type = CephStorageClassTypeRBD
+	}
+	if cfg.ReclaimPolicy == "" {
+		cfg.ReclaimPolicy = "Delete"
+	}
+
+	spec := map[string]interface{}{
+		"clusterConnectionName":     cfg.ClusterConnectionName,
+		"clusterAuthenticationName": cfg.ClusterAuthenticationName,
+		"reclaimPolicy":             cfg.ReclaimPolicy,
+		"type":                      cfg.Type,
+	}
+
+	switch cfg.Type {
+	case CephStorageClassTypeRBD:
+		if cfg.RBDPool == "" {
+			return fmt.Errorf("CephStorageClass of type RBD requires RBDPool")
+		}
+		if cfg.RBDDefaultFSType == "" {
+			cfg.RBDDefaultFSType = "ext4"
+		}
+		spec["rbd"] = map[string]interface{}{
+			"defaultFSType": cfg.RBDDefaultFSType,
+			"pool":          cfg.RBDPool,
+		}
+	case CephStorageClassTypeCephFS:
+		if cfg.CephFSName == "" || cfg.CephFSPool == "" {
+			return fmt.Errorf("CephStorageClass of type CephFS requires CephFSName and CephFSPool")
+		}
+		spec["cephFS"] = map[string]interface{}{
+			"fsName": cfg.CephFSName,
+			"pool":   cfg.CephFSPool,
+		}
+	default:
+		return fmt.Errorf("unsupported CephStorageClass Type: %s", cfg.Type)
+	}
+
+	obj := &unstructured.Unstructured{
+		Object: map[string]interface{}{
+			"apiVersion": "storage.deckhouse.io/v1alpha1",
+			"kind":       "CephStorageClass",
+			"metadata": map[string]interface{}{
+				"name": cfg.Name,
+			},
+			"spec": spec,
+		},
+	}
+
+	dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig)
+	if err != nil {
+		return fmt.Errorf("failed to create dynamic client: %w", err)
+	}
+
+	logger.Info("Creating CephStorageClass %s (type=%s, conn=%s, auth=%s)",
+		cfg.Name, cfg.Type, cfg.ClusterConnectionName, cfg.ClusterAuthenticationName)
+	_, err = dynamicClient.Resource(CephStorageClassGVR).Create(ctx, obj,
metav1.CreateOptions{}) + if err == nil { + return nil + } + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create CephStorageClass %s: %w", cfg.Name, err) + } + + logger.Info("CephStorageClass %s already exists, updating spec", cfg.Name) + existing, err := dynamicClient.Resource(CephStorageClassGVR).Get(ctx, cfg.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch CephStorageClass %s: %w", cfg.Name, err) + } + existing.Object["spec"] = spec + if _, err := dynamicClient.Resource(CephStorageClassGVR).Update(ctx, existing, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update CephStorageClass %s: %w", cfg.Name, err) + } + return nil +} + +// DeleteCephStorageClass removes a CephStorageClass. NotFound is treated as +// success. The underlying k8s StorageClass is removed by the csi-ceph +// controller as a side effect. +func DeleteCephStorageClass(ctx context.Context, kubeconfig *rest.Config, name string) error { + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + if err := dynamicClient.Resource(CephStorageClassGVR).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to delete CephStorageClass %s: %w", name, err) + } + logger.Info("Deleted CephStorageClass %s", name) + return nil +} + +// WaitForCephStorageClassCreated polls until the CephStorageClass status +// reports phase=Created (the csi-ceph controller flips this once the backing +// k8s StorageClass has been provisioned). +func WaitForCephStorageClassCreated(ctx context.Context, kubeconfig *rest.Config, name string, timeout time.Duration) error { + if name == "" { + return fmt.Errorf("name is required") + } + + logger.Debug("Waiting for CephStorageClass %s phase=Created (timeout: %v)", name, timeout) + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + + for { + obj, err := dynamicClient.Resource(CephStorageClassGVR).Get(ctx, name, metav1.GetOptions{}) + if err == nil { + phase, _, _ := unstructured.NestedString(obj.Object, "status", "phase") + reason, _, _ := unstructured.NestedString(obj.Object, "status", "reason") + if phase == "Created" { + logger.Success("CephStorageClass %s is Created", name) + return nil + } + logger.Debug("CephStorageClass %s phase=%q reason=%q", name, phase, reason) + } else if !apierrors.IsNotFound(err) { + logger.Debug("Error getting CephStorageClass %s: %v", name, err) + } + + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for CephStorageClass %s: %w", name, ctx.Err()) + case <-ticker.C: + } + } +} diff --git a/pkg/kubernetes/rookconfigoverride.go b/pkg/kubernetes/rookconfigoverride.go new file mode 100644 index 0000000..2027318 --- /dev/null +++ b/pkg/kubernetes/rookconfigoverride.go @@ -0,0 +1,140 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + "fmt" + "sort" + "strings" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/rest" + + "github.com/deckhouse/storage-e2e/internal/logger" +) + +// RookConfigOverrideName is the well-known ConfigMap name Rook reads Ceph +// config overrides from (see Rook docs: "Advanced Configuration – Custom +// ceph.conf Settings"). Rook watches this ConfigMap in its operator namespace +// and injects the `config` key into `/etc/ceph/ceph.conf` of every Ceph daemon. +const RookConfigOverrideName = "rook-config-override" + +// SetRookConfigOverride creates or updates the `rook-config-override` ConfigMap +// in the given Rook operator namespace so that Ceph daemons pick up the +// provided global settings. +// +// The ConfigMap format expected by Rook is: +// +// apiVersion: v1 +// kind: ConfigMap +// metadata: +// name: rook-config-override +// namespace: +// data: +// config: | +// [global] +// key1 = value1 +// key2 = value2 +// +// `globals` is rendered under `[global]`. Keys are sorted for a stable output. +// Passing an empty/nil `globals` map produces an empty `[global]` section, +// which effectively clears previously-set overrides. +func SetRookConfigOverride(ctx context.Context, kubeconfig *rest.Config, namespace string, globals map[string]string) error { + if namespace == "" { + return fmt.Errorf("namespace is required") + } + + clientset, err := NewClientsetWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create clientset: %w", err) + } + + cfg := renderCephGlobalConfig(globals) + + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: RookConfigOverrideName, + Namespace: namespace, + }, + Data: map[string]string{ + "config": cfg, + }, + } + + existing, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, RookConfigOverrideName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + logger.Info("Creating ConfigMap %s/%s with Ceph global overrides (%d keys)", namespace, RookConfigOverrideName, len(globals)) + if _, err := clientset.CoreV1().ConfigMaps(namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create ConfigMap %s/%s: %w", namespace, RookConfigOverrideName, err) + } + return nil + } + return fmt.Errorf("failed to get ConfigMap %s/%s: %w", namespace, RookConfigOverrideName, err) + } + + logger.Info("Updating ConfigMap %s/%s with Ceph global overrides (%d keys)", namespace, RookConfigOverrideName, len(globals)) + existing.Data = cm.Data + if _, err := clientset.CoreV1().ConfigMaps(namespace).Update(ctx, existing, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update ConfigMap %s/%s: %w", namespace, RookConfigOverrideName, err) + } + return nil +} + +// DeleteRookConfigOverride removes the `rook-config-override` ConfigMap. It +// is safe to call when the ConfigMap does not exist. 
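+//
+// A typical pairing with SetRookConfigOverride in a test (the namespace and
+// override value mirror ones used elsewhere in this suite):
+//
+//	_ = SetRookConfigOverride(ctx, kubeconfig, "d8-sds-elastic", map[string]string{"ms_crc_data": "false"})
+//	defer func() { _ = DeleteRookConfigOverride(ctx, kubeconfig, "d8-sds-elastic") }()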
+func DeleteRookConfigOverride(ctx context.Context, kubeconfig *rest.Config, namespace string) error { + if namespace == "" { + return fmt.Errorf("namespace is required") + } + + clientset, err := NewClientsetWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create clientset: %w", err) + } + + if err := clientset.CoreV1().ConfigMaps(namespace).Delete(ctx, RookConfigOverrideName, metav1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to delete ConfigMap %s/%s: %w", namespace, RookConfigOverrideName, err) + } + logger.Info("Deleted ConfigMap %s/%s", namespace, RookConfigOverrideName) + return nil +} + +// renderCephGlobalConfig renders a `[global]` section for ceph.conf from the +// provided key/value pairs. Keys are sorted so the rendered output is stable +// across calls with logically-equivalent maps (avoids unnecessary CM updates). +func renderCephGlobalConfig(globals map[string]string) string { + var b strings.Builder + b.WriteString("[global]\n") + + keys := make([]string, 0, len(globals)) + for k := range globals { + keys = append(keys, k) + } + sort.Strings(keys) + + for _, k := range keys { + fmt.Fprintf(&b, "%s = %s\n", k, globals[k]) + } + return b.String() +} diff --git a/pkg/kubernetes/storageclass_manage.go b/pkg/kubernetes/storageclass_manage.go new file mode 100644 index 0000000..bb7fb94 --- /dev/null +++ b/pkg/kubernetes/storageclass_manage.go @@ -0,0 +1,100 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubernetes + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/rest" + + "github.com/deckhouse/storage-e2e/internal/logger" +) + +type StorageClassCreateConfig struct { + Name string + Provisioner string + Parameters map[string]string + VolumeBindingMode storagev1.VolumeBindingMode + ReclaimPolicy corev1.PersistentVolumeReclaimPolicy + AllowExpansion bool + MakeDefault bool + AdditionalLabels map[string]string + AdditionalAnnot map[string]string +} + +func CreateStorageClass(ctx context.Context, kubeconfig *rest.Config, cfg StorageClassCreateConfig) error { + if cfg.Name == "" { + return fmt.Errorf("storage class name is required") + } + if cfg.Provisioner == "" { + return fmt.Errorf("provisioner is required") + } + + clientset, err := NewClientsetWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create clientset: %w", err) + } + + annotations := map[string]string{} + for k, v := range cfg.AdditionalAnnot { + annotations[k] = v + } + if cfg.MakeDefault { + annotations["storageclass.kubernetes.io/is-default-class"] = "true" + annotations["storageclass.beta.kubernetes.io/is-default-class"] = "true" + } + + labels := map[string]string{} + for k, v := range cfg.AdditionalLabels { + labels[k] = v + } + + sc := &storagev1.StorageClass{ + TypeMeta: metav1.TypeMeta{ + Kind: "StorageClass", + APIVersion: "storage.k8s.io/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cfg.Name, + Labels: labels, + Annotations: annotations, + }, + Provisioner: cfg.Provisioner, + Parameters: cfg.Parameters, + ReclaimPolicy: &cfg.ReclaimPolicy, + AllowVolumeExpansion: &cfg.AllowExpansion, + VolumeBindingMode: &cfg.VolumeBindingMode, + } + + logger.Info("Creating StorageClass %s (provisioner=%s)", cfg.Name, cfg.Provisioner) + _, err = clientset.StorageV1().StorageClasses().Create(ctx, sc, metav1.CreateOptions{}) + if err != nil { + if apierrors.IsAlreadyExists(err) { + logger.Info("StorageClass %s already exists, skipping create", cfg.Name) + return nil + } + return fmt.Errorf("failed to create StorageClass %s: %w", cfg.Name, err) + } + logger.Success("StorageClass %s created", cfg.Name) + return nil +} + diff --git a/pkg/kubernetes/volumesnapshotclass.go b/pkg/kubernetes/volumesnapshotclass.go new file mode 100644 index 0000000..9307615 --- /dev/null +++ b/pkg/kubernetes/volumesnapshotclass.go @@ -0,0 +1,125 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubernetes + +import ( + "context" + "fmt" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + + "github.com/deckhouse/storage-e2e/internal/logger" +) + +var VolumeSnapshotClassGVR = schema.GroupVersionResource{ + Group: "snapshot.storage.k8s.io", + Version: "v1", + Resource: "volumesnapshotclasses", +} + +type VolumeSnapshotClassConfig struct { + Name string + Driver string + DeletionPolicy string // "Delete" or "Retain" + Parameters map[string]string + MakeDefault bool +} + +func CreateVolumeSnapshotClass(ctx context.Context, kubeconfig *rest.Config, cfg VolumeSnapshotClassConfig) error { + if cfg.Name == "" { + return fmt.Errorf("volume snapshot class name is required") + } + if cfg.Driver == "" { + return fmt.Errorf("driver is required") + } + if cfg.DeletionPolicy == "" { + cfg.DeletionPolicy = "Delete" + } + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + annotations := map[string]interface{}{} + if cfg.MakeDefault { + annotations["snapshot.storage.kubernetes.io/is-default-class"] = "true" + } + + parameters := map[string]interface{}{} + for k, v := range cfg.Parameters { + parameters[k] = v + } + + vsc := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "snapshot.storage.k8s.io/v1", + "kind": "VolumeSnapshotClass", + "metadata": map[string]interface{}{ + "name": cfg.Name, + "annotations": annotations, + }, + "driver": cfg.Driver, + "deletionPolicy": cfg.DeletionPolicy, + "parameters": parameters, + }, + } + + logger.Info("Creating VolumeSnapshotClass %s (driver=%s, deletionPolicy=%s)", cfg.Name, cfg.Driver, cfg.DeletionPolicy) + _, err = dynamicClient.Resource(VolumeSnapshotClassGVR).Create(ctx, vsc, metav1.CreateOptions{}) + if err != nil { + if apierrors.IsAlreadyExists(err) { + logger.Info("VolumeSnapshotClass %s already exists, skipping create", cfg.Name) + return nil + } + return fmt.Errorf("failed to create VolumeSnapshotClass %s: %w", cfg.Name, err) + } + logger.Success("VolumeSnapshotClass %s created", cfg.Name) + return nil +} + +func WaitForVolumeSnapshotClass(ctx context.Context, kubeconfig *rest.Config, name string, timeout time.Duration) error { + logger.Debug("Waiting for VolumeSnapshotClass %s to become available (timeout: %v)", name, timeout) + + dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig) + if err != nil { + return fmt.Errorf("failed to create dynamic client: %w", err) + } + + deadline := time.Now().Add(timeout) + for { + if ctx.Err() != nil { + return ctx.Err() + } + if time.Now().After(deadline) { + return fmt.Errorf("timeout waiting for VolumeSnapshotClass %s", name) + } + + _, err := dynamicClient.Resource(VolumeSnapshotClassGVR).Get(ctx, name, metav1.GetOptions{}) + if err == nil { + logger.Success("VolumeSnapshotClass %s is available", name) + return nil + } + + time.Sleep(5 * time.Second) + } +} diff --git a/pkg/testkit/ceph.go b/pkg/testkit/ceph.go new file mode 100644 index 0000000..f7e0e5e --- /dev/null +++ b/pkg/testkit/ceph.go @@ -0,0 +1,441 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
diff --git a/pkg/testkit/ceph.go b/pkg/testkit/ceph.go
new file mode 100644
index 0000000..f7e0e5e
--- /dev/null
+++ b/pkg/testkit/ceph.go
@@ -0,0 +1,441 @@
+/*
+Copyright 2025 Flant JSC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testkit
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"k8s.io/client-go/rest"
+
+	"github.com/deckhouse/storage-e2e/internal/infrastructure/ssh"
+	"github.com/deckhouse/storage-e2e/internal/logger"
+	"github.com/deckhouse/storage-e2e/pkg/kubernetes"
+)
+
+// CephStorageClassConfig controls the end-to-end provisioning of a
+// Rook-managed Ceph cluster plus a csi-ceph-backed k8s StorageClass:
+//
+//  1. Enables Deckhouse modules required for the stack:
+//     sds-node-configurator, sds-elastic (Rook), csi-ceph.
+//  2. Unless OSDStorageClass is set, falls back to EnsureDefaultStorageClass
+//     to produce an sds-local-volume StorageClass for backing OSD PVCs.
+//  3. Seeds `rook-config-override` with per-test global Ceph settings
+//     (e.g. `ms_crc_data = false` for the PR #131 scenario).
+//  4. Creates a CephCluster (Rook) and waits until it is Created.
+//  5. Creates a CephBlockPool and waits until it is Ready.
+//  6. Reads fsid / monitors / CephX admin key from Rook-managed secrets
+//     and wires them into CephClusterConnection + CephClusterAuthentication
+//     CRs so csi-ceph can talk to the cluster.
+//  7. Creates a CephStorageClass CR and waits for the csi-ceph controller
+//     to materialize a core storage.k8s.io/v1 StorageClass.
+//
+// Only StorageClassName is strictly required; everything else has sensible
+// defaults tuned for single-node / tiny test clusters.
+type CephStorageClassConfig struct {
+	// --- Top-level identity ---
+
+	// StorageClassName is the name of the CephStorageClass CR (and of the
+	// resulting k8s StorageClass). Required.
+	StorageClassName string
+
+	// Namespace is the Rook / sds-elastic namespace. Default: "d8-sds-elastic".
+	Namespace string
+
+	// --- sds-elastic / Rook CephCluster ---
+
+	// CephClusterName is the Rook CephCluster name. Default: "ceph-cluster".
+	CephClusterName string
+
+	// CephImage is the Ceph container image tag. Default: "quay.io/ceph/ceph:v18.2.7".
+	CephImage string
+
+	// MonCount / MgrCount are the Rook mon/mgr replica counts.
+	// Defaults: 1 / 1 (good for 1..3 node test clusters).
+	MonCount int
+	MgrCount int
+
+	// NetworkProvider: "" for CNI (default), "host" for host networking.
+	NetworkProvider     string
+	PublicNetworkCIDRs  []string
+	ClusterNetworkCIDRs []string
+
+	// GlobalCephConfigOverrides populates `rook-config-override` under
+	// `[global]`, e.g. {"ms_crc_data": "false"}. A nil / empty map still
+	// creates the ConfigMap, with an empty `[global]` section.
+	GlobalCephConfigOverrides map[string]string
+
+	// --- OSD backing ---
+
+	// OSDStorageClass is a block-capable StorageClass used to back OSD PVCs.
+	// When empty, EnsureDefaultStorageClass is invoked with the
+	// OSDBackingStorageClass* fields to provision an sds-local-volume SC.
+	OSDStorageClass string
+
+	// OSDCount is the number of OSDs. Default: 1.
+	OSDCount int
+
+	// OSDSize is the size of each OSD PVC. Default: "20Gi".
+	OSDSize string
+
+	// --- Fallback SC provisioning via sds-local-volume (when OSDStorageClass is empty) ---
+
+	// OSDBackingStorageClassName names the sds-local-volume SC that we
+	// auto-provision for OSDs. Default: "sds-local-volume-thick-ceph-osd".
+	OSDBackingStorageClassName string
+
+	// OSDBackingLVMType is passed to EnsureDefaultStorageClass ("Thick"/"Thin").
+	// Default: "Thick" (simpler for block-mode PVCs used as Ceph OSDs).
+	OSDBackingLVMType string
+
+	// OSDBackingIncludeMasters exposes EnsureDefaultStorageClass.IncludeMasters.
+	OSDBackingIncludeMasters bool
+
+	// OSDBackingBaseKubeconfig/VMNamespace/BaseStorageClassName are plumbed
+	// through to EnsureDefaultStorageClass to enable automatic VirtualDisk
+	// attachment on nested-VM clusters.
+	OSDBackingBaseKubeconfig       *rest.Config
+	OSDBackingVMNamespace          string
+	OSDBackingBaseStorageClassName string
+
+	// MasterSSH is optional SSH access to the control plane. It is not used
+	// by EnsureCephStorageClass in this revision; callers may set it for
+	// follow-up bootstrap or diagnostics hooks.
+	MasterSSH ssh.SSHClient
+
+	// --- CephBlockPool ---
+
+	// PoolName is the Rook CephBlockPool name (also becomes the Ceph pool
+	// name referenced by CephStorageClass.spec.rbd.pool).
+	// Default: "ceph-rbd-r<ReplicaSize>", e.g. "ceph-rbd-r1".
+	PoolName string
+
+	// ReplicaSize is the CephBlockPool replication factor. Default: 1.
+	ReplicaSize int
+
+	// FailureDomain is the CRUSH failure domain: "host" or "osd".
+	// Default: "osd" when ReplicaSize==1, "host" otherwise.
+	FailureDomain string
+
+	// --- csi-ceph wiring ---
+
+	// ClusterConnectionName and ClusterAuthenticationName point at the
+	// CephClusterConnection / CephClusterAuthentication CRs we create.
+	// Defaults: both "<StorageClassName>-conn".
+	ClusterConnectionName     string
+	ClusterAuthenticationName string
+
+	// RBDDefaultFSType picks the mkfs used on attach. Default: "ext4".
+	RBDDefaultFSType string
+
+	// --- Modules ---
+
+	// SkipModuleEnablement disables the module-enable step (useful when the
+	// caller has already configured ModuleConfig on the cluster).
+	SkipModuleEnablement bool
+
+	// SdsElasticSettings overrides `spec.settings` of the sds-elastic
+	// ModuleConfig. Defaults to the minimal set that makes sense on a
+	// single-node test cluster.
+	SdsElasticSettings map[string]interface{}
+
+	// CsiCephSettings overrides `spec.settings` of the csi-ceph ModuleConfig.
+	CsiCephSettings map[string]interface{}
+
+	// CsiCephModulePullOverride pins a specific csi-ceph image tag (dev
+	// registry only). Useful for testing PRs that haven't been released yet.
+	CsiCephModulePullOverride string
+
+	// --- Timeouts ---
+
+	ModulesReadyTimeout     time.Duration // default 15m
+	CephClusterReadyTimeout time.Duration // default 20m
+	CephPoolReadyTimeout    time.Duration // default 10m
+	CredentialsTimeout      time.Duration // default 10m
+	CSICephPhaseTimeout     time.Duration // default 5m
+	StorageClassWaitTimeout time.Duration // default 2m
+}
+
+func (c *CephStorageClassConfig) applyDefaults() {
+	if c.Namespace == "" {
+		c.Namespace = kubernetes.DefaultRookNamespace
+	}
+	if c.CephClusterName == "" {
+		c.CephClusterName = kubernetes.DefaultCephClusterName
+	}
+	if c.CephImage == "" {
+		c.CephImage = kubernetes.DefaultCephImage
+	}
+	if c.MonCount <= 0 {
+		c.MonCount = 1
+	}
+	if c.MgrCount <= 0 {
+		c.MgrCount = 1
+	}
+	if c.OSDCount <= 0 {
+		c.OSDCount = 1
+	}
+	if c.OSDSize == "" {
+		c.OSDSize = kubernetes.DefaultOSDStorageClassSize
+	}
+	if c.OSDBackingStorageClassName == "" {
+		c.OSDBackingStorageClassName = "sds-local-volume-thick-ceph-osd"
+	}
+	if c.OSDBackingLVMType == "" {
+		c.OSDBackingLVMType = "Thick"
+	}
+	if c.ReplicaSize <= 0 {
+		c.ReplicaSize = 1
+	}
+	if c.PoolName == "" {
+		c.PoolName = fmt.Sprintf("ceph-rbd-r%d", c.ReplicaSize)
+	}
+	if c.FailureDomain == "" {
+		if c.ReplicaSize == 1 {
+			c.FailureDomain = "osd"
+		} else {
+			c.FailureDomain = "host"
+		}
+	}
+	if c.ClusterConnectionName == "" {
+		c.ClusterConnectionName = c.StorageClassName + "-conn"
+	}
+	if c.ClusterAuthenticationName == "" {
+		c.ClusterAuthenticationName = c.StorageClassName + "-conn"
+	}
+	if c.RBDDefaultFSType == "" {
+		c.RBDDefaultFSType = "ext4"
+	}
+	if c.ModulesReadyTimeout == 0 {
+		c.ModulesReadyTimeout = 15 * time.Minute
+	}
+	if c.CephClusterReadyTimeout == 0 {
+		c.CephClusterReadyTimeout = 20 * time.Minute
+	}
+	if c.CephPoolReadyTimeout == 0 {
+		c.CephPoolReadyTimeout = 10 * time.Minute
+	}
+	if c.CredentialsTimeout == 0 {
+		c.CredentialsTimeout = 10 * time.Minute
+	}
+	if c.CSICephPhaseTimeout == 0 {
+		c.CSICephPhaseTimeout = 5 * time.Minute
+	}
+	if c.StorageClassWaitTimeout == 0 {
+		c.StorageClassWaitTimeout = 2 * time.Minute
+	}
+}
+
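The defaulting above interlocks (ReplicaSize feeds both PoolName and FailureDomain), so a package-local unit-test sketch of what a minimal config resolves to may be worth keeping alongside. The test itself is hypothetical, but the expected values follow directly from applyDefaults as written:

```go
package testkit

import "testing"

// TestApplyDefaultsMinimal pins the derived defaults for the smallest
// possible config (only StorageClassName set).
func TestApplyDefaultsMinimal(t *testing.T) {
	cfg := CephStorageClassConfig{StorageClassName: "e2e-ceph-rbd"}
	cfg.applyDefaults()

	if cfg.ReplicaSize != 1 {
		t.Errorf("ReplicaSize = %d, want 1", cfg.ReplicaSize)
	}
	if cfg.PoolName != "ceph-rbd-r1" {
		t.Errorf("PoolName = %q, want ceph-rbd-r1", cfg.PoolName)
	}
	// FailureDomain flips to "osd" only because ReplicaSize defaulted to 1.
	if cfg.FailureDomain != "osd" {
		t.Errorf("FailureDomain = %q, want osd", cfg.FailureDomain)
	}
	if cfg.ClusterConnectionName != "e2e-ceph-rbd-conn" {
		t.Errorf("ClusterConnectionName = %q, want e2e-ceph-rbd-conn", cfg.ClusterConnectionName)
	}
}
```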
+// EnsureCephStorageClass is the high-level entry point that turns an empty
+// cluster into one with a working csi-ceph StorageClass. See
+// CephStorageClassConfig for the step-by-step flow.
+//
+// The function is idempotent: re-running it picks up the existing Rook
+// CephCluster / pool / csi-ceph CRs and only fills in whatever is still
+// missing. It returns the name of the resulting k8s StorageClass.
+func EnsureCephStorageClass(ctx context.Context, kubeconfig *rest.Config, cfg CephStorageClassConfig) (string, error) {
+	if cfg.StorageClassName == "" {
+		return "", fmt.Errorf("StorageClassName is required")
+	}
+	cfg.applyDefaults()
+
+	if cfg.SkipModuleEnablement {
+		logger.Info("Skipping module enablement (SkipModuleEnablement=true)")
+	} else {
+		logger.Step(1, "Enabling Deckhouse modules for csi-ceph (sds-node-configurator, sds-elastic, csi-ceph)")
+		if err := ensureCephModules(ctx, kubeconfig, cfg); err != nil {
+			return "", fmt.Errorf("enable ceph modules: %w", err)
+		}
+		logger.StepComplete(1, "Modules enabled")
+	}
+
+	logger.Step(2, "Resolving OSD backing StorageClass")
+	osdSC, err := ensureOSDBackingStorageClass(ctx, kubeconfig, &cfg)
+	if err != nil {
+		return "", fmt.Errorf("resolve OSD backing StorageClass: %w", err)
+	}
+	logger.StepComplete(2, "OSD backing StorageClass: %s", osdSC)
+
+	logger.Step(3, "Seeding rook-config-override ConfigMap")
+	if err := kubernetes.SetRookConfigOverride(ctx, kubeconfig, cfg.Namespace, cfg.GlobalCephConfigOverrides); err != nil {
+		return "", fmt.Errorf("set rook-config-override: %w", err)
+	}
+	logger.StepComplete(3, "rook-config-override ready (%d global key(s))", len(cfg.GlobalCephConfigOverrides))
+
+	logger.Step(4, "Creating Rook CephCluster %s/%s", cfg.Namespace, cfg.CephClusterName)
+	if err := kubernetes.CreateCephCluster(ctx, kubeconfig, kubernetes.CephClusterConfig{
+		Name:                cfg.CephClusterName,
+		Namespace:           cfg.Namespace,
+		CephImage:           cfg.CephImage,
+		MonCount:            cfg.MonCount,
+		MgrCount:            cfg.MgrCount,
+		NetworkProvider:     cfg.NetworkProvider,
+		PublicNetworkCIDRs:  cfg.PublicNetworkCIDRs,
+		ClusterNetworkCIDRs: cfg.ClusterNetworkCIDRs,
+		OSDStorageClass:     osdSC,
+		OSDCount:            cfg.OSDCount,
+		OSDSize:             cfg.OSDSize,
+	}); err != nil {
+		return "", fmt.Errorf("create CephCluster: %w", err)
+	}
+	if err := kubernetes.WaitForCephClusterReady(ctx, kubeconfig, cfg.Namespace, cfg.CephClusterName, cfg.CephClusterReadyTimeout); err != nil {
+		return "", fmt.Errorf("wait CephCluster: %w", err)
+	}
+	logger.StepComplete(4, "CephCluster %s/%s is Created", cfg.Namespace, cfg.CephClusterName)
+
+	logger.Step(5, "Creating CephBlockPool %s/%s (replica=%d, failureDomain=%s)",
+		cfg.Namespace, cfg.PoolName, cfg.ReplicaSize, cfg.FailureDomain)
+	if err := kubernetes.CreateCephBlockPool(ctx, kubeconfig, kubernetes.CephBlockPoolConfig{
+		Name:          cfg.PoolName,
+		Namespace:     cfg.Namespace,
+		FailureDomain: cfg.FailureDomain,
+		ReplicaSize:   cfg.ReplicaSize,
+	}); err != nil {
+		return "", fmt.Errorf("create CephBlockPool: %w", err)
+	}
+	if err := kubernetes.WaitForCephBlockPoolReady(ctx, kubeconfig, cfg.Namespace, cfg.PoolName, cfg.CephPoolReadyTimeout); err != nil {
+		return "", fmt.Errorf("wait CephBlockPool: %w", err)
+	}
+	logger.StepComplete(5, "CephBlockPool %s/%s is Ready", cfg.Namespace, cfg.PoolName)
+
+	logger.Step(6, "Extracting Rook-managed Ceph credentials (fsid, monitors, admin key)")
+	creds, err := kubernetes.WaitForCephCredentials(ctx, kubeconfig, cfg.Namespace, cfg.CredentialsTimeout)
+	if err != nil {
+		return "", fmt.Errorf("wait ceph credentials: %w", err)
+	}
+	logger.StepComplete(6, "Ceph credentials: fsid=%s, user=%s, %d monitor(s): %v",
+		creds.FSID, creds.AdminUser, len(creds.Monitors), creds.Monitors)
+
+	logger.Step(7, "Wiring csi-ceph: CephClusterAuthentication %q + CephClusterConnection %q",
+		cfg.ClusterAuthenticationName, cfg.ClusterConnectionName)
+	if err := kubernetes.CreateCephClusterAuthentication(ctx, kubeconfig, kubernetes.CephClusterAuthenticationConfig{
+		Name:    cfg.ClusterAuthenticationName,
+		UserID:  creds.AdminUser,
+		UserKey: creds.AdminKey,
+	}); err != nil {
+		return "", fmt.Errorf("create CephClusterAuthentication: %w", err)
+	}
+	if err := kubernetes.CreateCephClusterConnection(ctx, kubeconfig, kubernetes.CephClusterConnectionConfig{
+		Name:      cfg.ClusterConnectionName,
+		ClusterID: creds.FSID,
+		Monitors:  creds.Monitors,
+		UserID:    creds.AdminUser,
+		UserKey:   creds.AdminKey,
+	}); err != nil {
+		return "", fmt.Errorf("create CephClusterConnection: %w", err)
+	}
+	if err := kubernetes.WaitForCephClusterConnectionCreated(ctx, kubeconfig, cfg.ClusterConnectionName, cfg.CSICephPhaseTimeout); err != nil {
+		return "", fmt.Errorf("wait CephClusterConnection: %w", err)
+	}
+	logger.StepComplete(7, "csi-ceph wired against Ceph cluster %s", creds.FSID)
+
+	logger.Step(8, "Creating CephStorageClass %q → StorageClass", cfg.StorageClassName)
+	if err := kubernetes.CreateCephStorageClass(ctx, kubeconfig, kubernetes.CephStorageClassConfig{
+		Name:                      cfg.StorageClassName,
+		ClusterConnectionName:     cfg.ClusterConnectionName,
+		ClusterAuthenticationName: cfg.ClusterAuthenticationName,
+		Type:                      kubernetes.CephStorageClassTypeRBD,
+		RBDPool:                   cfg.PoolName,
+		RBDDefaultFSType:          cfg.RBDDefaultFSType,
+	}); err != nil {
+		return "", fmt.Errorf("create CephStorageClass: %w", err)
+	}
+	if err := kubernetes.WaitForCephStorageClassCreated(ctx, kubeconfig, cfg.StorageClassName, cfg.CSICephPhaseTimeout); err != nil {
+		return "", fmt.Errorf("wait CephStorageClass: %w", err)
+	}
+	if err := kubernetes.WaitForStorageClass(ctx, kubeconfig, cfg.StorageClassName, cfg.StorageClassWaitTimeout); err != nil {
+		return "", fmt.Errorf("wait core StorageClass: %w", err)
+	}
+	logger.StepComplete(8, "StorageClass %s is available", cfg.StorageClassName)
+
+	logger.Success("Ceph e2e stack ready: CephCluster %s/%s + pool %s → StorageClass %s",
+		cfg.Namespace, cfg.CephClusterName, cfg.PoolName, cfg.StorageClassName)
+	return cfg.StorageClassName, nil
+}
+
+// EnsureDefaultCephStorageClass is EnsureCephStorageClass + SetGlobalDefaultStorageClass.
+// After this call, new PVCs without an explicit storageClassName will use the
+// freshly provisioned Ceph RBD class.
+func EnsureDefaultCephStorageClass(ctx context.Context, kubeconfig *rest.Config, cfg CephStorageClassConfig) (string, error) {
+	scName, err := EnsureCephStorageClass(ctx, kubeconfig, cfg)
+	if err != nil {
+		return "", err
+	}
+	if err := kubernetes.SetGlobalDefaultStorageClass(ctx, kubeconfig, scName); err != nil {
+		return "", fmt.Errorf("set %s as default in global ModuleConfig: %w", scName, err)
+	}
+	logger.Success("StorageClass %s set as cluster default", scName)
+	return scName, nil
+}
+
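A minimal call sketch for the default-class variant, assuming ctx and a *rest.Config named kubeconfig are in scope; the StorageClassName is illustrative:

```go
// EnsureDefaultCephStorageClass = full Rook/csi-ceph bring-up, then the
// global ModuleConfig default flip.
scName, err := testkit.EnsureDefaultCephStorageClass(ctx, kubeconfig, testkit.CephStorageClassConfig{
	StorageClassName: "e2e-ceph-rbd-default", // illustrative name
})
if err != nil {
	return fmt.Errorf("ensure default ceph StorageClass: %w", err)
}
// From here on, PVCs without an explicit storageClassName land on scName.
_ = scName
```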
+// ensureCephModules enables sds-node-configurator + sds-elastic + csi-ceph
+// and waits for their Ready phase.
+func ensureCephModules(ctx context.Context, kubeconfig *rest.Config, cfg CephStorageClassConfig) error {
+	sdsElasticSettings := cfg.SdsElasticSettings
+	if sdsElasticSettings == nil {
+		sdsElasticSettings = map[string]interface{}{}
+	}
+
+	csiCephSettings := cfg.CsiCephSettings
+	if csiCephSettings == nil {
+		csiCephSettings = map[string]interface{}{}
+	}
+
+	modules := []kubernetes.ModuleSpec{
+		{
+			Name:    "sds-node-configurator",
+			Version: 1,
+			Enabled: true,
+		},
+		{
+			Name:         "sds-elastic",
+			Version:      1,
+			Enabled:      true,
+			Settings:     sdsElasticSettings,
+			Dependencies: []string{"sds-node-configurator"},
+		},
+		{
+			Name:               "csi-ceph",
+			Version:            1,
+			Enabled:            true,
+			Settings:           csiCephSettings,
+			Dependencies:       []string{"sds-elastic"},
+			ModulePullOverride: cfg.CsiCephModulePullOverride,
+		},
+	}
+	return kubernetes.EnableModulesAndWait(ctx, kubeconfig, nil, nil, modules, cfg.ModulesReadyTimeout)
+}
+
+// ensureOSDBackingStorageClass returns an already-existing SC name (if the
+// caller supplied OSDStorageClass) or delegates to EnsureDefaultStorageClass
+// to provision an sds-local-volume SC on the fly.
+func ensureOSDBackingStorageClass(ctx context.Context, kubeconfig *rest.Config, cfg *CephStorageClassConfig) (string, error) {
+	if cfg.OSDStorageClass != "" {
+		logger.Info("Using pre-existing OSD backing StorageClass %s", cfg.OSDStorageClass)
+		return cfg.OSDStorageClass, nil
+	}
+
+	localCfg := DefaultStorageClassConfig{
+		StorageClassName:     cfg.OSDBackingStorageClassName,
+		LVMType:              cfg.OSDBackingLVMType,
+		IncludeMasters:       cfg.OSDBackingIncludeMasters,
+		BaseKubeconfig:       cfg.OSDBackingBaseKubeconfig,
+		VMNamespace:          cfg.OSDBackingVMNamespace,
+		BaseStorageClassName: cfg.OSDBackingBaseStorageClassName,
+	}
+	return EnsureDefaultStorageClass(ctx, kubeconfig, localCfg)
+}
diff --git a/tests/csi-ceph/cluster_config.yml b/tests/csi-ceph/cluster_config.yml
new file mode 100644
index 0000000..62fcdbc
--- /dev/null
+++ b/tests/csi-ceph/cluster_config.yml
@@ -0,0 +1,56 @@
+# csi-ceph smoke testkit: 3 workers so a Rook Ceph cluster can come up in a
+# realistic layout (1 mon/mgr and a single OSD by default, with room to scale
+# to one OSD per worker). Masters are untainted so the mon/mgr can land there
+# on tiny clusters as well.
+clusterDefinition:
+  masters:
+    - hostname: "master-1"
+      hostType: "vm"
+      osType: "Ubuntu 22.04 6.2.0-39-generic"
+      cpu: 4
+      coreFraction: 50
+      ram: 8
+      diskSize: 50
+  workers:
+    - hostname: "worker-1"
+      hostType: "vm"
+      osType: "Ubuntu 22.04 6.2.0-39-generic"
+      cpu: 4
+      coreFraction: 50
+      ram: 8
+      diskSize: 50
+    - hostname: "worker-2"
+      hostType: "vm"
+      osType: "Ubuntu 22.04 6.2.0-39-generic"
+      cpu: 4
+      coreFraction: 50
+      ram: 8
+      diskSize: 50
+    - hostname: "worker-3"
+      hostType: "vm"
+      osType: "Ubuntu 22.04 6.2.0-39-generic"
+      cpu: 4
+      coreFraction: 50
+      ram: 8
+      diskSize: 50
+  dkpParameters:
+    kubernetesVersion: "Automatic"
+    podSubnetCIDR: "10.112.0.0/16"
+    serviceSubnetCIDR: "10.225.0.0/16"
+    clusterDomain: "cluster.local"
+    registryRepo: "dev-registry.deckhouse.io/sys/deckhouse-oss"
+    devBranch: "main"
+  # Only the bare minimum is pre-enabled here. EnsureCephStorageClass will
+  # turn on sds-elastic + csi-ceph (and sds-local-volume as the OSD backing,
+  # unless CSI_CEPH_OSD_STORAGE_CLASS is provided).
+ modules: + - name: "snapshot-controller" + version: 1 + enabled: true + modulePullOverride: "main" + dependencies: [] + - name: "sds-node-configurator" + version: 1 + enabled: true + settings: + enableThinProvisioning: true + dependencies: [] diff --git a/tests/csi-ceph/csi_ceph_suite_test.go b/tests/csi-ceph/csi_ceph_suite_test.go new file mode 100644 index 0000000..c8d1442 --- /dev/null +++ b/tests/csi-ceph/csi_ceph_suite_test.go @@ -0,0 +1,46 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi_ceph + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/logger" +) + +var _ = BeforeSuite(func() { + Expect(config.ValidateEnvironment()).To(Succeed(), "validate environment") + Expect(logger.Initialize()).To(Succeed(), "initialize logger") +}) + +var _ = AfterSuite(func() { + if err := logger.Close(); err != nil { + GinkgoWriter.Printf("Warning: Failed to close logger: %v\n", err) + } +}) + +func TestCsiCeph(t *testing.T) { + RegisterFailHandler(Fail) + suiteConfig, reporterConfig := GinkgoConfiguration() + reporterConfig.Verbose = true + reporterConfig.ShowNodeEvents = false + RunSpecs(t, "csi-ceph (storage-e2e testkit)", suiteConfig, reporterConfig) +} diff --git a/tests/csi-ceph/csi_ceph_test.go b/tests/csi-ceph/csi_ceph_test.go new file mode 100644 index 0000000..9f9a600 --- /dev/null +++ b/tests/csi-ceph/csi_ceph_test.go @@ -0,0 +1,127 @@ +/* +Copyright 2025 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi_ceph + +import ( + "context" + "fmt" + "os" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/pkg/cluster" + k8s "github.com/deckhouse/storage-e2e/pkg/kubernetes" + "github.com/deckhouse/storage-e2e/pkg/testkit" +) + +const ( + // testStorageClassName matches what csi-ceph's smoke test in + // /csi-ceph/e2e also expects, so the two can share a cluster. 
+	testStorageClassName = "e2e-ceph-rbd-r1"
+	testNamespace        = "e2e-csi-ceph-smoke"
+	testPVCName          = "e2e-csi-ceph-smoke-pvc"
+)
+
+var _ = Describe("csi-ceph smoke (storage-e2e reference)", Ordered, func() {
+	var testClusterResources *cluster.TestClusterResources
+
+	BeforeAll(func() {
+		cluster.OutputEnvironmentVariables()
+	})
+
+	AfterAll(func() {
+		cluster.CleanupTestClusterResources(testClusterResources)
+	})
+
+	It("should create or connect to test cluster", func() {
+		testClusterResources = cluster.CreateOrConnectToTestCluster()
+		Expect(testClusterResources).NotTo(BeNil())
+		Expect(testClusterResources.Kubeconfig).NotTo(BeNil())
+	})
+
+	It("should ensure Ceph RBD StorageClass via Rook (EnsureCephStorageClass)", func() {
+		Expect(testClusterResources).NotTo(BeNil())
+
+		ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
+		defer cancel()
+
+		cfg := testkit.CephStorageClassConfig{
+			StorageClassName: testStorageClassName,
+			ReplicaSize:      1,
+			FailureDomain:    "osd",
+
+			// When OSDStorageClass is empty, EnsureCephStorageClass falls
+			// back to EnsureDefaultStorageClass and creates an
+			// sds-local-volume Thick SC on the fly.
+			OSDStorageClass:          os.Getenv("CSI_CEPH_OSD_STORAGE_CLASS"),
+			OSDBackingIncludeMasters: true,
+
+			// Lets callers pin a specific csi-ceph image from a dev-registry PR.
+			CsiCephModulePullOverride: os.Getenv("CSI_CEPH_MODULE_PULL_OVERRIDE"),
+		}
+
+		// Enable automatic VirtualDisk attachment on nested-VM clusters.
+		if testClusterResources.VMResources != nil {
+			cfg.OSDBackingBaseKubeconfig = testClusterResources.BaseKubeconfig
+			cfg.OSDBackingVMNamespace = testClusterResources.VMResources.Namespace
+			cfg.OSDBackingBaseStorageClassName = config.TestClusterStorageClass
+		}
+
+		scName, err := testkit.EnsureCephStorageClass(ctx, testClusterResources.Kubeconfig, cfg)
+		Expect(err).NotTo(HaveOccurred(), "EnsureCephStorageClass")
+		Expect(scName).To(Equal(testStorageClassName))
+	})
+
+	It("should provision a PVC against the Ceph StorageClass", func() {
+		Expect(testClusterResources).NotTo(BeNil())
+
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+		defer cancel()
+
+		_, err := k8s.CreateNamespaceIfNotExists(ctx, testClusterResources.Kubeconfig, testNamespace)
+		Expect(err).NotTo(HaveOccurred(), "create test namespace")
+
+		apply, err := k8s.NewApplyClient(testClusterResources.Kubeconfig)
+		Expect(err).NotTo(HaveOccurred(), "create apply client")
+
+		pvcYAML := fmt.Sprintf(`apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: %s
+  namespace: %s
+  labels:
+    e2e.csi-ceph/smoke: "true"
+spec:
+  accessModes: [ "ReadWriteOnce" ]
+  resources:
+    requests:
+      storage: 1Gi
+  storageClassName: %s
+`, testPVCName, testNamespace, testStorageClassName)
+
+		Expect(apply.ApplyYAML(ctx, pvcYAML, testNamespace)).To(Succeed(), "apply PVC")
+
+		clientset, err := k8s.NewClientsetWithRetry(ctx, testClusterResources.Kubeconfig)
+		Expect(err).NotTo(HaveOccurred(), "clientset")
+
+		Expect(k8s.WaitForPVCsBound(ctx, clientset, testNamespace, "e2e.csi-ceph/smoke=true", 1, 60, 5*time.Second)).
+			To(Succeed(), "wait PVC bound")
+	})
+})
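The smoke test stops at Bound. A possible follow-up spec (an editor's sketch, not part of this diff) would mount the PVC in a pod to exercise the csi-ceph node plugin end to end. It would slot into the Ordered Describe above and additionally import corev1 (k8s.io/api/core/v1) and metav1 (k8s.io/apimachinery/pkg/apis/meta/v1); the pod name and image are illustrative.

```go
It("should mount the PVC in a pod (attach/mount smoke)", func() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	apply, err := k8s.NewApplyClient(testClusterResources.Kubeconfig)
	Expect(err).NotTo(HaveOccurred(), "create apply client")

	podYAML := fmt.Sprintf(`apiVersion: v1
kind: Pod
metadata:
  name: e2e-csi-ceph-smoke-pod
  namespace: %s
spec:
  containers:
    - name: writer
      image: busybox
      command: ["sh", "-c", "echo ok > /data/probe && sleep 3600"]
      volumeMounts:
        - name: data
          mountPath: /data
  volumes:
    - name: data
      persistentVolumeClaim:
        claimName: %s
`, testNamespace, testPVCName)
	Expect(apply.ApplyYAML(ctx, podYAML, testNamespace)).To(Succeed(), "apply pod")

	clientset, err := k8s.NewClientsetWithRetry(ctx, testClusterResources.Kubeconfig)
	Expect(err).NotTo(HaveOccurred(), "clientset")

	// Eventually retries while the Get errors or the phase is not Running.
	Eventually(func() (corev1.PodPhase, error) {
		pod, err := clientset.CoreV1().Pods(testNamespace).Get(ctx, "e2e-csi-ceph-smoke-pod", metav1.GetOptions{})
		if err != nil {
			return "", err
		}
		return pod.Status.Phase, nil
	}, 5*time.Minute, 5*time.Second).Should(Equal(corev1.PodRunning), "pod Running with RBD volume mounted")
})
```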