diff --git a/simplyblock_core/env_var b/simplyblock_core/env_var
index e0ec8a101..0efd231d9 100644
--- a/simplyblock_core/env_var
+++ b/simplyblock_core/env_var
@@ -1,5 +1,5 @@
 SIMPLY_BLOCK_COMMAND_NAME=sbcli-dev
 SIMPLY_BLOCK_VERSION=19.2.31
-SIMPLY_BLOCK_DOCKER_IMAGE=simplyblock/simplyblock:R25.10-Hotfix
-SIMPLY_BLOCK_SPDK_ULTRA_IMAGE=simplyblock/spdk:hotfix-profiling-latest
+SIMPLY_BLOCK_DOCKER_IMAGE=public.ecr.aws/simply-block/simplyblock:R25.10-Hotfix-os-fix
+SIMPLY_BLOCK_SPDK_ULTRA_IMAGE=public.ecr.aws/simply-block/ultra:R25.10-Hotfix-os-fix-latest
diff --git a/simplyblock_core/scripts/charts/values.yaml b/simplyblock_core/scripts/charts/values.yaml
index 8d9f34854..bff100a6d 100644
--- a/simplyblock_core/scripts/charts/values.yaml
+++ b/simplyblock_core/scripts/charts/values.yaml
@@ -19,7 +19,7 @@ grafana:
 image:
   simplyblock:
     repository: "public.ecr.aws/simply-block/simplyblock"
-    tag: "R25.10-Hotfix"
+    tag: "R25.10-Hotfix-os-fix"
     pullPolicy: "Always"
 
 ports:
diff --git a/simplyblock_core/snode_client.py b/simplyblock_core/snode_client.py
index 4fbfd5786..cf9e3fe96 100644
--- a/simplyblock_core/snode_client.py
+++ b/simplyblock_core/snode_client.py
@@ -80,6 +80,16 @@ def is_live(self):
     def info(self):
         return self._request("GET", "info")
 
+    def read_allowed_list(self):
+        return self._request("GET", "read_allowed_list")
+
+    def recalculate_cores_distribution(self, cores, number_of_alceml_devices):
+        params = {
+            "cores": cores,
+            "number_of_alceml_devices": number_of_alceml_devices
+        }
+        return self._request("POST", "recalculate_cores_distribution", params)
+
     def spdk_process_start(self, l_cores, spdk_mem, spdk_image=None, spdk_debug=None,
                            cluster_ip=None, fdb_connection=None, namespace=None, server_ip=None, rpc_port=None,
                            rpc_username=None, rpc_password=None, multi_threading_enabled=False, timeout=0, ssd_pcie=None,
diff --git a/simplyblock_core/storage_node_ops.py b/simplyblock_core/storage_node_ops.py
index 48c4e68ca..34f51a314 100644
--- a/simplyblock_core/storage_node_ops.py
+++ b/simplyblock_core/storage_node_ops.py
@@ -981,21 +981,6 @@ def add_node(cluster_id, node_addr, iface_name, data_nics_list,
         logger.error(
             f"ERROR: The provided cpu mask {req_cpu_count} has values greater than 63, which is not allowed")
         return False
-    poller_cpu_cores = node_config.get("distribution").get("poller_cpu_cores")
-    alceml_cpu_cores = node_config.get("distribution").get("alceml_cpu_cores")
-    distrib_cpu_cores = node_config.get("distribution").get("distrib_cpu_cores")
-    alceml_worker_cpu_cores = node_config.get("distribution").get("alceml_worker_cpu_cores")
-    jc_singleton_core = node_config.get("distribution").get("jc_singleton_core")
-    app_thread_core = node_config.get("distribution").get("app_thread_core")
-    jm_cpu_core = node_config.get("distribution").get("jm_cpu_core")
-    number_of_distribs = node_config.get("number_of_distribs")
-
-    pollers_mask = utils.generate_mask(poller_cpu_cores)
-    app_thread_mask = utils.generate_mask(app_thread_core)
-
-    if jc_singleton_core:
-        jc_singleton_mask = utils.decimal_to_hex_power_of_2(jc_singleton_core[0])
-    jm_cpu_mask = utils.generate_mask(jm_cpu_core)
 
     # Calculate pool count
     max_prov = 0
@@ -1005,12 +990,6 @@ def add_node(cluster_id, node_addr, iface_name, data_nics_list,
         logger.error(f"Incorrect max-prov value {max_prov}")
         return False
 
-    number_of_alceml_devices = node_config.get("number_of_alcemls")
-    # for jm
-    number_of_alceml_devices += 1
-    small_pool_count = node_config.get("small_pool_count")
-    large_pool_count = node_config.get("large_pool_count")
-
     minimum_hp_memory = node_config.get("huge_page_memory")
     minimum_hp_memory = max(minimum_hp_memory, max_prov)
@@ -1030,16 +1009,7 @@ def add_node(cluster_id, node_addr, iface_name, data_nics_list,
 
     # Calculate minimum sys memory
     minimum_sys_memory = node_config.get("sys_memory")
-
-    # satisfied, spdk_mem = utils.calculate_spdk_memory(minimum_hp_memory,
-    #                                                   minimum_sys_memory,
-    #                                                   int(memory_details['free']),
-    #                                                   int(memory_details['huge_total']))
     max_lvol = node_config.get("max_lvol")
-
-    # if not satisfied:
-    #     logger.warning(
-    #         f"Not enough memory for the provided max_lvo: {max_lvol}, max_prov: {max_prov}..")
 
     ssd_pcie = node_config.get("ssd_pcis")
     if ssd_pcie:
@@ -1120,6 +1090,40 @@ def add_node(cluster_id, node_addr, iface_name, data_nics_list,
     if not results:
         logger.error(f"Failed to start spdk: {err}")
         return False
+    number_of_alceml_devices = node_config.get("number_of_alcemls")
+    # Increase number of alcemls by one for the JM
+    number_of_alceml_devices += 1
+    small_pool_count = node_config.get("small_pool_count")
+    large_pool_count = node_config.get("large_pool_count")
+
+    cores = snode_api.read_allowed_list()
+
+    if len(cores) == req_cpu_count:
+        new_distribution = snode_api.recalculate_cores_distribution(cores, number_of_alceml_devices)
+        poller_cpu_cores = new_distribution.get("poller_cpu_cores")
+        alceml_cpu_cores = new_distribution.get("alceml_cpu_cores")
+        distrib_cpu_cores = new_distribution.get("distrib_cpu_cores")
+        alceml_worker_cpu_cores = new_distribution.get("alceml_worker_cpu_cores")
+        jc_singleton_core = new_distribution.get("jc_singleton_core")
+        app_thread_core = new_distribution.get("app_thread_core")
+        jm_cpu_core = new_distribution.get("jm_cpu_core")
+    else:
+        poller_cpu_cores = node_config.get("distribution").get("poller_cpu_cores")
+        alceml_cpu_cores = node_config.get("distribution").get("alceml_cpu_cores")
+        distrib_cpu_cores = node_config.get("distribution").get("distrib_cpu_cores")
+        alceml_worker_cpu_cores = node_config.get("distribution").get("alceml_worker_cpu_cores")
+        jc_singleton_core = node_config.get("distribution").get("jc_singleton_core")
+        app_thread_core = node_config.get("distribution").get("app_thread_core")
+        jm_cpu_core = node_config.get("distribution").get("jm_cpu_core")
+    number_of_distribs = node_config.get("number_of_distribs")
+
+    pollers_mask = utils.generate_mask(poller_cpu_cores)
+    app_thread_mask = utils.generate_mask(app_thread_core)
+
+    if jc_singleton_core:
+        jc_singleton_mask = utils.decimal_to_hex_power_of_2(jc_singleton_core[0])
+    jm_cpu_mask = utils.generate_mask(jm_cpu_core)
+
     data_nics = []
@@ -1805,6 +1809,25 @@ def restart_storage_node(
         except Exception as e:
             logger.error(e)
             return False
+    req_cpu_count = len(utils.hexa_to_cpu_list(snode.spdk_cpu_mask))
+
+    cores = snode_api.read_allowed_list()
+
+    if len(cores) == req_cpu_count:
+        new_distribution = snode_api.recalculate_cores_distribution(cores, snode.number_of_alceml_devices)
+        poller_cpu_cores = new_distribution.get("poller_cpu_cores")
+        snode.alceml_cpu_cores = new_distribution.get("alceml_cpu_cores")
+        snode.distrib_cpu_cores = new_distribution.get("distrib_cpu_cores")
+        snode.alceml_worker_cpu_cores = new_distribution.get("alceml_worker_cpu_cores")
+        jc_singleton_core = new_distribution.get("jc_singleton_core")
+        app_thread_core = new_distribution.get("app_thread_core")
+        jm_cpu_core = new_distribution.get("jm_cpu_core")
+        snode.pollers_mask = utils.generate_mask(poller_cpu_cores)
+        snode.app_thread_mask = utils.generate_mask(app_thread_core)
+
+        if jc_singleton_core:
+            snode.jc_singleton_mask = utils.decimal_to_hex_power_of_2(jc_singleton_core[0])
+        snode.jm_cpu_mask = utils.generate_mask(jm_cpu_core)
 
     if not results:
         logger.error(f"Failed to start spdk: {err}")
@@ -1886,7 +1909,6 @@ def restart_storage_node(
         return False
 
     qpair = cluster.qpair_count
-    req_cpu_count = len(utils.hexa_to_cpu_list(snode.spdk_cpu_mask))
     if cluster.fabric_tcp:
         ret = rpc_client.transport_create("TCP", qpair, 512 * (req_cpu_count + 1))
         if not ret:
diff --git a/simplyblock_core/utils/__init__.py b/simplyblock_core/utils/__init__.py
index fb42df8d7..d25bfa779 100644
--- a/simplyblock_core/utils/__init__.py
+++ b/simplyblock_core/utils/__init__.py
@@ -564,12 +564,12 @@ def calculate_pool_count(alceml_count, number_of_distribs, cpu_count, poller_cou
 
 def calculate_minimum_hp_memory(small_pool_count, large_pool_count, lvol_count, max_prov, cpu_count):
     pool_consumption = (small_pool_count * 8 + large_pool_count * 128) / 1024
     memory_consumption = (4 * cpu_count + 1.1 * pool_consumption + 22 * lvol_count) * (
-        1024 * 1024) + constants.EXTRA_HUGE_PAGE_MEMORY
+            1024 * 1024) + constants.EXTRA_HUGE_PAGE_MEMORY
     return int(2.0 * memory_consumption)
 
 
-def calculate_minimum_sys_memory(max_prov):
-    minimum_sys_memory = (2000 * 1024) * convert_size(max_prov, 'GB') + 500 * 1024 * 1024
+def calculate_minimum_sys_memory(ssd_list):
+    minimum_sys_memory = 2147483648 + get_total_capacity_of_nvme_devices(ssd_list)
     logger.debug(f"Minimum system memory is {humanbytes(minimum_sys_memory)}")
     return int(minimum_sys_memory)
 
@@ -737,13 +737,15 @@ def convert_size(size: Union[int, str], unit: str, round_up: bool = False) -> in
     raw = size / (base ** exponent)
     return math.ceil(raw) if round_up else int(raw)
 
+
 def first_six_chars(s: str) -> str:
     """
    Returns the first six characters of a given string.
    If the string is shorter than six characters, returns the entire string.
     """
     return s[:6]
-
+
+
 def nearest_upper_power_of_2(n):
     # Check if n is already a power of 2
     if (n & (n - 1)) == 0:
@@ -1337,6 +1339,20 @@ def detect_nvmes(pci_allowed, pci_blocked, device_model, size_range, nvme_names)
     return nvmes
 
 
+def get_total_capacity_of_nvme_devices(pci_lst):
+    json_string = get_nvme_list_verbose()
+    data = json.loads(json_string)
+    total_capacity = 0
+    for device_entry in data.get('Devices', []):
+        for subsystem in device_entry.get('Subsystems', []):
+            for controller in subsystem.get('Controllers', []):
+                address = controller.get("Address")
+                if len(controller.get("Namespaces")) > 0 and address in pci_lst:
+                    total_capacity = controller.get("Namespaces")[0].get("PhysicalSize")
+
+    return int(total_capacity)
+
+
 def calculate_unisolated_cores(cores, cores_percentage=0):
     # calculate the number if unused system cores (UnIsolated cores)
     total = len(cores)
@@ -1356,10 +1372,10 @@ def get_core_indexes(core_to_index, list_of_cores):
 
 
 def build_unisolated_stride(
-    all_cores: List[int],
-    num_unisolated: int,
-    client_qpair_count: int,
-    pool_stride: int = 2,
+        all_cores: List[int],
+        num_unisolated: int,
+        client_qpair_count: int,
+        pool_stride: int = 2,
 ) -> List[int]:
     """
     Build a list of 'unisolated' CPUs by picking from per-qpair pools.
@@ -1397,7 +1413,7 @@ def build_unisolated_stride(
 
     # Build pools
     pool_size = math.ceil(total / client_qpair_count)
-    pools = [cores[i * pool_size : min((i + 1) * pool_size, total)] for i in range(client_qpair_count)]
+    pools = [cores[i * pool_size: min((i + 1) * pool_size, total)] for i in range(client_qpair_count)]
     pools = [p for p in pools if p]  # drop empties
 
     # Per-pool index (within each pool)
@@ -1460,8 +1476,7 @@ def generate_core_allocation(cores_by_numa, sockets_to_use, nodes_per_socket, co
             continue
         all_cores = sorted(cores_by_numa[numa_node])
         num_unisolated = calculate_unisolated_cores(all_cores, cores_percentage)
-        unisolated = build_unisolated_stride(all_cores,num_unisolated,constants.CLIENT_QPAIR_COUNT)
-
+        unisolated = build_unisolated_stride(all_cores, num_unisolated, constants.CLIENT_QPAIR_COUNT)
         available_cores = [c for c in all_cores if c not in unisolated]
 
         q1 = len(available_cores) // 4
@@ -1577,7 +1592,7 @@ def regenerate_config(new_config, old_config, force=False):
         old_config["nodes"][i]["small_pool_count"] = small_pool_count
         old_config["nodes"][i]["large_pool_count"] = large_pool_count
         old_config["nodes"][i]["huge_page_memory"] = minimum_hp_memory
-        minimum_sys_memory = calculate_minimum_sys_memory(old_config["nodes"][i]["max_size"])
+        minimum_sys_memory = calculate_minimum_sys_memory(old_config["nodes"][i]["ssd_pcis"])
         old_config["nodes"][i]["sys_memory"] = minimum_sys_memory
 
         memory_details = node_utils.get_memory_details()
@@ -1733,7 +1748,7 @@ def generate_configs(max_lvol, max_prov, sockets_to_use, nodes_per_socket, pci_a
         node_info["max_lvol"] = max_lvol
         node_info["max_size"] = max_prov
         node_info["huge_page_memory"] = max(minimum_hp_memory, max_prov)
-        minimum_sys_memory = calculate_minimum_sys_memory(max_prov)
+        minimum_sys_memory = calculate_minimum_sys_memory(node_info["ssd_pcis"])
         node_info["sys_memory"] = minimum_sys_memory
         all_nodes.append(node_info)
         node_index += 1
@@ -2263,9 +2278,9 @@ def create_docker_service(cluster_docker: DockerClient, service_name: str, servi
                 "com.docker.stack.namespace": "app"}
     )
 
+
 def create_k8s_service(namespace: str, deployment_name: str,
                        container_name: str, service_file: str, container_image: str):
-
     logger.info(f"Creating deployment: {deployment_name} in namespace {namespace}")
     load_kube_config_with_fallback()
     apps_v1 = client.AppsV1Api()
@@ -2335,6 +2350,7 @@ def create_k8s_service(namespace: str, deployment_name: str,
     apps_v1.create_namespaced_deployment(namespace=namespace, body=deployment)
     logger.info(f"Deployment {deployment_name} created successfully.")
 
+
 def clean_partitions(nvme_device: str):
     command = ['wipefs', '-a', nvme_device]
     print(" ".join(command))
@@ -2608,6 +2624,7 @@ def clean_devices(config_path, format, force):
     except json.JSONDecodeError as e:
         logger.error(f"Error decoding JSON: {e}")
 
+
 def create_rpc_socket_mount():
 
     try:
@@ -2664,3 +2681,15 @@ def calculate_hp_only(max_lvol, number_of_devices, sockets_to_use, nodes_per_soc
         node_index += 1
 
     return convert_size(minimum_hp_memory, 'MB')
+
+def recalculate_cores_distribution(cores, number_of_alcemls):
+    distribution = calculate_core_allocations(cores, number_of_alcemls)
+    core_to_index = {core: idx for idx, core in enumerate(cores)}
+    return {
+        "app_thread_core": get_core_indexes(core_to_index, distribution[0]),
+        "jm_cpu_core": get_core_indexes(core_to_index, distribution[1]),
+        "poller_cpu_cores": get_core_indexes(core_to_index, distribution[2]),
+        "alceml_cpu_cores": get_core_indexes(core_to_index, distribution[3]),
+        "alceml_worker_cpu_cores": get_core_indexes(core_to_index, distribution[4]),
+        "distrib_cpu_cores": get_core_indexes(core_to_index, distribution[5]),
+        "jc_singleton_core": get_core_indexes(core_to_index, distribution[6])}
diff --git a/simplyblock_web/api/internal/storage_node/docker.py b/simplyblock_web/api/internal/storage_node/docker.py
index 5281b2952..66efbfa30 100644
--- a/simplyblock_web/api/internal/storage_node/docker.py
+++ b/simplyblock_web/api/internal/storage_node/docker.py
@@ -73,7 +73,7 @@ def get_amazon_cloud_info():
     import ec2_metadata
     import requests
     session = requests.session()
-    data = ec2_metadata.EC2Metadata(session=session).instance_identity_document # type: ignore[call-arg]
+    data = ec2_metadata.EC2Metadata(session=session).instance_identity_document  # type: ignore[call-arg]
     return {
         "id": data["instanceId"],
         "type": data["instanceType"],
@@ -269,7 +269,7 @@ def spdk_process_is_up(query: utils.RPCPortParams):
         except Exception as e:
             logger.error(e)
     logger.debug(f"function:spdk_process_is_up end f{req_unique_id}")
-    total_time = int(( time.time_ns()-req_unique_id)/(1000*1000*1000))
+    total_time = int((time.time_ns() - req_unique_id) / (1000 * 1000 * 1000))
     logger.debug(f"function:spdk_process_is_up total time {total_time}")
     return utils.get_response(False, f"container not found: /spdk_{query.rpc_port}")
@@ -523,7 +523,7 @@ def delete_gpt_partitions_for_dev(body: utils.DeviceParams):
     cmd = f"parted -fs /dev/{device_name} mklabel gpt"
     out, err, ret_code = shell_utils.run_command(cmd)
     logger.info(f"out: {out}, err: {err}, ret_code: {ret_code}")
-    return utils.get_response(ret_code==0, error=err)
+    return utils.get_response(ret_code == 0, error=err)
 
 
 CPU_INFO = cpuinfo.get_cpu_info()
@@ -718,3 +718,45 @@ def ifc_is_tcp(query: NicQuery):
 })
 def is_alive():
     return utils.get_response(True)
+
+
+@api.get('/read_allowed_list', responses={
+    200: {'content': {'application/json': {'schema': utils.response_schema({
+        'type': 'object',
+        'additionalProperties': True,
+    })}}},
+})
+def read_allowed_list():
+    try:
+        with open("/etc/simplyblock/allowed_list") as f:
+            cores = [int(line.strip()) for line in f if line.strip()]
+    except Exception:
+        cores = []
+    resp = utils.get_response(cores)
+    return resp
+
+
+class CoresParams(BaseModel):
+    cores: Optional[List[int]] = Field(default=None)
+    number_of_alceml_devices: Optional[int] = Field(None, ge=0)
+
+
+@api.post('/recalculate_cores_distribution', responses={
+    200: {'content': {'application/json': {'schema': utils.response_schema({
+        'type': 'boolean'
+    })}}},
+})
+def recalculate_cores_distribution(body: CoresParams):
+    cores = body.cores
+    number_of_alceml_devices = body.number_of_alceml_devices
+    distribution = init_utils.recalculate_cores_distribution(cores, number_of_alceml_devices)
+
+    resp = utils.get_response({
+        "app_thread_core": distribution["app_thread_core"],
+        "jm_cpu_core": distribution["jm_cpu_core"],
+        "poller_cpu_cores": distribution["poller_cpu_cores"],
+        "alceml_cpu_cores": distribution["alceml_cpu_cores"],
+        "alceml_worker_cpu_cores": distribution["alceml_worker_cpu_cores"],
+        "distrib_cpu_cores": distribution["distrib_cpu_cores"],
+        "jc_singleton_core": distribution["jc_singleton_core"]})
+    return resp
diff --git a/simplyblock_web/api/internal/storage_node/kubernetes.py b/simplyblock_web/api/internal/storage_node/kubernetes.py
index c3df14086..a2e2ae804 100644
--- a/simplyblock_web/api/internal/storage_node/kubernetes.py
+++ b/simplyblock_web/api/internal/storage_node/kubernetes.py
@@ -296,7 +296,6 @@ def spdk_process_start(body: SPDKParams):
         spdk_process_kill(query)
 
     node_prepration_job_name = "snode-spdk-job-"
-    node_prepration_core_name = "snode-spdk-core-isolate-"
     node_prepration_ubuntu_name = "snode-spdk-ubuntu-extra-"
     node_name = os.environ.get("HOSTNAME", "")
@@ -314,23 +313,32 @@ def spdk_process_start(body: SPDKParams):
 
     # limit the job name length to 63 characters
     k8s_job_name_length = len(node_prepration_job_name+node_name)
-    core_name_length = len(node_prepration_core_name+node_name)
     ubuntu_name_length = len(node_prepration_ubuntu_name+node_name)
     if k8s_job_name_length > 63:
         node_prepration_job_name += node_name[k8s_job_name_length-63:]
     else:
         node_prepration_job_name += node_name
-
-    if core_name_length > 63:
-        node_prepration_core_name += node_name[core_name_length-63:]
-    else:
-        node_prepration_core_name += node_name
 
     if ubuntu_name_length > 63:
         node_prepration_ubuntu_name += node_name[ubuntu_name_length-63:]
     else:
         node_prepration_ubuntu_name += node_name
 
+    cpu_topology_enabled = os.environ.get("CPU_TOPOLOGY_ENABLED", False)
+    if isinstance(cpu_topology_enabled, str):
+        cpu_topology_enabled = cpu_topology_enabled.strip().lower() in ("true",)
+    skip_kubelet_configuration = os.environ.get("SKIP_KUBELET_CONFIGURATION", False)
+    if isinstance(skip_kubelet_configuration, str):
+        skip_kubelet_configuration = skip_kubelet_configuration.strip().lower() in ("true",)
+    reserved_system_cpus = os.environ.get("RESERVED_SYSTEM_CPUS", "0,1")
+    node_prepration_core_name = "snode-spdk-core-isolate-"
+    if cpu_topology_enabled:
+        node_prepration_core_name = "snode-spdk-enable-cpu-topology-"
+    core_name_length = len(node_prepration_core_name+node_name)
+    if core_name_length > 63:
+        node_prepration_core_name += node_name[core_name_length-63:]
+    else:
+        node_prepration_core_name += node_name
 
     logger.debug(f"deploying k8s job to prepare worker: {node_name}")
     try:
@@ -360,7 +368,9 @@ def spdk_process_start(body: SPDKParams):
         'PCI_ALLOWED': ssd_pcie_list,
         'TOTAL_HP': total_mem_mib,
         'NSOCKET': body.socket,
-        'FW_PORT': body.firewall_port
+        'FW_PORT': body.firewall_port,
+        'CPU_TOPOLOGY_ENABLED': cpu_topology_enabled,
+        'RESERVED_SYSTEM_CPUS': reserved_system_cpus
     }
 
     if ubuntu_host:
@@ -404,29 +414,17 @@ def spdk_process_start(body: SPDKParams):
             )
         )
         logger.info(f"Job deleted: '{job_resp.metadata.name}' in namespace '{namespace}")
-
-        if core_isolate and not openshift:
-            core_template = env.get_template('storage_core_isolation.yaml.j2')
-            core_yaml = yaml.safe_load(core_template.render(values))
-            batch_v1 = core_utils.get_k8s_batch_client()
-            core_resp = batch_v1.create_namespaced_job(namespace=namespace, body=core_yaml)
-            msg = f"Job created: '{core_resp.metadata.name}' in namespace '{namespace}"
-            logger.info(msg)
-
-            node_utils_k8s.wait_for_job_completion(core_resp.metadata.name, namespace)
-            logger.info(f"Job '{core_resp.metadata.name}' completed successfully")
-
-            batch_v1.delete_namespaced_job(
-                name=core_resp.metadata.name,
-                namespace=namespace,
-                body=V1DeleteOptions(
-                    propagation_policy='Foreground',
-                    grace_period_seconds=0
-                )
-            )
-            logger.info(f"Job deleted: '{core_resp.metadata.name}' in namespace '{namespace}")
-
-        elif core_isolate and openshift:
+        if (cpu_topology_enabled and not skip_kubelet_configuration) or (core_isolate and not cpu_topology_enabled):
+            if cpu_topology_enabled and not skip_kubelet_configuration:
+                if not openshift:
+                    template_name = 'storage_cpu_topology.yaml.j2'
+                else:
+                    template_name = 'oc_storage_cpu_topology.yaml.j2'
+            elif core_isolate:
+                if not openshift:
+                    template_name = 'storage_core_isolation.yaml.j2'
+                else:
+                    template_name = 'oc_storage_core_isolation.yaml.j2'
             batch_v1 = core_utils.get_k8s_batch_client()
             try:
                 batch_v1.read_namespaced_job(
@@ -453,16 +451,14 @@ def spdk_process_start(body: SPDKParams):
                     logger.info(f"No pre-existing Job '{node_prepration_core_name}' found. Proceeding.")
                 else:
                     raise
-
-            core_template = env.get_template('oc_storage_core_isolation.yaml.j2')
+            core_template = env.get_template(template_name)
             core_yaml = yaml.safe_load(core_template.render(values))
+            batch_v1 = core_utils.get_k8s_batch_client()
             core_resp = batch_v1.create_namespaced_job(namespace=namespace, body=core_yaml)
             msg = f"Job created: '{core_resp.metadata.name}' in namespace '{namespace}"
             logger.info(msg)
-
            node_utils_k8s.wait_for_job_completion(core_resp.metadata.name, namespace)
            logger.info(f"Job '{core_resp.metadata.name}' completed successfully")
-
             batch_v1.delete_namespaced_job(
                 name=core_resp.metadata.name,
                 namespace=namespace,
@@ -475,12 +471,13 @@ def spdk_process_start(body: SPDKParams):
 
         env = Environment(loader=PackageLoader('simplyblock_web', 'templates'), trim_blocks=True, lstrip_blocks=True)
         template = env.get_template('storage_deploy_spdk.yaml.j2')
-        dep = yaml.safe_load(template.render(values))
-        logger.debug(dep)
+        docs = yaml.safe_load_all(template.render(values))
         k8s_core_v1 = core_utils.get_k8s_core_client()
-        resp = k8s_core_v1.create_namespaced_pod(body=dep, namespace=namespace)
-        msg = f"Pod created: '{resp.metadata.name}' in namespace '{namespace}"
-        logger.info(msg)
+        for dep in docs:
+            logger.debug(dep)
+            resp = k8s_core_v1.create_namespaced_pod(body=dep, namespace=namespace)
+            msg = f"Pod created: '{resp.metadata.name}' in namespace '{namespace}'"
+            logger.info(msg)
 
     except Exception:
         return utils.get_response(False, f"Pod failed:\n{traceback.format_exc()}")
@@ -689,3 +686,7 @@ def spdk_proxy_restart(query: utils.RPCPortParams):
 
 api.post('/format_device_with_4k')(snode_ops.format_device_with_4k)
 
+api.get('/read_allowed_list')(snode_ops.read_allowed_list)
+
+api.post('/recalculate_cores_distribution')(snode_ops.recalculate_cores_distribution)
+
diff --git a/simplyblock_web/templates/oc_storage_core_isolation.yaml.j2 b/simplyblock_web/templates/oc_storage_core_isolation.yaml.j2
index 85bfd0f7b..9b7aa7706 100644
--- a/simplyblock_web/templates/oc_storage_core_isolation.yaml.j2
+++ b/simplyblock_web/templates/oc_storage_core_isolation.yaml.j2
@@ -113,7 +113,7 @@ spec:
               EOF
 
              echo "[INFO] Init setup and CPU isolation complete."
-              
+
              echo "[INFO] Marking node as configured."
touch "$MARKER" diff --git a/simplyblock_web/templates/oc_storage_cpu_topology.yaml.j2 b/simplyblock_web/templates/oc_storage_cpu_topology.yaml.j2 new file mode 100644 index 000000000..7c2504cbb --- /dev/null +++ b/simplyblock_web/templates/oc_storage_cpu_topology.yaml.j2 @@ -0,0 +1,99 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: {{ CORE_JOBNAME }} + namespace: {{ NAMESPACE }} +spec: + template: + spec: + restartPolicy: OnFailure + nodeSelector: + kubernetes.io/hostname: {{ HOSTNAME }} + hostNetwork: true + hostPID: true + serviceAccountName: simplyblock-storage-node-sa + tolerations: + - effect: NoSchedule + operator: Exists + - effect: NoExecute + operator: Exists + volumes: + - name: var-simplyblock + hostPath: + path: /var/simplyblock + containers: + - name: init-setup + image: alpine/kubectl:1.34.1 + securityContext: + privileged: true + volumeMounts: + - name: var-simplyblock + mountPath: /var/simplyblock + command: ["/bin/sh", "-c"] + args: + - | + set -e + MARKER="/var/simplyblock/.cpu_topology_applied" + if [[ -f "$MARKER" ]]; then + echo "[INFO] Node already configured. Skipping sleep and exiting..." + exit 0 + fi + + SHORT_ID=$(echo "{{ HOSTNAME }}" | rev | cut -d'-' -f1 | rev) + # Create the Machine config pool + cat <> "$KUBELET_CFG" + fi + } + + set_key "cpuManagerPolicy" "static" + set_key "cpuManagerReconcilePeriod" "5s" + set_key "topologyManagerPolicy" "single-numa-node" + set_key "topologyManagerScope" "pod" + set_key "reservedSystemCPUs" {{ RESERVED_SYSTEM_CPUS }} + + echo "[INFO] kubelet config updated:" + grep -E "^(cpuManagerPolicy|cpuManagerReconcilePeriod|topologyManagerPolicy|topologyManagerScope|reservedSystemCPUs):" "$KUBELET_CFG" || true + + # Ensure kubelet is actually using this config file. + # Many kubeadm setups already do via systemd: ExecStart=... --config=/var/lib/kubelet/config.yaml + # If not present, add a systemd drop-in to force it. + mkdir -p "$SYSTEMD_UNIT_DIR" + if ! grep -R --quiet -- "--config=/var/lib/kubelet/config.yaml" /host/etc/systemd/system /host/lib/systemd/system /host/usr/lib/systemd/system 2>/dev/null; then + echo "[WARN] Did not detect kubelet --config flag; adding drop-in $DROPIN" + cat > "$DROPIN" <<'EOF' + [Service] + Environment="KUBELET_EXTRA_ARGS=--config=/var/lib/kubelet/config.yaml" + EOF + fi + + # Restart kubelet + echo "removing old cpu_manager_state..." + rm -rf $CPU_MNG + + echo "[INFO] Restarting kubelet via host systemd..." + nsenter --target 1 --mount --uts --ipc --net --pid -- sh -c "systemctl daemon-reload && systemctl restart kubelet" + + echo "[INFO] Marking node as configured." + touch "$MARKER" + + echo "[INFO] Done." 
+
\ No newline at end of file
diff --git a/simplyblock_web/templates/storage_deploy_spdk.yaml.j2 b/simplyblock_web/templates/storage_deploy_spdk.yaml.j2
index 6e801f009..df152797a 100644
--- a/simplyblock_web/templates/storage_deploy_spdk.yaml.j2
+++ b/simplyblock_web/templates/storage_deploy_spdk.yaml.j2
@@ -66,6 +66,13 @@ spec:
       volumeMounts:
       - name: foundationdb
         mountPath: /etc/foundationdb
+      resources:
+        requests:
+          cpu: "100m"
+          memory: "64Mi"
+        limits:
+          cpu: "100m"
+          memory: "64Mi"
 
   containers:
     - name: spdk-container
@@ -125,8 +132,16 @@ spec:
         limits:
          hugepages-2Mi: {{ MEM_MEGA }}Mi
          cpu: {{ CORES }}
+          {% if CPU_TOPOLOGY_ENABLED %}
+          memory: {{ MEM2_MEGA }}Mi
+          numa-align/numa-{{ NSOCKET }}: 1
+          {% endif %}
         requests:
          hugepages-2Mi: {{ MEM_MEGA }}Mi
+          {% if CPU_TOPOLOGY_ENABLED %}
+          memory: {{ MEM2_MEGA }}Mi
+          numa-align/numa-{{ NSOCKET }}: 1
+          {% endif %}
 
     - name: spdk-proxy-container
       image: {{ SIMPLYBLOCK_DOCKER_IMAGE }}
@@ -148,7 +163,42 @@ spec:
         value: "True"
       - name: TIMEOUT
         value: "300"
-      {% if MODE == "docker" %}
+      {% if CPU_TOPOLOGY_ENABLED %}
+      resources:
+        limits:
+          cpu: 1
+          memory: "128Mi"
+        requests:
+          cpu: 1
+          memory: "128Mi"
+      {% endif %}
+
+{% if MODE == "docker" %}
+---
+apiVersion: v1
+kind: Pod
+metadata:
+  name: simplyblock-fluentd-{{ RPC_PORT }}-{{ CLUSTER_ID }}
+  namespace: {{ NAMESPACE }}
+  labels:
+    app: simplyblock-fluentd-app-{{ RPC_PORT }}
+spec:
+  serviceAccountName: simplyblock-storage-node-sa
+  nodeSelector:
+    kubernetes.io/hostname: {{ HOSTNAME }}
+  tolerations:
+    - effect: NoSchedule
+      operator: Exists
+    - effect: NoExecute
+      operator: Exists
+  volumes:
+    - name: varlog
+      hostPath:
+        path: /var/log
+    - name: dockercontainerlogdirectory
+      hostPath:
+        path: /var/log/pods
+  containers:
     - name: fluentd
       image: public.ecr.aws/simply-block/fluentd-kubernetes-daemonset:v1.17.1-debian-graylog-1.2
       imagePullPolicy: "Always"
@@ -165,6 +215,8 @@ spec:
         value: "tcp"
      - name: FLUENTD_SYSTEMD_CONF
        value: "disable"
+      - name: FLUENT_CONTAINER_TAIL_PATH
+        value: "/var/log/pods/{{ NAMESPACE }}_snode-spdk-pod-{{ RPC_PORT }}-{{ CLUSTER_ID }}_*/*/*.log"
      - name: FLUENT_CONTAINER_TAIL_EXCLUDE_PATH
        value: /var/log/pods/fluent*
      - name: FLUENT_CONTAINER_TAIL_PARSER_TYPE
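
For context on how the pieces above are meant to interact, here is a rough, non-normative sketch of the flow this patch adds to add_node()/restart_storage_node(): the control plane asks the storage-node agent which CPU cores were actually isolated for it, and only when that list is the same size as the requested CPU mask does it let the agent recompute the per-role core split; otherwise it falls back to the cached node config. The names snode_api (an SNodeClient), node_config, spdk_cpu_mask, number_of_alceml_devices and the utils helpers are the ones used in the diff; the wrapper function itself is illustrative only.

    # Sketch only, not part of the patch. Assumes snode_api is an SNodeClient
    # (simplyblock_core.snode_client) pointed at the node's internal API.
    from simplyblock_core import utils

    def resolve_core_distribution(snode_api, node_config, spdk_cpu_mask, number_of_alceml_devices):
        cores = snode_api.read_allowed_list()                       # GET /read_allowed_list -> e.g. [4, 5, 6, 7]
        req_cpu_count = len(utils.hexa_to_cpu_list(spdk_cpu_mask))  # cores requested via the hex CPU mask
        if len(cores) == req_cpu_count:
            # POST /recalculate_cores_distribution: the node agent re-runs the core
            # allocation against the cores it really owns and returns one list per role.
            return snode_api.recalculate_cores_distribution(cores, number_of_alceml_devices)
        # Otherwise keep the distribution cached in the node config.
        return node_config.get("distribution")

    # Usage mirrors the diff: the returned dict feeds the mask helpers.
    # dist = resolve_core_distribution(snode_api, node_config, snode.spdk_cpu_mask, number_of_alceml_devices)
    # pollers_mask = utils.generate_mask(dist.get("poller_cpu_cores"))
    # app_thread_mask = utils.generate_mask(dist.get("app_thread_core"))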
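A note on the node-side helper added to simplyblock_core/utils: recalculate_cores_distribution() appears to report each role as positions within the submitted core list rather than absolute core ids, since calculate_core_allocations() output is passed through core_to_index / get_core_indexes() before being returned. A tiny worked example of just that translation, with invented core numbers and an invented poller assignment:

    # Illustrative only; the core ids and the role assignment are made up.
    cores = [4, 5, 6, 7]                                            # node's allowed list
    core_to_index = {core: idx for idx, core in enumerate(cores)}   # {4: 0, 5: 1, 6: 2, 7: 3}

    poller_cores_absolute = [5, 7]                                  # pretend allocation for the pollers
    poller_cpu_cores = [core_to_index[c] for c in poller_cores_absolute]
    print(poller_cpu_cores)                                         # [1, 3] -> positions in the allowed list,
                                                                    # later handed to utils.generate_mask()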