Skip to content
Open
4 changes: 2 additions & 2 deletions simplyblock_core/env_var
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SIMPLY_BLOCK_COMMAND_NAME=sbcli-dev
SIMPLY_BLOCK_VERSION=19.2.31

SIMPLY_BLOCK_DOCKER_IMAGE=simplyblock/simplyblock:R25.10-Hotfix
SIMPLY_BLOCK_SPDK_ULTRA_IMAGE=simplyblock/spdk:hotfix-profiling-latest
SIMPLY_BLOCK_DOCKER_IMAGE=public.ecr.aws/simply-block/simplyblock:R25.10-Hotfix-os-fix
SIMPLY_BLOCK_SPDK_ULTRA_IMAGE=public.ecr.aws/simply-block/ultra:R25.10-Hotfix-os-fix-latest
2 changes: 1 addition & 1 deletion simplyblock_core/scripts/charts/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ grafana:
image:
simplyblock:
repository: "public.ecr.aws/simply-block/simplyblock"
tag: "R25.10-Hotfix"
tag: "R25.10-Hotfix-os-fix"
pullPolicy: "Always"

ports:
Expand Down
10 changes: 10 additions & 0 deletions simplyblock_core/snode_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ def is_live(self):
def info(self):
return self._request("GET", "info")

def read_allowed_list(self):
    """Fetch the node's allowed CPU-core list from the storage-node API."""
    endpoint = "read_allowed_list"
    return self._request("GET", endpoint)

def recalculate_cores_distribution(self, cores, number_of_alceml_devices):
    """Ask the storage-node API to recompute the per-role core distribution.

    Posts the allowed core list and the alceml device count; returns the
    API response (a mapping of role name -> core indexes).
    """
    payload = dict(
        cores=cores,
        number_of_alceml_devices=number_of_alceml_devices,
    )
    return self._request("POST", "recalculate_cores_distribution", payload)

def spdk_process_start(self, l_cores, spdk_mem, spdk_image=None, spdk_debug=None, cluster_ip=None,
fdb_connection=None, namespace=None, server_ip=None, rpc_port=None,
rpc_username=None, rpc_password=None, multi_threading_enabled=False, timeout=0, ssd_pcie=None,
Expand Down
84 changes: 53 additions & 31 deletions simplyblock_core/storage_node_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,21 +981,6 @@ def add_node(cluster_id, node_addr, iface_name, data_nics_list,
logger.error(
f"ERROR: The provided cpu mask {req_cpu_count} has values greater than 63, which is not allowed")
return False
poller_cpu_cores = node_config.get("distribution").get("poller_cpu_cores")
alceml_cpu_cores = node_config.get("distribution").get("alceml_cpu_cores")
distrib_cpu_cores = node_config.get("distribution").get("distrib_cpu_cores")
alceml_worker_cpu_cores = node_config.get("distribution").get("alceml_worker_cpu_cores")
jc_singleton_core = node_config.get("distribution").get("jc_singleton_core")
app_thread_core = node_config.get("distribution").get("app_thread_core")
jm_cpu_core = node_config.get("distribution").get("jm_cpu_core")
number_of_distribs = node_config.get("number_of_distribs")

pollers_mask = utils.generate_mask(poller_cpu_cores)
app_thread_mask = utils.generate_mask(app_thread_core)

if jc_singleton_core:
jc_singleton_mask = utils.decimal_to_hex_power_of_2(jc_singleton_core[0])
jm_cpu_mask = utils.generate_mask(jm_cpu_core)

# Calculate pool count
max_prov = 0
Expand All @@ -1005,12 +990,6 @@ def add_node(cluster_id, node_addr, iface_name, data_nics_list,
logger.error(f"Incorrect max-prov value {max_prov}")
return False

number_of_alceml_devices = node_config.get("number_of_alcemls")
# for jm
number_of_alceml_devices += 1
small_pool_count = node_config.get("small_pool_count")
large_pool_count = node_config.get("large_pool_count")

minimum_hp_memory = node_config.get("huge_page_memory")

minimum_hp_memory = max(minimum_hp_memory, max_prov)
Expand All @@ -1030,16 +1009,7 @@ def add_node(cluster_id, node_addr, iface_name, data_nics_list,

# Calculate minimum sys memory
minimum_sys_memory = node_config.get("sys_memory")

# satisfied, spdk_mem = utils.calculate_spdk_memory(minimum_hp_memory,
# minimum_sys_memory,
# int(memory_details['free']),
# int(memory_details['huge_total']))
max_lvol = node_config.get("max_lvol")

# if not satisfied:
# logger.warning(
# f"Not enough memory for the provided max_lvo: {max_lvol}, max_prov: {max_prov}..")
ssd_pcie = node_config.get("ssd_pcis")

if ssd_pcie:
Expand Down Expand Up @@ -1120,6 +1090,40 @@ def add_node(cluster_id, node_addr, iface_name, data_nics_list,
if not results:
logger.error(f"Failed to start spdk: {err}")
return False
number_of_alceml_devices = node_config.get("number_of_alcemls")
# Increase number of alcemls by one for the JM
number_of_alceml_devices += 1
small_pool_count = node_config.get("small_pool_count")
large_pool_count = node_config.get("large_pool_count")

cores = snode_api.read_allowed_list()

if len(cores) == req_cpu_count:
new_distribution = snode_api.recalculate_cores_distribution(cores, number_of_alceml_devices)
poller_cpu_cores = new_distribution.get("poller_cpu_cores")
alceml_cpu_cores = new_distribution.get("alceml_cpu_cores")
distrib_cpu_cores = new_distribution.get("distrib_cpu_cores")
alceml_worker_cpu_cores = new_distribution.get("alceml_worker_cpu_cores")
jc_singleton_core = new_distribution.get("jc_singleton_core")
app_thread_core = new_distribution.get("app_thread_core")
jm_cpu_core = new_distribution.get("jm_cpu_core")
else:
poller_cpu_cores = node_config.get("distribution").get("poller_cpu_cores")
alceml_cpu_cores = node_config.get("distribution").get("alceml_cpu_cores")
distrib_cpu_cores = node_config.get("distribution").get("distrib_cpu_cores")
alceml_worker_cpu_cores = node_config.get("distribution").get("alceml_worker_cpu_cores")
jc_singleton_core = node_config.get("distribution").get("jc_singleton_core")
app_thread_core = node_config.get("distribution").get("app_thread_core")
jm_cpu_core = node_config.get("distribution").get("jm_cpu_core")
number_of_distribs = node_config.get("number_of_distribs")

pollers_mask = utils.generate_mask(poller_cpu_cores)
app_thread_mask = utils.generate_mask(app_thread_core)

if jc_singleton_core:
jc_singleton_mask = utils.decimal_to_hex_power_of_2(jc_singleton_core[0])
jm_cpu_mask = utils.generate_mask(jm_cpu_core)


data_nics = []

Expand Down Expand Up @@ -1805,6 +1809,25 @@ def restart_storage_node(
except Exception as e:
logger.error(e)
return False
req_cpu_count = len(utils.hexa_to_cpu_list(snode.spdk_cpu_mask))

cores = snode_api.read_allowed_list()

if len(cores) == req_cpu_count:
new_distribution = snode_api.recalculate_cores_distribution(cores, snode.number_of_alceml_devices)
poller_cpu_cores = new_distribution.get("poller_cpu_cores")
snode.alceml_cpu_cores = new_distribution.get("alceml_cpu_cores")
snode.distrib_cpu_cores = new_distribution.get("distrib_cpu_cores")
snode.alceml_worker_cpu_cores = new_distribution.get("alceml_worker_cpu_cores")
jc_singleton_core = new_distribution.get("jc_singleton_core")
app_thread_core = new_distribution.get("app_thread_core")
jm_cpu_core = new_distribution.get("jm_cpu_core")
snode.pollers_mask = utils.generate_mask(poller_cpu_cores)
snode.app_thread_mask = utils.generate_mask(app_thread_core)

if jc_singleton_core:
snode.jc_singleton_mask = utils.decimal_to_hex_power_of_2(jc_singleton_core[0])
snode.jm_cpu_mask = utils.generate_mask(jm_cpu_core)

if not results:
logger.error(f"Failed to start spdk: {err}")
Expand Down Expand Up @@ -1886,7 +1909,6 @@ def restart_storage_node(
return False

qpair = cluster.qpair_count
req_cpu_count = len(utils.hexa_to_cpu_list(snode.spdk_cpu_mask))
if cluster.fabric_tcp:
ret = rpc_client.transport_create("TCP", qpair, 512 * (req_cpu_count + 1))
if not ret:
Expand Down
57 changes: 43 additions & 14 deletions simplyblock_core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,12 +564,12 @@ def calculate_pool_count(alceml_count, number_of_distribs, cpu_count, poller_cou
def calculate_minimum_hp_memory(small_pool_count, large_pool_count, lvol_count, max_prov, cpu_count):
    """Return the minimum huge-page memory (bytes) for the given workload.

    Combines SPDK pool consumption (small/large buffer pools, in MiB),
    a per-CPU and per-lvol overhead, and the fixed extra huge-page
    reserve from ``constants``. ``max_prov`` is accepted for interface
    compatibility but not used here; callers apply it separately
    (e.g. ``max(minimum_hp_memory, max_prov)``).
    """
    # Pool consumption in MiB: small pool entries are 8 KiB, large are 128 KiB.
    pool_consumption = (small_pool_count * 8 + large_pool_count * 128) / 1024
    memory_consumption = (4 * cpu_count + 1.1 * pool_consumption + 22 * lvol_count) * (
        1024 * 1024) + constants.EXTRA_HUGE_PAGE_MEMORY
    # Double the computed consumption as a safety margin.
    return int(2.0 * memory_consumption)


def calculate_minimum_sys_memory(ssd_list):
    """Return the minimum system (non-huge-page) memory in bytes.

    A fixed 2 GiB baseline plus the total capacity of the NVMe devices
    at the PCI addresses in *ssd_list*.
    """
    base_memory = 2 * 1024 * 1024 * 1024  # 2 GiB baseline (was the literal 2147483648)
    minimum_sys_memory = base_memory + get_total_capacity_of_nvme_devices(ssd_list)

    logger.debug(f"Minimum system memory is {humanbytes(minimum_sys_memory)}")
    return int(minimum_sys_memory)
Expand Down Expand Up @@ -737,13 +737,15 @@ def convert_size(size: Union[int, str], unit: str, round_up: bool = False) -> in
raw = size / (base ** exponent)
return math.ceil(raw) if round_up else int(raw)


def first_six_chars(s: str) -> str:
    """Return at most the first six characters of *s*.

    Strings shorter than six characters come back unchanged.
    """
    prefix_length = 6
    return s[:prefix_length]



def nearest_upper_power_of_2(n):
# Check if n is already a power of 2
if (n & (n - 1)) == 0:
Expand Down Expand Up @@ -1337,6 +1339,20 @@ def detect_nvmes(pci_allowed, pci_blocked, device_model, size_range, nvme_names)
return nvmes


def get_total_capacity_of_nvme_devices(pci_lst):
    """Sum the capacity (bytes) of the NVMe devices whose PCI address is in *pci_lst*.

    Parses the JSON produced by ``get_nvme_list_verbose()`` and, for every
    controller at an allowed address that exposes at least one namespace,
    adds the first namespace's ``PhysicalSize`` to the running total.
    """
    json_string = get_nvme_list_verbose()
    data = json.loads(json_string)
    total_capacity = 0
    for device_entry in data.get('Devices', []):
        for subsystem in device_entry.get('Subsystems', []):
            for controller in subsystem.get('Controllers', []):
                address = controller.get("Address")
                # Guard against a missing/None "Namespaces" key.
                namespaces = controller.get("Namespaces") or []
                if namespaces and address in pci_lst:
                    # Accumulate (+=): the previous plain assignment kept only
                    # the LAST matching device's size, under-reporting totals
                    # on multi-device nodes.
                    total_capacity += namespaces[0].get("PhysicalSize", 0)

    return int(total_capacity)


def calculate_unisolated_cores(cores, cores_percentage=0):
# calculate the number if unused system cores (UnIsolated cores)
total = len(cores)
Expand All @@ -1356,10 +1372,10 @@ def get_core_indexes(core_to_index, list_of_cores):


def build_unisolated_stride(
all_cores: List[int],
num_unisolated: int,
client_qpair_count: int,
pool_stride: int = 2,
all_cores: List[int],
num_unisolated: int,
client_qpair_count: int,
pool_stride: int = 2,
) -> List[int]:
"""
Build a list of 'unisolated' CPUs by picking from per-qpair pools.
Expand Down Expand Up @@ -1397,7 +1413,7 @@ def build_unisolated_stride(

# Build pools
pool_size = math.ceil(total / client_qpair_count)
pools = [cores[i * pool_size : min((i + 1) * pool_size, total)] for i in range(client_qpair_count)]
pools = [cores[i * pool_size: min((i + 1) * pool_size, total)] for i in range(client_qpair_count)]
pools = [p for p in pools if p] # drop empties

# Per-pool index (within each pool)
Expand Down Expand Up @@ -1460,8 +1476,7 @@ def generate_core_allocation(cores_by_numa, sockets_to_use, nodes_per_socket, co
continue
all_cores = sorted(cores_by_numa[numa_node])
num_unisolated = calculate_unisolated_cores(all_cores, cores_percentage)
unisolated = build_unisolated_stride(all_cores,num_unisolated,constants.CLIENT_QPAIR_COUNT)

unisolated = build_unisolated_stride(all_cores, num_unisolated, constants.CLIENT_QPAIR_COUNT)

available_cores = [c for c in all_cores if c not in unisolated]
q1 = len(available_cores) // 4
Expand Down Expand Up @@ -1577,7 +1592,7 @@ def regenerate_config(new_config, old_config, force=False):
old_config["nodes"][i]["small_pool_count"] = small_pool_count
old_config["nodes"][i]["large_pool_count"] = large_pool_count
old_config["nodes"][i]["huge_page_memory"] = minimum_hp_memory
minimum_sys_memory = calculate_minimum_sys_memory(old_config["nodes"][i]["max_size"])
minimum_sys_memory = calculate_minimum_sys_memory(old_config["nodes"][i]["ssd_pcis"])
old_config["nodes"][i]["sys_memory"] = minimum_sys_memory

memory_details = node_utils.get_memory_details()
Expand Down Expand Up @@ -1733,7 +1748,7 @@ def generate_configs(max_lvol, max_prov, sockets_to_use, nodes_per_socket, pci_a
node_info["max_lvol"] = max_lvol
node_info["max_size"] = max_prov
node_info["huge_page_memory"] = max(minimum_hp_memory, max_prov)
minimum_sys_memory = calculate_minimum_sys_memory(max_prov)
minimum_sys_memory = calculate_minimum_sys_memory(node_info["ssd_pcis"])
node_info["sys_memory"] = minimum_sys_memory
all_nodes.append(node_info)
node_index += 1
Expand Down Expand Up @@ -2263,9 +2278,9 @@ def create_docker_service(cluster_docker: DockerClient, service_name: str, servi
"com.docker.stack.namespace": "app"}
)


def create_k8s_service(namespace: str, deployment_name: str,
container_name: str, service_file: str, container_image: str):

logger.info(f"Creating deployment: {deployment_name} in namespace {namespace}")
load_kube_config_with_fallback()
apps_v1 = client.AppsV1Api()
Expand Down Expand Up @@ -2335,6 +2350,7 @@ def create_k8s_service(namespace: str, deployment_name: str,
apps_v1.create_namespaced_deployment(namespace=namespace, body=deployment)
logger.info(f"Deployment {deployment_name} created successfully.")


def clean_partitions(nvme_device: str):
command = ['wipefs', '-a', nvme_device]
print(" ".join(command))
Expand Down Expand Up @@ -2608,6 +2624,7 @@ def clean_devices(config_path, format, force):
except json.JSONDecodeError as e:
logger.error(f"Error decoding JSON: {e}")


def create_rpc_socket_mount():
try:

Expand Down Expand Up @@ -2664,3 +2681,15 @@ def calculate_hp_only(max_lvol, number_of_devices, sockets_to_use, nodes_per_soc

node_index += 1
return convert_size(minimum_hp_memory, 'MB')

def recalculate_cores_distribution(cores, number_of_alcemls):
    """Map an allowed-core list onto per-role core-index groups.

    Runs ``calculate_core_allocations`` and translates each resulting core
    group into positional indexes within *cores* via ``get_core_indexes``.
    """
    allocation = calculate_core_allocations(cores, number_of_alcemls)
    index_of = {core: position for position, core in enumerate(cores)}
    role_names = (
        "app_thread_core",
        "jm_cpu_core",
        "poller_cpu_cores",
        "alceml_cpu_cores",
        "alceml_worker_cpu_cores",
        "distrib_cpu_cores",
        "jc_singleton_core",
    )
    return {
        role: get_core_indexes(index_of, allocation[pos])
        for pos, role in enumerate(role_names)
    }
41 changes: 41 additions & 0 deletions simplyblock_web/api/internal/storage_node/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,3 +718,44 @@ def ifc_is_tcp(query: NicQuery):
})
def is_alive():
return utils.get_response(True)


@api.get('/read_allowed_list', responses={
    200: {'content': {'application/json': {'schema': utils.response_schema({
        'type': 'object',
        'additionalProperties': True,
    })}}},
})
def read_allowed_list():
    """Return the CPU core ids listed in /etc/simplyblock/allowed_list.

    Reads one integer core id per non-blank line. A missing or malformed
    file yields an empty list. The payload is wrapped with
    ``utils.get_response`` for consistency with the module's other
    endpoints (e.g. ``is_alive``) and the declared response schema.
    """
    cores = []
    try:
        with open("/etc/simplyblock/allowed_list") as f:
            cores = [int(line.strip()) for line in f if line.strip()]
    except (OSError, ValueError):
        # File absent/unreadable or contains non-integer content:
        # report an empty core list instead of failing the request.
        cores = []
    return utils.get_response(cores)


class CoresParams(BaseModel):
    """Request body for the /recalculate_cores_distribution endpoint."""
    # Allowed CPU core ids to distribute across the node's roles.
    cores: Optional[List[int]] = Field(default=None)
    # Number of alceml bdevs used to size the distribution; must be >= 0.
    number_of_alceml_devices: Optional[int] = Field(None, ge=0)

@api.post('/recalculate_cores_distribution', responses={
    # Fixed schema: this endpoint returns an object of role -> core indexes,
    # not a boolean as previously declared.
    200: {'content': {'application/json': {'schema': utils.response_schema({
        'type': 'object',
        'additionalProperties': True,
    })}}},
})
def recalculate_cores_distribution(body: CoresParams):
    """Recompute the per-role CPU core distribution for this node.

    Delegates to ``init_utils.recalculate_cores_distribution`` and returns
    its mapping of role name -> core indexes.
    """
    cores = body.cores
    number_of_alceml_devices = body.number_of_alceml_devices
    distribution = init_utils.recalculate_cores_distribution(cores, number_of_alceml_devices)

    resp = utils.get_response({
        # Bug fix: this previously read distribution["distribution"], a key
        # the helper never produces (it returns "app_thread_core"), which
        # raised KeyError at runtime.
        "app_thread_core": distribution["app_thread_core"],
        "jm_cpu_core": distribution["jm_cpu_core"],
        "poller_cpu_cores": distribution["poller_cpu_cores"],
        "alceml_cpu_cores": distribution["alceml_cpu_cores"],
        "alceml_worker_cpu_cores": distribution["alceml_worker_cpu_cores"],
        "distrib_cpu_cores": distribution["distrib_cpu_cores"],
        "jc_singleton_core": distribution["jc_singleton_core"]})
    return resp
Loading
Loading