
feat: multi-node scale-out for SLURM deployments (image sync, manifest restore, best-CSV aggregation)#112

Open
mkuznet1 wants to merge 8 commits into ROCm:develop from mkuznet1:pr-aicomnet-squash

Conversation

@mkuznet1

Summary

Closes the multi-node coordination gap in the SLURM path. The upstream refactor
made madengine multi-node observable (per-rank log staging, login-node
collect_results, partial manifest restore). It did not yet coordinate
which node owns the image, when workers can enter docker run,
whose CSV the aggregator should pick, and which manifest fields a worker
needs to reconstruct the builder's environment.
This PR adds that coordination, plus fixes for a handful of single-node
issues that only hurt under scale-out. Single-node behavior is unchanged — every new
path is gated on MAD_DOCKER_BUILDS, NNODES > 1, manifest presence, or
failure flow.

What's new

Multi-node coordination

  • Shared local-image tar cache (MAD_DOCKER_BUILDS): rank 0 docker saves,
    workers docker load. No per-node rebuilds or registry re-pulls.
  • TCP rendezvous barrier between rank 0 and workers so no one enters
    docker run against a half-loaded image. No shared FS required.
  • Rebuild-from-manifest fallback on workers where the local image is missing.
  • Best-match multi-node CSV picker in collect_results (ranks candidates
    by non-empty performance rows). Multi-node perf validation defers the
    empty-CSV verdict to the login-node step instead of failing the first rank.
  • Manifest restore on workers now covers docker_mounts, docker_build_arg,
    docker_gpus, gpu_vendor, guest_os — runtime values keep priority.

Bug fix on the restore path

  • docker_env_vars restore no longer overwrites os.environ-sourced values
    (e.g. MAD_SECRETS_HFTOKEN) with unexpanded ${VAR} literals from an
    old manifest. Fixes HF 401 on Primus / gated models on rerun.

Single-node issues exposed by scale-out
  • madengine --version / --help are side-effect-free (MAD_SETUP_MODEL_DIR
    is skipped; SLURM preflight probes run under env -u MODEL_DIR). No more
    GBs × N hosts on every job.
  • Duplicate-mount protection when a model supplies its own -v in
    additional_docker_run_options.
  • Expanded SLURM → Docker env pass-through for vLLM/SGLang disagg and Primus
    (xP, yD, PROXY_TYPE, PD_SYNC_ROOT, BARRIER_TIMEOUT_S,
    REQUIRE_RDMA, MODEL_NAME, MODEL_DIR, …).
  • Container-failure diagnostics: ps, sockets (ss/netstat/lsof),
    log tails of /run_logs and /myworkspace/<model_dir> captured on
    docker exec failure; rocEnvTool console timeouts added to the
    benign-pattern list; failure-path artifact preservation for
    perf_*.csv, perf-*.csv, benchmark_*_CONCURRENCY.log.

Validation

End-to-end on a real SLURM cluster (oci_cx7), three RDMA-enabled workloads:
MLPerf Llama-3.1 8B training (2 × 8 GPUs, 8 IB HCAs), Primus Megatron-LM
Llama-3.1 8B training (2 × 8 GPUs), vLLM disaggregated inference Llama-3.1 8B
(3 × 8 GPUs, UCX RDMA-CM). New integration tests cover the shared-tar-cache
paths (primary-saves, existing-tar-is-loaded, worker-waits-for-primary).

…nvocations

- core/constants.py: detect --version/--help via new _is_lightweight_cli_invocation()
  helper and skip MAD_SETUP_MODEL_DIR side effects; keep --version/--help output
  clean even when MAD_VERBOSE_CONFIG=true.
- slurm job.sh.j2: run preflight 'madengine --version' and 'madengine --help'
  probes under env -u MODEL_DIR on both single-node and multi-node paths so the
  probes don't inherit MODEL_DIR and trigger file copies.

Made-with: Cursor
…execution

When replaying a manifest on a compute node, also hydrate docker_mounts,
docker_build_arg, docker_gpus, gpu_vendor, and guest_os from the manifest
context if not already present in the runtime context. Runtime-detected
values keep priority; manifest values only fill in missing keys. This lets
distributed runs reuse host path mounts and build args that were resolved
at manifest-generation time but are not re-detected on compute nodes.
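
The "fill only missing keys" semantics can be sketched as a one-way merge; a minimal standalone version (function name hypothetical, the real restore operates on the Context object):

```python
def restore_missing_keys(runtime_ctx: dict, manifest_ctx: dict, keys) -> dict:
    """Fill-if-absent merge: manifest values only land where the runtime
    context has no value, so runtime-detected settings keep priority."""
    for key in keys:
        if key in manifest_ctx and key not in runtime_ctx:
            runtime_ctx[key] = manifest_ctx[key]
    return runtime_ctx
```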

Made-with: Cursor
…ss-through

- Add ContainerRunner._extract_additional_mount_targets() to parse -v/--volume
  tokens out of model_info.additional_docker_run_options, and have get_mount_arg
  skip context-mounted paths whose container target is already covered, fixing
  "Duplicate mount point" errors from docker run.
- Extend the SLURM shell-to-docker env var pass-through list with disaggregated
  launcher and benchmarking vars (MODEL_NAME, MODEL_DIR, xP, yD, PD_SYNC_ROOT,
  PD_RUN_ID, PROXY_TYPE, ROUTER_PORT, BENCHMARK_PORT, SLURM_JOB_ID, OUTPUT_DIR,
  BARRIER_TIMEOUT_S, PROXY_CLOSE_TIMEOUT_S, REQUIRE_RDMA, KV_UCX_TLS,
  KV_UCX_SOCKADDR_TLS_PRIORITY) so vLLM/sglang disagg workloads work end-to-end.
- Add small ContainerRunner._get_build_args() helper that converts
  context.docker_build_arg into a docker --build-arg string; used by upcoming
  local-image build-from-manifest support.
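
The duplicate-mount fix hinges on extracting container-side targets from `-v`/`--volume` tokens. A standalone sketch of what `_extract_additional_mount_targets()` could do — the real method lives on ContainerRunner and may handle more option spellings:

```python
import shlex

def extract_mount_targets(additional_opts: str) -> set:
    """Parse the container-side targets out of -v/--volume tokens so
    context mounts covering the same target can be skipped."""
    targets = set()
    tokens = shlex.split(additional_opts or "")
    i = 0
    while i < len(tokens):
        tok = tokens[i]
        spec = None
        if tok in ("-v", "--volume") and i + 1 < len(tokens):
            spec = tokens[i + 1]
            i += 2
        elif tok.startswith("--volume="):
            spec = tok.split("=", 1)[1]
            i += 1
        else:
            i += 1
        if spec:
            parts = spec.split(":")  # host:container[:options]
            if len(parts) >= 2:
                targets.add(parts[1])
    return targets
```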

Made-with: Cursor
- Wrap the model script's 'docker exec' with a try/except RuntimeError and,
  on failure, probe the failing container for a process table, listening
  socket snapshot (ss / netstat / lsof), and tails of /run_logs and
  /myworkspace/<model_dir> logs via Console.sh before re-raising. Each probe
  is independently best-effort and never suppresses the original failure.
- Extend benign log patterns in the failure-pattern scanner with rocEnvTool
  timeouts that don't affect run correctness ('RuntimeError: Console script
  timeout', 'rocEnvTool/console.py', 'rocEnvTool/rocenv_tool.py').
- Stage extra workload-level artifacts (perf_*.csv, perf-*.csv,
  benchmark_*_CONCURRENCY.log) from <model_dir>, <model_dir>/workdir and
  /run_logs/$SLURM_JOB_ID into cwd before run_directory cleanup so SLURM
  per-node result collection finds them for disagg launchers.
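
The "independently best-effort" probe pattern can be sketched as follows; the commands here are illustrative, not the exact ones madengine runs, and a failing probe is recorded rather than raised so the original container failure stays primary:

```python
import subprocess

def run_best_effort_probes(container: str, probes=None) -> dict:
    """Run diagnostic commands against a failing container; each probe
    is independently best-effort and never suppresses the caller's error."""
    probes = probes or {
        "processes": ["docker", "exec", container, "ps", "aux"],
        "sockets": ["docker", "exec", container, "sh", "-c",
                    "ss -ltnp || netstat -ltnp || lsof -i"],
        "run_logs": ["docker", "exec", container, "sh", "-c",
                     "tail -n 50 /run_logs/* 2>/dev/null"],
    }
    results = {}
    for name, cmd in probes.items():
        try:
            out = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
            results[name] = out.stdout
        except Exception as exc:  # never let a probe mask the real failure
            results[name] = f"<probe failed: {exc}>"
    return results
```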

Made-with: Cursor
…des via TCP barrier

In distributed (multi-node) runs with --manifest-file, the pre-built
local image may not be present on every compute node (e.g. when
MAD_CONTAINER_IMAGE was built only on the submission node and no shared
image registry is used). Previously, such nodes would fall back to
`docker pull`, which fails for images that exist only locally.

Add a self-healing path that rebuilds the image from the manifest's
Dockerfile on the current node when it is missing, and fall back to
pull only if the build attempt itself fails. Introduce a TCP-based
rendezvous barrier between NODE_RANK=0 and worker nodes so that all
nodes finish image preparation before any container starts, without
requiring shared filesystem visibility between nodes.
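
The self-healing order described above (local image, then rebuild from manifest, then pull as last resort) can be sketched with injected callables; names are hypothetical and the real flow lives inside ContainerRunner:

```python
def prepare_image(image_present, build_from_manifest, pull) -> str:
    """Return which preparation path a node took: use the local image if
    present, else rebuild from the manifest Dockerfile, and only fall
    back to docker pull if the rebuild itself fails."""
    if image_present():
        return "local"
    try:
        build_from_manifest()
        return "built"
    except Exception:
        pull()
        return "pulled"
```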

Made-with: Cursor
…inated image preparation

Extract the local-image preparation logic into a dedicated helper
(_ensure_local_image_available) and add an optional shared tar cache,
enabled by the MAD_DOCKER_BUILDS environment variable.

* Node 0 builds (or pulls) the image, then `docker save` it into
  MAD_DOCKER_BUILDS/<image>.tar.
* Worker nodes wait on the existing TCP rendezvous barrier and then
  `docker load` the tar instead of rebuilding or hitting the registry.
* Without MAD_DOCKER_BUILDS, each node prepares the image independently
  (existing behavior).

This avoids redundant `docker build` work and unnecessary registry
traffic in multi-node runs, while keeping single-node behavior
unchanged. Add integration tests covering the primary-saves-tar,
existing-tar-is-loaded, and worker-waits-for-primary paths.
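
The per-node decision above can be summarized as a small plan function. A sketch under stated assumptions (function name and tar naming scheme are illustrative):

```python
import os

def tar_cache_plan(node_rank, image, cache_dir, tar_exists) -> str:
    """Which action a node takes under the MAD_DOCKER_BUILDS tar cache."""
    if not cache_dir:
        return "prepare-independently"      # no cache: existing behavior
    tar_path = os.path.join(cache_dir, image.replace("/", "_") + ".tar")
    if node_rank == 0:
        return "docker-save:" + tar_path    # rank 0 builds/pulls, then saves
    if tar_exists(tar_path):
        return "docker-load:" + tar_path    # tar already present: load it
    return "wait-then-load:" + tar_path     # wait on the TCP barrier first
```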

Made-with: Cursor
…ulti-node runs

Multi-node SLURM runs stage each node's copy of the workload-level
multi-results CSV into the login node's job collection directory, but
only some nodes observe the final throughput numbers and therefore
populate the "performance" column. Previously, collect_results took
the first matching candidate, which could be an empty file even when
another node produced a complete one; and container_runner marked
the result as failed on the first empty CSV it saw.

* collect_results now ranks per-node candidates by the number of
  non-empty "performance" rows and picks the richest file. Falls back
  to the previous behavior when no candidate has a performance column.
* container_runner no longer nulls out the performance path on an empty
  CSV in multi-node runs; it emits a warning instead and defers the
  final decision to the login-node aggregation step. Single-node runs
  keep the previous hard-error semantics.
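
The ranking can be sketched as a score-and-pick over candidate CSV contents; a minimal standalone version (the real code operates on file paths inside collect_results):

```python
import csv
import io

def count_perf_rows(csv_text: str) -> int:
    """Score a candidate by its non-empty 'performance' rows; return -1
    when the column is absent so such files rank last."""
    reader = csv.DictReader(io.StringIO(csv_text))
    fields = [f.strip() for f in (reader.fieldnames or [])]
    if "performance" not in fields:
        return -1
    count = 0
    for row in reader:
        normalized = {k.strip(): v for k, v in row.items() if isinstance(k, str)}
        if (normalized.get("performance") or "").strip():
            count += 1
    return count

def pick_best_csv(candidates: dict) -> str:
    """Pick the candidate with the most non-empty performance rows;
    fall back to the first candidate when none has the column."""
    scored = sorted(candidates.items(), key=lambda kv: count_perf_rows(kv[1]), reverse=True)
    best_path, best_text = scored[0]
    if count_perf_rows(best_text) < 0:
        return next(iter(candidates))
    return best_path
```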

Made-with: Cursor
…ifest placeholders

The manifest-context restore path for docker_env_vars unconditionally
overwrote values already populated by Context from os.environ (e.g.
MAD_SECRETS_HFTOKEN exported on the submission node). When a previously
generated manifest contained an unexpanded "${MAD_SECRETS_HFTOKEN}"
literal (because the variable was not exported at manifest-creation time),
the restore step clobbered the real token on rerun and the container
received the literal placeholder, causing HF 401 on gated models
(e.g. meta-llama/Llama-3.1-8B via Primus HuggingFaceTokenizer).

Switch to the same "set only if absent" semantics we already use for
docker_mounts and docker_build_arg, so os.environ-sourced values keep
priority and stale/unexpanded placeholders from old manifests no longer
override them.
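
The "set only if absent" semantics map directly onto dict.setdefault; a sketch of the restore step (function name hypothetical, the real code mutates the Context):

```python
def restore_docker_env_vars(current: dict, manifest_vars: dict) -> dict:
    """Values already sourced from os.environ win over manifest values,
    so an unexpanded '${MAD_SECRETS_HFTOKEN}' literal from an old
    manifest can no longer clobber the real token on rerun."""
    for key, value in manifest_vars.items():
        current.setdefault(key, value)
    return current
```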

Made-with: Cursor
@mkuznet1 mkuznet1 self-assigned this Apr 23, 2026
Copilot AI review requested due to automatic review settings April 23, 2026 15:56
@mkuznet1 mkuznet1 requested a review from gargrahul as a code owner April 23, 2026 15:56
@mkuznet1 mkuznet1 added the enhancement New feature or request label Apr 23, 2026
@mkuznet1 mkuznet1 added the help wanted Extra attention is needed label Apr 23, 2026
@mkuznet1 mkuznet1 requested a review from Cemberk as a code owner April 23, 2026 15:56

Copilot AI left a comment


Pull request overview

This PR adds missing multi-node coordination for SLURM-based distributed runs, focusing on making “local image mode” usable at scale (shared image tar caching + rendezvous barrier), improving manifest restore fidelity on workers, and making login-node result collection choose the most complete CSV across nodes.

Changes:

  • Add shared local-image tar cache support (MAD_DOCKER_BUILDS) with rank-0 save and worker load, plus a TCP rendezvous barrier to prevent premature docker run.
  • Expand manifest restore to include additional runtime context (mounts/build args/GPU config) while preventing manifest docker env vars from overwriting env-sourced secrets.
  • Improve multi-node result collection by selecting the “best” multiple-results CSV (most non-empty performance rows) and deferring empty-CSV verdicts to aggregation.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
tests/integration/test_container_execution.py Adds integration coverage for shared tar-cache image availability logic.
src/madengine/orchestration/run_orchestrator.py Restores additional runtime context fields from manifest and avoids overwriting env-sourced docker_env_vars.
src/madengine/execution/container_runner.py Implements image tar caching, TCP barrier synchronization, duplicate-mount avoidance, extended env passthrough, and extra failure diagnostics/artifact staging.
src/madengine/deployment/templates/slurm/job.sh.j2 Makes madengine --help/--version probes side-effect-free by unsetting MODEL_DIR.
src/madengine/deployment/slurm.py Adds “best CSV” selection for multi-node multiple_results aggregation.
src/madengine/core/constants.py Skips MODEL_DIR setup and suppresses verbose config prints for --help/--version invocations.


Comment on lines +815 to +970
    def _sync_after_local_image_ready(
        self, run_image: str, timeout_s: int = 1800
    ) -> None:
        """Barrier for multi-node local-image runs so all nodes continue together.

        Relies on a TCP rendezvous between ``NODE_RANK=0`` and worker nodes so
        that no shared filesystem visibility is required. No-op for single-node
        runs (``NNODES<=1``).
        """
        nnodes_raw = os.environ.get("NNODES") or os.environ.get("WORLD_SIZE") or "1"
        node_rank = os.environ.get("NODE_RANK") or os.environ.get("RANK") or "0"
        try:
            nnodes = int(nnodes_raw)
        except Exception:
            nnodes = 1
        if nnodes <= 1:
            return

        self._tcp_image_ready_barrier(
            nnodes=nnodes,
            node_rank=node_rank,
            timeout_s=timeout_s,
        )

    def _tcp_image_ready_barrier(
        self, nnodes: int, node_rank: str, timeout_s: int
    ) -> None:
        """TCP rendezvous barrier that does not require shared filesystem visibility.

        Node 0 listens on one of ``candidate_ports`` derived from ``MASTER_PORT``
        and ``SLURM_JOB_ID``; workers send ``READY <token> <rank>`` and wait for
        ``GO <token> <rank>``. The port range and token defend against multiple
        concurrent jobs reusing the same master host.
        """
        master_addr = os.environ.get("MASTER_ADDR", "127.0.0.1")
        job_id_raw = os.environ.get("SLURM_JOB_ID", "0")
        try:
            job_id = int(job_id_raw)
        except Exception:
            job_id = 0
        token = f"JOB{job_id}"
        master_port_raw = os.environ.get("MASTER_PORT", "29500")
        try:
            master_port = int(master_port_raw)
        except Exception:
            master_port = 29500
        base_port = 43000 + ((master_port + job_id) % 1000)
        candidate_ports = [base_port + i for i in range(0, 16)]
        deadline = time.time() + timeout_s
        rank_int = int(node_rank)

        if rank_int == 0:
            accepted = 0
            peers = []
            waiting: typing.Dict[int, socket.socket] = {}
            server = None
            port = None
            try:
                bind_errors = []
                for candidate in candidate_ports:
                    trial = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    try:
                        trial.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                        trial.bind(("0.0.0.0", candidate))
                        server = trial
                        port = candidate
                        break
                    except Exception as e:
                        bind_errors.append({"port": candidate, "error": str(e)})
                        try:
                            trial.close()
                        except Exception:
                            pass
                if server is None or port is None:
                    raise RuntimeError(
                        f"TCP barrier bind failed on all candidate ports: {bind_errors}"
                    )
                server.listen(max(1, nnodes - 1))
                server.settimeout(2.0)
                while accepted < max(0, nnodes - 1) and time.time() < deadline:
                    try:
                        conn, addr = server.accept()
                        conn.settimeout(2.0)
                        payload = conn.recv(128).decode("utf-8", errors="ignore").strip()
                        parts = payload.split()
                        if len(parts) != 3 or parts[0] != "READY" or parts[1] != token:
                            conn.close()
                            continue
                        try:
                            worker_rank = int(parts[2])
                        except Exception:
                            conn.close()
                            continue
                        if worker_rank <= 0 or worker_rank >= nnodes:
                            conn.close()
                            continue
                        if worker_rank in waiting:
                            try:
                                waiting[worker_rank].close()
                            except Exception:
                                pass
                        waiting[worker_rank] = conn
                        peers.append(f"{addr[0]}:r{worker_rank}")
                        accepted = len(waiting)
                    except socket.timeout:
                        continue
                if accepted < max(0, nnodes - 1):
                    raise RuntimeError(
                        f"TCP barrier timeout on master: accepted={accepted}/"
                        f"{max(0, nnodes - 1)} port={port}"
                    )
                for worker_rank, conn in waiting.items():
                    try:
                        conn.sendall(f"GO {token} {worker_rank}\n".encode("utf-8"))
                    finally:
                        try:
                            conn.close()
                        except Exception:
                            pass
                return
            finally:
                try:
                    if server is not None:
                        server.close()
                except Exception:
                    pass

        last_error = ""
        connect_attempts = 0
        while time.time() < deadline:
            for candidate in candidate_ports:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                connect_attempts += 1
                try:
                    sock.settimeout(1.5)
                    sock.connect((master_addr, candidate))
                    sock.sendall(f"READY {token} {node_rank}\n".encode("utf-8"))
                    remaining_s = max(1.0, deadline - time.time())
                    sock.settimeout(remaining_s)
                    ack = sock.recv(128).decode("utf-8", errors="ignore").strip()
                    if ack == f"GO {token} {node_rank}":
                        return
                    last_error = f"unexpected_ack={ack!r} port={candidate}"
                except Exception as e:
                    last_error = f"{e} port={candidate}"
                finally:
                    try:
                        sock.close()
                    except Exception:
                        pass
            time.sleep(1)

        raise RuntimeError(
            f"TCP barrier timeout on worker rank={node_rank} master={master_addr} "
            f"ports={candidate_ports} attempts={connect_attempts} last_error={last_error}"
        )

Copilot AI Apr 23, 2026


The new TCP barrier logic is non-trivial (socket bind/accept/connect loops, tokening, timeouts) but there are no tests covering it. Since tests were added for the tar-cache paths, it would be good to add unit tests that validate the barrier handshake and timeout behavior (can be done with socket mocks or loopback sockets).

            for row in reader:
                total_rows += 1
                if has_perf_column:
                    value = (row.get("performance") or "").strip()

Copilot AI Apr 23, 2026


_select_best_multiple_results_csv() strips whitespace from fieldnames to detect the presence of a "performance" column, but then reads values with row.get("performance"). If the actual CSV header is " performance" (or otherwise differs), this will incorrectly treat all rows as empty. Consider normalizing row keys (similar to the later {k.strip(): v for k,v in row.items()} logic in collect_results) or tracking the exact header name that matched after stripping.

Suggested change
                    value = (row.get("performance") or "").strip()
                    normalized_row = {
                        (k.strip() if isinstance(k, str) else k): v
                        for k, v in row.items()
                    }
                    value = (normalized_row.get("performance") or "").strip()

        if "encapsulate_script" in manifest_context:
            self.context.ctx["encapsulate_script"] = manifest_context["encapsulate_script"]
        # Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs)
        # Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs).

Copilot AI Apr 23, 2026


Comment refers to MAD_SECRET_HFTOKEN, but Context appears to pass through secrets using the MAD_SECRETS* prefix (e.g., MAD_SECRETS_HFTOKEN). Updating the comment would avoid confusion when debugging manifest restores.

Suggested change
# Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs).
# Restore docker_env_vars from build context (e.g. MAD_SECRETS_HFTOKEN for Primus HF-backed configs).

            return ""
        build_args = ""
        for key, value in docker_build_arg.items():
            build_args += f"--build-arg {key}='{value}' "

Copilot AI Apr 23, 2026


_get_build_args() interpolates build-arg keys/values into a shell command using single quotes, but values containing quotes/whitespace/$ will break the command (and Console.sh runs with shell=True). Build args should be shell-escaped (e.g., shlex.quote) or constructed as a list passed to subprocess without a shell.

Suggested change
            build_args += f"--build-arg {key}='{value}' "
            build_arg = f"{key}={value}"
            build_args += f"--build-arg {shlex.quote(build_arg)} "

Comment on lines +798 to +802
        build_args = self._get_build_args()
        build_command = (
            f"docker build --network=host -t {run_image} --pull -f {dockerfile} "
            f"{build_args}{docker_context}"
        )

Copilot AI Apr 23, 2026


_build_local_image_from_manifest() builds a docker build ... command by concatenating unquoted run_image, dockerfile, and docker_context. Since Console.sh executes via shell=True, paths with spaces will fail and untrusted manifest values could lead to shell injection. Please quote these components (shlex.quote) or avoid the shell by executing an argv list.

        base_port = 43000 + ((master_port + job_id) % 1000)
        candidate_ports = [base_port + i for i in range(0, 16)]
        deadline = time.time() + timeout_s
        rank_int = int(node_rank)

Copilot AI Apr 23, 2026


_sync_after_local_image_ready() passes node_rank through as a string and _tcp_image_ready_barrier() later does int(node_rank) without guarding. If NODE_RANK/RANK is unset/malformed, this will raise and abort the run. Consider reusing _get_node_rank() (int) here or wrapping the conversion with a fallback similar to nnodes parsing.

Suggested change
        rank_int = int(node_rank)
        try:
            rank_int = int(node_rank)
        except Exception:
            rank_int = 0

Comment on lines +849 to +862
        master_addr = os.environ.get("MASTER_ADDR", "127.0.0.1")
        job_id_raw = os.environ.get("SLURM_JOB_ID", "0")
        try:
            job_id = int(job_id_raw)
        except Exception:
            job_id = 0
        token = f"JOB{job_id}"
        master_port_raw = os.environ.get("MASTER_PORT", "29500")
        try:
            master_port = int(master_port_raw)
        except Exception:
            master_port = 29500
        base_port = 43000 + ((master_port + job_id) % 1000)
        candidate_ports = [base_port + i for i in range(0, 16)]

Copilot AI Apr 23, 2026


Barrier token is currently derived only from SLURM_JOB_ID (token = f"JOB{job_id}") and the server binds to 0.0.0.0 on a predictable port range. On shared clusters this makes it feasible for other users/processes to spoof READY/GO messages and interfere with synchronization. Consider allowing an explicit secret token via an env var (generated once in the job script and exported to all ranks) and/or binding specifically to the MASTER_ADDR interface.

Comment on lines +2264 to +2271
self._ensure_local_image_available(
run_image=run_image,
build_info=build_info,
model_info=model_info,
)
# Ensure all nodes reach this point before entering container run,
# otherwise workers may start while node 0 is still building / loading.
self._sync_after_local_image_ready(run_image=run_image)

Copilot AI Apr 23, 2026


When MAD_DOCKER_BUILDS tar is missing, _ensure_local_image_available() already calls _sync_after_local_image_ready(), and run_models_from_manifest() calls the same barrier again immediately after. This creates a redundant second rendezvous in the slow-path and increases the chance of a flaky timeout. Consider having a single barrier location (either inside _ensure_local_image_available or in the caller) and calling it only once per image.

Suggested change
        self._ensure_local_image_available(
            run_image=run_image,
            build_info=build_info,
            model_info=model_info,
        )
        # Ensure all nodes reach this point before entering container run,
        # otherwise workers may start while node 0 is still building / loading.
        self._sync_after_local_image_ready(run_image=run_image)
        # _ensure_local_image_available() is responsible for any required
        # image-readiness synchronization, so avoid a second rendezvous here.
        self._ensure_local_image_available(
            run_image=run_image,
            build_info=build_info,
            model_info=model_info,
        )
