diff --git a/pkg/kubernetes/blockdevice.go b/pkg/kubernetes/blockdevice.go
index 2aa6f90..769d7fc 100644
--- a/pkg/kubernetes/blockdevice.go
+++ b/pkg/kubernetes/blockdevice.go
@@ -46,3 +46,25 @@ func GetConsumableBlockDevices(ctx context.Context, kubeconfig *rest.Config) ([]
 	logger.Debug("Found %d consumable BlockDevices", len(blockDevices))
 	return blockDevices, nil
 }
+
+// GetConsumableBlockDevicesByNode returns consumable BlockDevices for a specific node.
+func GetConsumableBlockDevicesByNode(ctx context.Context, kubeconfig *rest.Config, nodeName string) ([]BlockDevice, error) {
+	if nodeName == "" {
+		return nil, fmt.Errorf("nodeName is required")
+	}
+
+	logger.Debug("Getting consumable BlockDevices from node %s", nodeName)
+
+	bdClient, err := storage.NewBlockDeviceClient(ctx, kubeconfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create BlockDevice client: %w", err)
+	}
+
+	blockDevices, err := bdClient.ListConsumableByNode(ctx, nodeName)
+	if err != nil {
+		return nil, err
+	}
+
+	logger.Debug("Found %d consumable BlockDevices on node %s", len(blockDevices), nodeName)
+	return blockDevices, nil
+}
diff --git a/pkg/kubernetes/localstorageclass.go b/pkg/kubernetes/localstorageclass.go
new file mode 100644
index 0000000..a3b1f3f
--- /dev/null
+++ b/pkg/kubernetes/localstorageclass.go
@@ -0,0 +1,150 @@
+/*
+Copyright 2025 Flant JSC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubernetes
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/rest"
+
+	"github.com/deckhouse/storage-e2e/internal/logger"
+)
+
+var LocalStorageClassGVR = schema.GroupVersionResource{
+	Group:    "storage.deckhouse.io",
+	Version:  "v1alpha1",
+	Resource: "localstorageclasses",
+}
+
+type LocalStorageClassConfig struct {
+	Name              string
+	LVMVolumeGroups   []string // LVMVolumeGroup resource names
+	LVMType           string   // "Thick" or "Thin"
+	ThinPoolName      string   // required when LVMType is "Thin"
+	ReclaimPolicy     string   // "Delete" or "Retain" (default: "Delete")
+	VolumeBindingMode string   // "WaitForFirstConsumer" or "Immediate" (default: "WaitForFirstConsumer")
+}
+
+func CreateLocalStorageClass(ctx context.Context, kubeconfig *rest.Config, cfg LocalStorageClassConfig) error {
+	if cfg.Name == "" {
+		return fmt.Errorf("LocalStorageClass name is required")
+	}
+	if len(cfg.LVMVolumeGroups) == 0 {
+		return fmt.Errorf("at least one LVMVolumeGroup is required")
+	}
+	if cfg.LVMType == "" {
+		cfg.LVMType = "Thick"
+	}
+	if cfg.LVMType == "Thin" && cfg.ThinPoolName == "" {
+		return fmt.Errorf("ThinPoolName is required for Thin LVM type")
+	}
+	if cfg.ReclaimPolicy == "" {
+		cfg.ReclaimPolicy = "Delete"
+	}
+	if cfg.VolumeBindingMode == "" {
+		cfg.VolumeBindingMode = "WaitForFirstConsumer"
+	}
+
+	dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig)
+	if err != nil {
+		return fmt.Errorf("failed to create dynamic client: %w", err)
+	}
+
+	lvgRefs := make([]interface{}, len(cfg.LVMVolumeGroups))
+	for i, name := range cfg.LVMVolumeGroups {
+		ref := map[string]interface{}{
+			"name": name,
+		}
+		if cfg.LVMType == "Thin" {
+			ref["thin"] = map[string]interface{}{
+				"poolName": cfg.ThinPoolName,
+			}
+		}
+		lvgRefs[i] = ref
+	}
+
+	lsc := &unstructured.Unstructured{
+		Object: map[string]interface{}{
+			"apiVersion": "storage.deckhouse.io/v1alpha1",
+			"kind":       "LocalStorageClass",
+			"metadata": map[string]interface{}{
+				"name": cfg.Name,
+			},
+			"spec": map[string]interface{}{
+				"lvm": map[string]interface{}{
+					"lvmVolumeGroups": lvgRefs,
+					"type":            cfg.LVMType,
+				},
+				"reclaimPolicy":     cfg.ReclaimPolicy,
+				"volumeBindingMode": cfg.VolumeBindingMode,
+			},
+		},
+	}
+
+	logger.Info("Creating LocalStorageClass %s (type=%s, lvmVolumeGroups=%v)", cfg.Name, cfg.LVMType, cfg.LVMVolumeGroups)
+	_, err = dynamicClient.Resource(LocalStorageClassGVR).Create(ctx, lsc, metav1.CreateOptions{})
+	if err != nil {
+		if apierrors.IsAlreadyExists(err) {
+			logger.Info("LocalStorageClass %s already exists, skipping create", cfg.Name)
+			return nil
+		}
+		return fmt.Errorf("failed to create LocalStorageClass %s: %w", cfg.Name, err)
+	}
+	logger.Success("LocalStorageClass %s created", cfg.Name)
+	return nil
+}
+
+// WaitForLocalStorageClassCreated waits for the LocalStorageClass CR status to indicate
+// that the controller has created the corresponding StorageClass.
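+//
+// Illustrative usage (the resource name "local-sc" and the 5-minute timeout are assumptions
+// of this sketch, not values required by the function):
+//
+//	if err := WaitForLocalStorageClassCreated(ctx, kubeconfig, "local-sc", 5*time.Minute); err != nil {
+//		return err
+//	}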
+func WaitForLocalStorageClassCreated(ctx context.Context, kubeconfig *rest.Config, name string, timeout time.Duration) error {
+	logger.Debug("Waiting for LocalStorageClass %s to be Created (timeout: %v)", name, timeout)
+
+	dynamicClient, err := NewDynamicClientWithRetry(ctx, kubeconfig)
+	if err != nil {
+		return fmt.Errorf("failed to create dynamic client: %w", err)
+	}
+
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		obj, err := dynamicClient.Resource(LocalStorageClassGVR).Get(ctx, name, metav1.GetOptions{})
+		if err == nil {
+			phase, _, _ := unstructured.NestedString(obj.Object, "status", "phase")
+			if phase == "Created" {
+				logger.Success("LocalStorageClass %s is Created", name)
+				return nil
+			}
+			logger.Debug("LocalStorageClass %s phase: %s, waiting...", name, phase)
+		}
+
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("timeout waiting for LocalStorageClass %s to be Created: %w", name, ctx.Err())
+		case <-ticker.C:
+		}
+	}
+}
diff --git a/pkg/kubernetes/nodes.go b/pkg/kubernetes/nodes.go
index ab898da..6a59407 100644
--- a/pkg/kubernetes/nodes.go
+++ b/pkg/kubernetes/nodes.go
@@ -21,6 +21,8 @@ import (
 	"fmt"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/rest"
 
@@ -29,6 +31,133 @@ import (
 
 const nodeLabelPollInterval = 10 * time.Second
 
+// GetNodes returns all nodes in the cluster.
+func GetNodes(ctx context.Context, kubeconfig *rest.Config) ([]corev1.Node, error) {
+	clientset, err := NewClientsetWithRetry(ctx, kubeconfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create clientset: %w", err)
+	}
+
+	nodeList, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to list nodes: %w", err)
+	}
+
+	logger.Debug("Found %d nodes", len(nodeList.Items))
+	return nodeList.Items, nil
+}
+
+// GetWorkerNodes returns all worker nodes in the cluster.
+// A worker node is any node that has neither the "node-role.kubernetes.io/control-plane"
+// label nor the legacy "node-role.kubernetes.io/master" label.
+func GetWorkerNodes(ctx context.Context, kubeconfig *rest.Config) ([]corev1.Node, error) {
+	allNodes, err := GetNodes(ctx, kubeconfig)
+	if err != nil {
+		return nil, err
+	}
+
+	var workers []corev1.Node
+	for _, node := range allNodes {
+		if _, isMaster := node.Labels["node-role.kubernetes.io/control-plane"]; isMaster {
+			continue
+		}
+		if _, isMaster := node.Labels["node-role.kubernetes.io/master"]; isMaster {
+			continue
+		}
+		workers = append(workers, node)
+	}
+
+	logger.Debug("Found %d worker nodes", len(workers))
+	return workers, nil
+}
+
+// LabelNodes adds a label to each of the specified nodes.
+// If a node already has the label with the desired value, it is skipped.
+// Uses retry with re-fetch to handle optimistic concurrency conflicts.
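+//
+// Illustrative usage (the node names are assumptions of this sketch; the label key/value
+// mirror how the helper is invoked from pkg/testkit):
+//
+//	err := LabelNodes(ctx, kubeconfig, []string{"worker-0", "worker-1"},
+//		"storage.deckhouse.io/sds-local-volume-node", "")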
+func LabelNodes(ctx context.Context, kubeconfig *rest.Config, nodeNames []string, labelKey, labelValue string) error {
+	if len(nodeNames) == 0 {
+		return nil
+	}
+
+	clientset, err := NewClientsetWithRetry(ctx, kubeconfig)
+	if err != nil {
+		return fmt.Errorf("failed to create clientset: %w", err)
+	}
+
+	const maxRetries = 5
+
+	for _, name := range nodeNames {
+		var lastErr error
+		for attempt := 0; attempt < maxRetries; attempt++ {
+			node, err := clientset.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
+			if err != nil {
+				return fmt.Errorf("failed to get node %s: %w", name, err)
+			}
+
+			if node.Labels != nil {
+				if v, ok := node.Labels[labelKey]; ok && v == labelValue {
+					logger.Debug("Node %s already has label %s=%s", name, labelKey, labelValue)
+					lastErr = nil
+					break
+				}
+			}
+
+			if node.Labels == nil {
+				node.Labels = make(map[string]string)
+			}
+			node.Labels[labelKey] = labelValue
+
+			_, lastErr = clientset.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{})
+			if lastErr == nil {
+				logger.Info("Labeled node %s with %s=%s", name, labelKey, labelValue)
+				break
+			}
+
+			if apierrors.IsConflict(lastErr) {
+				logger.Debug("Conflict labeling node %s (attempt %d/%d), retrying...", name, attempt+1, maxRetries)
+				continue
+			}
+			return fmt.Errorf("failed to label node %s: %w", name, lastErr)
+		}
+		if lastErr != nil {
+			return fmt.Errorf("failed to label node %s after %d attempts: %w", name, maxRetries, lastErr)
+		}
+	}
+
+	return nil
+}
+
+// GetNodeTaints returns the taints of the named node.
+func GetNodeTaints(ctx context.Context, kubeconfig *rest.Config, nodeName string) ([]corev1.Taint, error) {
+	clientset, err := NewClientsetWithRetry(ctx, kubeconfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create clientset: %w", err)
+	}
+
+	node, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to get node %s: %w", nodeName, err)
+	}
+
+	return node.Spec.Taints, nil
+}
+
+// IsNodeCordoned checks whether a node has NoSchedule or NoExecute taints
+// that would prevent DaemonSet pods from scheduling.
+func IsNodeCordoned(ctx context.Context, kubeconfig *rest.Config, nodeName string) (bool, error) {
+	taints, err := GetNodeTaints(ctx, kubeconfig, nodeName)
+	if err != nil {
+		return false, err
+	}
+
+	for _, taint := range taints {
+		if taint.Effect == corev1.TaintEffectNoSchedule || taint.Effect == corev1.TaintEffectNoExecute {
+			logger.Debug("Node %s has taint %s=%s:%s", nodeName, taint.Key, taint.Value, taint.Effect)
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
 // WaitForNodesLabeled waits for all specified nodes to have the given label with the expected value.
 // It polls each node in parallel every 10 seconds until all nodes have the label or the context times out.
 // Parameters:
diff --git a/pkg/kubernetes/storageclass.go b/pkg/kubernetes/storageclass.go
index c1235f6..ea54770 100644
--- a/pkg/kubernetes/storageclass.go
+++ b/pkg/kubernetes/storageclass.go
@@ -22,9 +22,12 @@ import (
 	"sync"
 	"time"
 
+	storagev1 "k8s.io/api/storage/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/rest"
 
+	"github.com/deckhouse/storage-e2e/internal/kubernetes/deckhouse"
 	"github.com/deckhouse/storage-e2e/internal/logger"
 )
 
@@ -56,67 +59,101 @@ func WaitForStorageClasses(ctx context.Context, kubeconfig *rest.Config, storage
 func WaitForStorageClass(ctx context.Context, kubeconfig *rest.Config, storageClassName string, timeout time.Duration) error {
 	logger.Debug("Waiting for StorageClass %s to become available (timeout: %v)", storageClassName, timeout)
 
-	// Create clientset from kubeconfig with retry for transient network errors
 	clientset, err := NewClientsetWithRetry(ctx, kubeconfig)
 	if err != nil {
 		return fmt.Errorf("failed to create clientset: %w", err)
 	}
 
-	deadline := time.Now().Add(timeout)
-	for {
-		// Check if context is done
-		if ctx.Err() != nil {
-			return ctx.Err()
-		}
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
 
-		// Check if timeout reached
-		if time.Now().After(deadline) {
-			return fmt.Errorf("timeout waiting for StorageClass %s", storageClassName)
-		}
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
 
-		// Try to get the storage class
+	for {
 		_, err := clientset.StorageV1().StorageClasses().Get(ctx, storageClassName, metav1.GetOptions{})
 		if err == nil {
 			logger.Success("StorageClass %s is available", storageClassName)
 			return nil
 		}
 
-		// Wait a bit before retrying
-		time.Sleep(5 * time.Second)
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("timeout waiting for StorageClass %s: %w", storageClassName, ctx.Err())
+		case <-ticker.C:
+		}
 	}
 }
 
-// WaitForStorageClassDeletion waits for a storage class to be deleted
-func WaitForStorageClassDeletion(ctx context.Context, kubeconfig *rest.Config, storageClassName string, timeout time.Duration) error {
-	logger.Debug("Waiting for StorageClass %s to be deleted (timeout: %v)", storageClassName, timeout)
+// GetDefaultStorageClassName returns the name of the current default StorageClass
+// (annotated with storageclass.kubernetes.io/is-default-class=true), or "" if none exists.
+func GetDefaultStorageClassName(ctx context.Context, kubeconfig *rest.Config) (string, error) {
+	clientset, err := NewClientsetWithRetry(ctx, kubeconfig)
+	if err != nil {
+		return "", fmt.Errorf("failed to create clientset: %w", err)
+	}
+
+	scList, err := clientset.StorageV1().StorageClasses().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return "", fmt.Errorf("failed to list StorageClasses: %w", err)
+	}
+
+	for _, sc := range scList.Items {
+		if sc.Annotations["storageclass.kubernetes.io/is-default-class"] == "true" {
+			return sc.Name, nil
+		}
+	}
+	return "", nil
+}
 
-	// Create clientset from kubeconfig with retry for transient network errors
+// GetStorageClass returns the StorageClass with the given name, or (nil, nil) if it does not exist.
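+//
+// Illustrative usage (the name "local-sc" is an assumption of this sketch):
+//
+//	sc, err := GetStorageClass(ctx, kubeconfig, "local-sc")
+//	if err != nil {
+//		return err
+//	}
+//	if sc == nil {
+//		// StorageClass does not exist yet.
+//	}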
+func GetStorageClass(ctx context.Context, kubeconfig *rest.Config, name string) (*storagev1.StorageClass, error) {
 	clientset, err := NewClientsetWithRetry(ctx, kubeconfig)
 	if err != nil {
-		return fmt.Errorf("failed to create clientset: %w", err)
+		return nil, fmt.Errorf("failed to create clientset: %w", err)
 	}
 
-	deadline := time.Now().Add(timeout)
-	for {
-		// Check if context is done
-		if ctx.Err() != nil {
-			return ctx.Err()
+	sc, err := clientset.StorageV1().StorageClasses().Get(ctx, name, metav1.GetOptions{})
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil, nil
 		}
+		return nil, fmt.Errorf("failed to get StorageClass %s: %w", name, err)
+	}
+	return sc, nil
+}
+
+// SetGlobalDefaultStorageClass updates the "global" ModuleConfig to set
+// spec.settings.storageClass to the given name, making it the cluster default.
+func SetGlobalDefaultStorageClass(ctx context.Context, kubeconfig *rest.Config, storageClassName string) error {
+	const moduleName = "global"
+	const moduleVersion = 1
 
-		// Check if timeout reached
-		if time.Now().After(deadline) {
-			return fmt.Errorf("timeout waiting for StorageClass %s to be deleted", storageClassName)
+	settings := map[string]interface{}{
+		"storageClass": storageClassName,
+	}
+
+	mc, err := deckhouse.GetModuleConfig(ctx, kubeconfig, moduleName)
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			logger.Info("Creating global ModuleConfig with storageClass=%s", storageClassName)
+			return deckhouse.CreateModuleConfig(ctx, kubeconfig, moduleName, moduleVersion, true, settings)
 		}
+		return fmt.Errorf("failed to get global ModuleConfig: %w", err)
+	}
 
-		// Try to get the storage class
-		_, err := clientset.StorageV1().StorageClasses().Get(ctx, storageClassName, metav1.GetOptions{})
-		if err != nil {
-			// Assume deleted if we can't get it
-			logger.Success("StorageClass %s is deleted", storageClassName)
-			return nil
+	existingSettings := map[string]interface{}{}
+	if mc.Spec.Settings != nil {
+		for k, v := range mc.Spec.Settings {
+			existingSettings[k] = v
 		}
+	}
+	existingSettings["storageClass"] = storageClassName
 
-		// Wait a bit before retrying
-		time.Sleep(2 * time.Second)
+	logger.Info("Updating global ModuleConfig with storageClass=%s", storageClassName)
+	enabled := true
+	if mc.Spec.Enabled != nil {
+		enabled = *mc.Spec.Enabled
 	}
+	return deckhouse.UpdateModuleConfig(ctx, kubeconfig, moduleName, mc.Spec.Version, enabled, existingSettings)
 }
diff --git a/pkg/testkit/storageclass.go b/pkg/testkit/storageclass.go
new file mode 100644
index 0000000..83aae9b
--- /dev/null
+++ b/pkg/testkit/storageclass.go
@@ -0,0 +1,335 @@
+/*
+Copyright 2025 Flant JSC
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package testkit
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/client-go/rest"
+
+	"github.com/deckhouse/storage-e2e/internal/logger"
+	"github.com/deckhouse/storage-e2e/pkg/kubernetes"
+)
+
+// DefaultStorageClassConfig configures CreateDefaultStorageClass behavior.
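+//
+// A minimal configuration sketch (the StorageClass name is an assumption of this example;
+// fields left unset fall back to the defaults documented on the struct fields):
+//
+//	cfg := DefaultStorageClassConfig{
+//		StorageClassName: "local-sc",
+//		LVMType:          "Thick",
+//	}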
+type DefaultStorageClassConfig struct {
+	// StorageClassName is the name for the created LocalStorageClass (and the resulting StorageClass).
+	StorageClassName string
+
+	// LVMType is "Thick" or "Thin" (default: "Thin").
+	LVMType string
+
+	// ThinPoolName is required when LVMType is "Thin".
+	ThinPoolName string
+
+	// VGName is the LVM Volume Group name to create on each node (default: "vg-local").
+	VGName string
+
+	// NodeNames lists nodes on which to create LVGs.
+	// If empty, nodes are discovered automatically (workers only, unless IncludeMasters is set).
+	NodeNames []string
+
+	// IncludeMasters, when true, includes master nodes in the target list.
+	// Nodes with NoSchedule/NoExecute taints will still receive VirtualDisks and labels,
+	// but BlockDevice/LVG creation is skipped for them (the DaemonSet won't schedule there).
+	IncludeMasters bool
+
+	// --- VM disk attachment (optional) ---
+	// When BaseKubeconfig is set, VirtualDisks are created and attached to worker VMs
+	// on the base (hypervisor) cluster before waiting for BlockDevices.
+	// If nil, disk attachment is skipped (disks must be pre-provisioned).
+	BaseKubeconfig *rest.Config
+
+	// VMNamespace is the namespace in the base cluster where VMs reside.
+	// Required when BaseKubeconfig is set.
+	VMNamespace string
+
+	// BaseStorageClassName is the StorageClass on the base cluster used for VirtualDisk PVCs.
+	// Required when BaseKubeconfig is set.
+	BaseStorageClassName string
+
+	// DiskSize is the size of each VirtualDisk to attach (default: "20Gi").
+	DiskSize string
+
+	// --- Timeouts ---
+
+	// DiskAttachTimeout is how long to wait for all disk attachments (default: 15m).
+	DiskAttachTimeout time.Duration
+
+	// BlockDeviceWaitTimeout is how long to wait for consumable block devices per node (default: 10m).
+	BlockDeviceWaitTimeout time.Duration
+
+	// LVGReadyTimeout is how long to wait for each LVG to become Ready (default: 10m).
+	LVGReadyTimeout time.Duration
+
+	// LocalStorageClassTimeout is how long to wait for LocalStorageClass to reach Created phase (default: 5m).
+	LocalStorageClassTimeout time.Duration
+
+	// StorageClassWaitTimeout is how long to wait for the resulting StorageClass to appear (default: 2m).
+	StorageClassWaitTimeout time.Duration
+}
+
+func (c *DefaultStorageClassConfig) applyDefaults() {
+	if c.LVMType == "" {
+		c.LVMType = "Thin"
+	}
+	if c.VGName == "" {
+		c.VGName = "vg-local"
+	}
+	if c.ThinPoolName == "" {
+		c.ThinPoolName = "thinpool"
+	}
+	if c.DiskSize == "" {
+		c.DiskSize = "20Gi"
+	}
+	if c.DiskAttachTimeout == 0 {
+		c.DiskAttachTimeout = 15 * time.Minute
+	}
+	if c.BlockDeviceWaitTimeout == 0 {
+		c.BlockDeviceWaitTimeout = 10 * time.Minute
+	}
+	if c.LVGReadyTimeout == 0 {
+		c.LVGReadyTimeout = 10 * time.Minute
+	}
+	if c.LocalStorageClassTimeout == 0 {
+		c.LocalStorageClassTimeout = 5 * time.Minute
+	}
+	if c.StorageClassWaitTimeout == 0 {
+		c.StorageClassWaitTimeout = 2 * time.Minute
+	}
+}
+
+// CreateDefaultStorageClass is a high-level helper that:
+// 1. Discovers target nodes (workers by default; all nodes when IncludeMasters is set).
+// 2. Enables sds-node-configurator and sds-local-volume modules.
+// 3. Labels target nodes so the sds-node-configurator DaemonSet schedules agents.
+// 4. (Optional) If BaseKubeconfig is set, attaches VirtualDisks to target VMs.
+// 5. On each schedulable node: waits for consumable BlockDevices, then creates an LVMVolumeGroup.
+//    Nodes with NoSchedule/NoExecute taints are skipped (DaemonSet won't schedule there).
+// 6. Creates a LocalStorageClass CR referencing the created LVGs.
+// 7. Waits for the sds-local-volume controller to create the corresponding StorageClass.
+func CreateDefaultStorageClass(ctx context.Context, kubeconfig *rest.Config, cfg DefaultStorageClassConfig) (string, error) {
+	cfg.applyDefaults()
+
+	if cfg.StorageClassName == "" {
+		return "", fmt.Errorf("StorageClassName is required")
+	}
+	if cfg.LVMType == "Thin" && cfg.ThinPoolName == "" {
+		return "", fmt.Errorf("ThinPoolName is required for Thin LVM type")
+	}
+
+	// 1. Resolve node list.
+	nodes := cfg.NodeNames
+	if len(nodes) == 0 {
+		var nodeObjs []corev1.Node
+		var err error
+		if cfg.IncludeMasters {
+			nodeObjs, err = kubernetes.GetNodes(ctx, kubeconfig)
+			if err != nil {
+				return "", fmt.Errorf("failed to get all nodes: %w", err)
+			}
+		} else {
+			nodeObjs, err = kubernetes.GetWorkerNodes(ctx, kubeconfig)
+			if err != nil {
+				return "", fmt.Errorf("failed to get worker nodes: %w", err)
+			}
+		}
+		for _, n := range nodeObjs {
+			nodes = append(nodes, n.Name)
+		}
+	}
+	if len(nodes) == 0 {
+		return "", fmt.Errorf("no nodes available for LVG creation")
+	}
+	logger.Info("Target nodes (%d): %v", len(nodes), nodes)
+
+	// 2. Enable sds-node-configurator and sds-local-volume modules.
+	storageModules := []kubernetes.ModuleSpec{
+		{
+			Name:    "sds-node-configurator",
+			Version: 1,
+			Enabled: true,
+		},
+		{
+			Name:         "sds-local-volume",
+			Version:      1,
+			Enabled:      true,
+			Dependencies: []string{"sds-node-configurator"},
+		},
+	}
+	if err := kubernetes.EnableModulesAndWait(ctx, kubeconfig, nil, nil, storageModules, 10*time.Minute); err != nil {
+		return "", fmt.Errorf("failed to enable storage modules: %w", err)
+	}
+
+	// 3. Label all target nodes so the sds-node-configurator DaemonSet schedules agents.
+	const sdsLocalVolumeNodeLabel = "storage.deckhouse.io/sds-local-volume-node"
+	if err := kubernetes.LabelNodes(ctx, kubeconfig, nodes, sdsLocalVolumeNodeLabel, ""); err != nil {
+		return "", fmt.Errorf("failed to label nodes for sds-node-configurator: %w", err)
+	}
+
+	// 4. Attach VirtualDisks to the target VMs (VM-based clusters only).
+	if cfg.BaseKubeconfig != nil {
+		if cfg.VMNamespace == "" {
+			return "", fmt.Errorf("VMNamespace is required when BaseKubeconfig is set")
+		}
+		if cfg.BaseStorageClassName == "" {
+			return "", fmt.Errorf("BaseStorageClassName is required when BaseKubeconfig is set")
+		}
+
+		logger.Info("Attaching VirtualDisks to %d VMs", len(nodes))
+		attachCtx, attachCancel := context.WithTimeout(ctx, cfg.DiskAttachTimeout)
+		defer attachCancel()
+
+		runSuffix := time.Now().Unix()
+		for _, nodeName := range nodes {
+			diskName := fmt.Sprintf("%s-sds-local-disk-%d", nodeName, runSuffix)
+			res, err := kubernetes.AttachVirtualDiskToVM(attachCtx, cfg.BaseKubeconfig, kubernetes.VirtualDiskAttachmentConfig{
+				VMName:           nodeName,
+				Namespace:        cfg.VMNamespace,
+				DiskName:         diskName,
+				DiskSize:         cfg.DiskSize,
+				StorageClassName: cfg.BaseStorageClassName,
+			})
+			if err != nil {
+				return "", fmt.Errorf("failed to attach VirtualDisk to VM %s: %w", nodeName, err)
+			}
+
+			if err := kubernetes.WaitForVirtualDiskAttached(attachCtx, cfg.BaseKubeconfig, cfg.VMNamespace, res.AttachmentName, 10*time.Second); err != nil {
+				return "", fmt.Errorf("disk attachment for VM %s did not complete: %w", nodeName, err)
+			}
+		}
+		logger.Success("All VirtualDisks attached")
+	}
+
+	// 5. For each node: wait for block devices → create LVG → wait for Ready.
+	// Nodes with NoSchedule/NoExecute taints are skipped (agent DaemonSet won't schedule there).
+	var lvgNames []string
+	for _, nodeName := range nodes {
+		if cfg.IncludeMasters {
+			cordoned, err := kubernetes.IsNodeCordoned(ctx, kubeconfig, nodeName)
+			if err != nil {
+				logger.Warn("Could not check taints for node %s: %v, attempting LVG setup anyway", nodeName, err)
+			} else if cordoned {
+				logger.Warn("Skipping LVG setup on node %s: has NoSchedule/NoExecute taint (agent DaemonSet won't schedule)", nodeName)
+				continue
+			}
+		}
+
+		logger.Info("Setting up LVG on node %s", nodeName)
+
+		var bds []kubernetes.BlockDevice
+		deadline := time.Now().Add(cfg.BlockDeviceWaitTimeout)
+		for {
+			if ctx.Err() != nil {
+				return "", ctx.Err()
+			}
+			if time.Now().After(deadline) {
+				return "", fmt.Errorf("timeout waiting for consumable block devices on node %s", nodeName)
+			}
+
+			var err error
+			bds, err = kubernetes.GetConsumableBlockDevicesByNode(ctx, kubeconfig, nodeName)
+			if err != nil {
+				logger.Debug("Error getting block devices on %s: %v, retrying...", nodeName, err)
+				time.Sleep(10 * time.Second)
+				continue
+			}
+			if len(bds) > 0 {
+				break
+			}
+			logger.Debug("No consumable block devices on %s yet, retrying...", nodeName)
+			time.Sleep(10 * time.Second)
+		}
+		logger.Info("Found %d consumable block device(s) on node %s", len(bds), nodeName)
+
+		lvgName := fmt.Sprintf("lvg-%s", nodeName)
+		err := kubernetes.CreateLVMVolumeGroup(ctx, kubeconfig, lvgName, nodeName, []string{bds[0].Name}, cfg.VGName)
+		if err != nil && !apierrors.IsAlreadyExists(err) {
+			return "", fmt.Errorf("failed to create LVG %s: %w", lvgName, err)
+		}
+
+		if err := kubernetes.WaitForLVMVolumeGroupReady(ctx, kubeconfig, lvgName, cfg.LVGReadyTimeout); err != nil {
+			return "", fmt.Errorf("LVG %s did not become ready: %w", lvgName, err)
+		}
+
+		lvgNames = append(lvgNames, lvgName)
+	}
+
+	// 6. Create LocalStorageClass CR referencing all created LVGs.
+	err := kubernetes.CreateLocalStorageClass(ctx, kubeconfig, kubernetes.LocalStorageClassConfig{
+		Name:            cfg.StorageClassName,
+		LVMVolumeGroups: lvgNames,
+		LVMType:         cfg.LVMType,
+		ThinPoolName:    cfg.ThinPoolName,
+	})
+	if err != nil {
+		return "", fmt.Errorf("failed to create LocalStorageClass %s: %w", cfg.StorageClassName, err)
+	}
+
+	// 7. Wait for the controller to process LocalStorageClass and create StorageClass.
+	if err := kubernetes.WaitForLocalStorageClassCreated(ctx, kubeconfig, cfg.StorageClassName, cfg.LocalStorageClassTimeout); err != nil {
+		return "", fmt.Errorf("LocalStorageClass %s did not reach Created phase: %w", cfg.StorageClassName, err)
+	}
+
+	if err := kubernetes.WaitForStorageClass(ctx, kubeconfig, cfg.StorageClassName, cfg.StorageClassWaitTimeout); err != nil {
+		return "", fmt.Errorf("StorageClass %s did not appear: %w", cfg.StorageClassName, err)
+	}
+
+	logger.Success("StorageClass %s created via LocalStorageClass with VG %s on %d nodes", cfg.StorageClassName, cfg.VGName, len(nodes))
+	return cfg.StorageClassName, nil
+}
+
+// EnsureDefaultStorageClass is an idempotent wrapper around CreateDefaultStorageClass.
+// It first checks whether the requested StorageClass already exists. If it does,
+// it skips creation. In either case it configures the StorageClass as the cluster
+// default via the "global" ModuleConfig (spec.settings.storageClass).
+//
+// Returns the StorageClass name and any error encountered.
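+//
+// Illustrative usage (the StorageClass name is an assumption of this sketch):
+//
+//	scName, err := EnsureDefaultStorageClass(ctx, kubeconfig, DefaultStorageClassConfig{
+//		StorageClassName: "local-sc",
+//	})
+//	if err != nil {
+//		return err
+//	}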
+func EnsureDefaultStorageClass(ctx context.Context, kubeconfig *rest.Config, cfg DefaultStorageClassConfig) (string, error) {
+	cfg.applyDefaults()
+
+	if cfg.StorageClassName == "" {
+		return "", fmt.Errorf("StorageClassName is required")
+	}
+
+	existingSC, err := kubernetes.GetStorageClass(ctx, kubeconfig, cfg.StorageClassName)
+	if err != nil {
+		return "", fmt.Errorf("failed to check StorageClass %s: %w", cfg.StorageClassName, err)
+	}
+
+	var scName string
+	if existingSC != nil {
+		logger.Info("StorageClass %s already exists, skipping creation", cfg.StorageClassName)
+		scName = cfg.StorageClassName
+	} else {
+		scName, err = CreateDefaultStorageClass(ctx, kubeconfig, cfg)
+		if err != nil {
+			return "", fmt.Errorf("failed to create StorageClass %s: %w", cfg.StorageClassName, err)
+		}
+	}
+
+	if err := kubernetes.SetGlobalDefaultStorageClass(ctx, kubeconfig, scName); err != nil {
+		return "", fmt.Errorf("failed to set %s as default in global ModuleConfig: %w", scName, err)
+	}
+	logger.Success("StorageClass %s is set as the cluster default", scName)
+
+	return scName, nil
+}