feat: multi-node scale-out for SLURM deployments (image sync, manifest restore, best-CSV aggregation) #112
mkuznet1 wants to merge 8 commits into ROCm:develop from
Conversation
…nvocations

- core/constants.py: detect --version/--help via the new _is_lightweight_cli_invocation() helper and skip MAD_SETUP_MODEL_DIR side effects; keep --version/--help output clean even when MAD_VERBOSE_CONFIG=true.
- slurm job.sh.j2: run preflight 'madengine --version' and 'madengine --help' probes under env -u MODEL_DIR on both single-node and multi-node paths so the probes don't inherit MODEL_DIR and trigger file copies.

Made-with: Cursor
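The lightweight-invocation check described above can be sketched as a small argv scan (a hypothetical standalone version; the real helper is `_is_lightweight_cli_invocation()` in core/constants.py):

```python
import sys

def is_lightweight_cli_invocation(argv=None) -> bool:
    """True when the CLI was invoked only for --version/--help, so that
    module-import side effects such as MAD_SETUP_MODEL_DIR can be skipped."""
    args = sys.argv[1:] if argv is None else argv
    return any(a in ("--version", "--help", "-h") for a in args)
```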
…execution

When replaying a manifest on a compute node, also hydrate docker_mounts, docker_build_arg, docker_gpus, gpu_vendor, and guest_os from the manifest context if not already present in the runtime context. Runtime-detected values keep priority; manifest values only fill in missing keys. This lets distributed runs reuse host-path mounts and build args that were resolved at manifest-generation time but are not re-detected on compute nodes.

Made-with: Cursor
…ss-through

- Add ContainerRunner._extract_additional_mount_targets() to parse -v/--volume tokens out of model_info.additional_docker_run_options, and have get_mount_arg skip context-mounted paths whose container target is already covered, fixing "Duplicate mount point" errors from docker run.
- Extend the SLURM shell-to-docker env var pass-through list with disaggregated launcher and benchmarking vars (MODEL_NAME, MODEL_DIR, xP, yD, PD_SYNC_ROOT, PD_RUN_ID, PROXY_TYPE, ROUTER_PORT, BENCHMARK_PORT, SLURM_JOB_ID, OUTPUT_DIR, BARRIER_TIMEOUT_S, PROXY_CLOSE_TIMEOUT_S, REQUIRE_RDMA, KV_UCX_TLS, KV_UCX_SOCKADDR_TLS_PRIORITY) so vLLM/sglang disagg workloads work end-to-end.
- Add a small ContainerRunner._get_build_args() helper that converts context.docker_build_arg into a docker --build-arg string; used by upcoming local-image build-from-manifest support.

Made-with: Cursor
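The duplicate-mount fix hinges on collecting the container-side targets of every `-v`/`--volume` token. A minimal sketch of such a parser (hypothetical function name; not the actual `_extract_additional_mount_targets` implementation):

```python
import shlex

def extract_mount_targets(run_options: str) -> set:
    """Collect container-side mount targets from -v/--volume tokens
    in a docker-run option string (host:container[:opts] specs)."""
    targets = set()
    tokens = shlex.split(run_options)
    i = 0
    while i < len(tokens):
        tok = tokens[i]
        spec = None
        if tok in ("-v", "--volume") and i + 1 < len(tokens):
            spec = tokens[i + 1]
            i += 1  # consume the value token as well
        elif tok.startswith(("-v=", "--volume=")):
            spec = tok.split("=", 1)[1]
        i += 1
        if spec:
            parts = spec.split(":")
            if len(parts) >= 2:
                targets.add(parts[1])  # the container-side path
    return targets
```

`get_mount_arg` could then skip any context mount whose target is already in this set.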
- Wrap the model script's 'docker exec' with a try/except RuntimeError and,
on failure, probe the failing container for a process table, listening
socket snapshot (ss / netstat / lsof), and tails of /run_logs and
/myworkspace/<model_dir> logs via Console.sh before re-raising. Each probe
is independently best-effort and never suppresses the original failure.
- Extend benign log patterns in the failure-pattern scanner with rocEnvTool
timeouts that don't affect run correctness ('RuntimeError: Console script
timeout', 'rocEnvTool/console.py', 'rocEnvTool/rocenv_tool.py').
- Stage extra workload-level artifacts (perf_*.csv, perf-*.csv,
benchmark_*_CONCURRENCY.log) from <model_dir>, <model_dir>/workdir and
/run_logs/$SLURM_JOB_ID into cwd before run_directory cleanup so SLURM
per-node result collection finds them for disagg launchers.
Made-with: Cursor
…des via TCP barrier

In distributed (multi-node) runs with --manifest-file, the pre-built local image may not be present on every compute node (e.g. when MAD_CONTAINER_IMAGE was built only on the submission node and no shared image registry is used). Previously, such nodes would fall back to `docker pull`, which fails for images that exist only locally.

Add a self-healing path that rebuilds the image from the manifest's Dockerfile on the current node when it is missing, and fall back to pull only if the build attempt itself fails. Introduce a TCP-based rendezvous barrier between NODE_RANK=0 and worker nodes so that all nodes finish image preparation before any container starts, without requiring shared filesystem visibility between nodes.

Made-with: Cursor
…inated image preparation

Extract the local-image preparation logic into a dedicated helper (_ensure_local_image_available) and add an optional shared tar cache, enabled by the MAD_DOCKER_BUILDS environment variable.

* Node 0 builds (or pulls) the image, then `docker save`s it into MAD_DOCKER_BUILDS/<image>.tar.
* Worker nodes wait on the existing TCP rendezvous barrier and then `docker load` the tar instead of rebuilding or hitting the registry.
* Without MAD_DOCKER_BUILDS, each node prepares the image independently (existing behavior).

This avoids redundant `docker build` work and unnecessary registry traffic in multi-node runs, while keeping single-node behavior unchanged. Add integration tests covering the primary-saves-tar, existing-tar-is-loaded, and worker-waits-for-primary paths.

Made-with: Cursor
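The save/load split above can be sketched roughly as follows (hypothetical helper names and tar naming scheme; the real logic lives in `_ensure_local_image_available`):

```python
import os
import subprocess

def tar_cache_path(run_image: str, cache_dir: str) -> str:
    """Map an image tag to a tar filename inside the shared cache dir."""
    safe_name = run_image.replace("/", "_").replace(":", "_")
    return os.path.join(cache_dir, f"{safe_name}.tar")

def prepare_image(run_image: str, node_rank: int, cache_dir: str) -> None:
    """Rank 0 publishes the image as a tar; workers load it.
    Assumes workers only reach this point after the rendezvous barrier."""
    tar_path = tar_cache_path(run_image, cache_dir)
    if node_rank == 0:
        if not os.path.exists(tar_path):
            # Build/pull happens before this point; then publish the tar.
            subprocess.run(["docker", "save", "-o", tar_path, run_image],
                           check=True)
    else:
        subprocess.run(["docker", "load", "-i", tar_path], check=True)
```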
…ulti-node runs

Multi-node SLURM runs stage each node's copy of the workload-level multi-results CSV into the login node's job collection directory, but only some nodes observe the final throughput numbers and therefore populate the "performance" column. Previously, collect_results took the first matching candidate, which could be an empty file even when another node produced a complete one; and container_runner marked the result as failed on the first empty CSV it saw.

* collect_results now ranks per-node candidates by the number of non-empty "performance" rows and picks the richest file. Falls back to the previous behavior when no candidate has a performance column.
* container_runner no longer nulls out the performance path on an empty CSV in multi-node runs; it emits a warning instead and defers the final decision to the login-node aggregation step. Single-node runs keep the previous hard-error semantics.

Made-with: Cursor
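The "richest file" ranking can be sketched as counting non-empty `performance` cells per candidate (hypothetical standalone functions, not the exact `collect_results` code):

```python
import csv
import io

def count_performance_rows(csv_text: str) -> int:
    """Number of data rows whose 'performance' cell is non-empty.
    Returns -1 when the column is absent (caller falls back to first match)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    fields = [f.strip() for f in (reader.fieldnames or [])]
    if "performance" not in fields:
        return -1
    count = 0
    for row in reader:
        normalized = {k.strip(): v for k, v in row.items() if isinstance(k, str)}
        if (normalized.get("performance") or "").strip():
            count += 1
    return count

def select_best_csv(candidates: dict) -> str:
    """Pick the per-node candidate with the most populated performance rows."""
    return max(candidates, key=lambda name: count_performance_rows(candidates[name]))
```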
…ifest placeholders
The manifest-context restore path for docker_env_vars unconditionally
overwrote values already populated by Context from os.environ (e.g.
MAD_SECRETS_HFTOKEN exported on the submission node). When a previously
generated manifest contained an unexpanded "${MAD_SECRETS_HFTOKEN}"
literal (because the variable was not exported at manifest-creation time),
the restore step clobbered the real token on rerun and the container
received the literal placeholder, causing HF 401 on gated models
(e.g. meta-llama/Llama-3.1-8B via Primus HuggingFaceTokenizer).
Switch to the same "set only if absent" semantics we already use for
docker_mounts and docker_build_arg, so os.environ-sourced values keep
priority and stale/unexpanded placeholders from old manifests no longer
override them.
Made-with: Cursor
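The "set only if absent" semantics amount to `dict.setdefault` over the manifest values, roughly (a sketch; variable names are stand-ins for the real restore code):

```python
def restore_env_vars(runtime_env: dict, manifest_env: dict) -> dict:
    """Fill docker_env_vars from the manifest without clobbering values
    already sourced from os.environ on the current node. Stale or
    unexpanded ${VAR} placeholders in the manifest thus lose to real values."""
    for key, value in manifest_env.items():
        runtime_env.setdefault(key, value)
    return runtime_env
```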
Pull request overview
This PR adds missing multi-node coordination for SLURM-based distributed runs, focusing on making “local image mode” usable at scale (shared image tar caching + rendezvous barrier), improving manifest restore fidelity on workers, and making login-node result collection choose the most complete CSV across nodes.
Changes:
- Add shared local-image tar cache support (MAD_DOCKER_BUILDS) with rank-0 save and worker load, plus a TCP rendezvous barrier to prevent premature docker run.
- Expand manifest restore to include additional runtime context (mounts/build args/GPU config) while preventing manifest docker env vars from overwriting env-sourced secrets.
- Improve multi-node result collection by selecting the "best" multiple-results CSV (most non-empty performance rows) and deferring empty-CSV verdicts to aggregation.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| tests/integration/test_container_execution.py | Adds integration coverage for shared tar-cache image availability logic. |
| src/madengine/orchestration/run_orchestrator.py | Restores additional runtime context fields from manifest and avoids overwriting env-sourced docker_env_vars. |
| src/madengine/execution/container_runner.py | Implements image tar caching, TCP barrier synchronization, duplicate-mount avoidance, extended env passthrough, and extra failure diagnostics/artifact staging. |
| src/madengine/deployment/templates/slurm/job.sh.j2 | Makes madengine --help/--version probes side-effect-free by unsetting MODEL_DIR. |
| src/madengine/deployment/slurm.py | Adds “best CSV” selection for multi-node multiple_results aggregation. |
| src/madengine/core/constants.py | Skips MODEL_DIR setup and suppresses verbose config prints for --help/--version invocations. |
```python
def _sync_after_local_image_ready(
    self, run_image: str, timeout_s: int = 1800
) -> None:
    """Barrier for multi-node local-image runs so all nodes continue together.

    Relies on a TCP rendezvous between ``NODE_RANK=0`` and worker nodes so
    that no shared filesystem visibility is required. No-op for single-node
    runs (``NNODES<=1``).
    """
    nnodes_raw = os.environ.get("NNODES") or os.environ.get("WORLD_SIZE") or "1"
    node_rank = os.environ.get("NODE_RANK") or os.environ.get("RANK") or "0"
    try:
        nnodes = int(nnodes_raw)
    except Exception:
        nnodes = 1
    if nnodes <= 1:
        return

    self._tcp_image_ready_barrier(
        nnodes=nnodes,
        node_rank=node_rank,
        timeout_s=timeout_s,
    )

def _tcp_image_ready_barrier(
    self, nnodes: int, node_rank: str, timeout_s: int
) -> None:
    """TCP rendezvous barrier that does not require shared filesystem visibility.

    Node 0 listens on one of ``candidate_ports`` derived from ``MASTER_PORT``
    and ``SLURM_JOB_ID``; workers send ``READY <token> <rank>`` and wait for
    ``GO <token> <rank>``. The port range and token defend against multiple
    concurrent jobs reusing the same master host.
    """
    master_addr = os.environ.get("MASTER_ADDR", "127.0.0.1")
    job_id_raw = os.environ.get("SLURM_JOB_ID", "0")
    try:
        job_id = int(job_id_raw)
    except Exception:
        job_id = 0
    token = f"JOB{job_id}"
    master_port_raw = os.environ.get("MASTER_PORT", "29500")
    try:
        master_port = int(master_port_raw)
    except Exception:
        master_port = 29500
    base_port = 43000 + ((master_port + job_id) % 1000)
    candidate_ports = [base_port + i for i in range(0, 16)]
    deadline = time.time() + timeout_s
    rank_int = int(node_rank)

    if rank_int == 0:
        accepted = 0
        peers = []
        waiting: typing.Dict[int, socket.socket] = {}
        server = None
        port = None
        try:
            bind_errors = []
            for candidate in candidate_ports:
                trial = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    trial.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    trial.bind(("0.0.0.0", candidate))
                    server = trial
                    port = candidate
                    break
                except Exception as e:
                    bind_errors.append({"port": candidate, "error": str(e)})
                    try:
                        trial.close()
                    except Exception:
                        pass
            if server is None or port is None:
                raise RuntimeError(
                    f"TCP barrier bind failed on all candidate ports: {bind_errors}"
                )
            server.listen(max(1, nnodes - 1))
            server.settimeout(2.0)
            while accepted < max(0, nnodes - 1) and time.time() < deadline:
                try:
                    conn, addr = server.accept()
                    conn.settimeout(2.0)
                    payload = conn.recv(128).decode("utf-8", errors="ignore").strip()
                    parts = payload.split()
                    if len(parts) != 3 or parts[0] != "READY" or parts[1] != token:
                        conn.close()
                        continue
                    try:
                        worker_rank = int(parts[2])
                    except Exception:
                        conn.close()
                        continue
                    if worker_rank <= 0 or worker_rank >= nnodes:
                        conn.close()
                        continue
                    if worker_rank in waiting:
                        try:
                            waiting[worker_rank].close()
                        except Exception:
                            pass
                    waiting[worker_rank] = conn
                    peers.append(f"{addr[0]}:r{worker_rank}")
                    accepted = len(waiting)
                except socket.timeout:
                    continue
            if accepted < max(0, nnodes - 1):
                raise RuntimeError(
                    f"TCP barrier timeout on master: accepted={accepted}/"
                    f"{max(0, nnodes - 1)} port={port}"
                )
            for worker_rank, conn in waiting.items():
                try:
                    conn.sendall(f"GO {token} {worker_rank}\n".encode("utf-8"))
                finally:
                    try:
                        conn.close()
                    except Exception:
                        pass
            return
        finally:
            try:
                if server is not None:
                    server.close()
            except Exception:
                pass

    last_error = ""
    connect_attempts = 0
    while time.time() < deadline:
        for candidate in candidate_ports:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            connect_attempts += 1
            try:
                sock.settimeout(1.5)
                sock.connect((master_addr, candidate))
                sock.sendall(f"READY {token} {node_rank}\n".encode("utf-8"))
                remaining_s = max(1.0, deadline - time.time())
                sock.settimeout(remaining_s)
                ack = sock.recv(128).decode("utf-8", errors="ignore").strip()
                if ack == f"GO {token} {node_rank}":
                    return
                last_error = f"unexpected_ack={ack!r} port={candidate}"
            except Exception as e:
                last_error = f"{e} port={candidate}"
            finally:
                try:
                    sock.close()
                except Exception:
                    pass
        time.sleep(1)

    raise RuntimeError(
        f"TCP barrier timeout on worker rank={node_rank} master={master_addr} "
        f"ports={candidate_ports} attempts={connect_attempts} last_error={last_error}"
    )
```
The new TCP barrier logic is non-trivial (socket bind/accept/connect loops, tokening, timeouts) but there are no tests covering it. Since tests were added for the tar-cache paths, it would be good to add unit tests that validate the barrier handshake and timeout behavior (can be done with socket mocks or loopback sockets).
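A loopback-based test along the lines the comment suggests could look like this (a sketch that exercises the READY/GO handshake protocol shown in the diff, with simplified stand-in functions rather than the real methods):

```python
import socket
import threading

def master_loop(server: socket.socket, token: str, nworkers: int) -> None:
    """Accept READY from each worker, then release all of them with GO."""
    server.listen(nworkers)
    conns = []
    for _ in range(nworkers):
        conn, _addr = server.accept()
        payload = conn.recv(128).decode().strip()
        _ready, _tok, rank = payload.split()
        conns.append((rank, conn))
    for rank, conn in conns:
        conn.sendall(f"GO {token} {rank}\n".encode())
        conn.close()
    server.close()

def worker_handshake(port: int, token: str, rank: int) -> str:
    """Send READY and return the master's acknowledgement line."""
    sock = socket.create_connection(("127.0.0.1", port), timeout=5.0)
    sock.sendall(f"READY {token} {rank}\n".encode())
    ack = sock.recv(128).decode().strip()
    sock.close()
    return ack

# Bind an ephemeral loopback port up front so the worker cannot race the bind.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
port = server.getsockname()[1]
t = threading.Thread(target=master_loop, args=(server, "JOB42", 1))
t.start()
ack = worker_handshake(port, "JOB42", 1)
t.join()
```

A real test would additionally cover the timeout path, e.g. by asserting that a worker with no listening master raises after the deadline.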
```python
for row in reader:
    total_rows += 1
    if has_perf_column:
        value = (row.get("performance") or "").strip()
```
_select_best_multiple_results_csv() strips whitespace from fieldnames to detect the presence of a "performance" column, but then reads values with row.get("performance"). If the actual CSV header is " performance" (or otherwise differs), this will incorrectly treat all rows as empty. Consider normalizing row keys (similar to the later {k.strip(): v for k,v in row.items()} logic in collect_results) or tracking the exact header name that matched after stripping.
Suggested change:
```diff
-value = (row.get("performance") or "").strip()
+normalized_row = {
+    (k.strip() if isinstance(k, str) else k): v
+    for k, v in row.items()
+}
+value = (normalized_row.get("performance") or "").strip()
```
| if "encapsulate_script" in manifest_context: | ||
| self.context.ctx["encapsulate_script"] = manifest_context["encapsulate_script"] | ||
| # Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs) | ||
| # Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs). |
Comment refers to MAD_SECRET_HFTOKEN, but Context appears to pass through secrets using the MAD_SECRETS* prefix (e.g., MAD_SECRETS_HFTOKEN). Updating the comment would avoid confusion when debugging manifest restores.
Suggested change:
```diff
-# Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs).
+# Restore docker_env_vars from build context (e.g. MAD_SECRETS_HFTOKEN for Primus HF-backed configs).
```
| return "" | ||
| build_args = "" | ||
| for key, value in docker_build_arg.items(): | ||
| build_args += f"--build-arg {key}='{value}' " |
_get_build_args() interpolates build-arg keys/values into a shell command using single quotes, but values containing quotes/whitespace/$ will break the command (and Console.sh runs with shell=True). Build args should be shell-escaped (e.g., shlex.quote) or constructed as a list passed to subprocess without a shell.
Suggested change:
```diff
-build_args += f"--build-arg {key}='{value}' "
+build_arg = f"{key}={value}"
+build_args += f"--build-arg {shlex.quote(build_arg)} "
```
```python
build_args = self._get_build_args()
build_command = (
    f"docker build --network=host -t {run_image} --pull -f {dockerfile} "
    f"{build_args}{docker_context}"
)
```
_build_local_image_from_manifest() builds a docker build ... command by concatenating unquoted run_image, dockerfile, and docker_context. Since Console.sh executes via shell=True, paths containing spaces will fail, and untrusted manifest values could lead to shell injection. Please quote these components (shlex.quote) or avoid the shell entirely by executing an argv list.
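One way to make the build command robust, per the comment, is to assemble every component with `shlex.quote` (a sketch; `run_image`, `dockerfile`, `docker_context`, and `build_args` are stand-ins for the real variables):

```python
import shlex

def build_command(run_image, dockerfile, docker_context, build_args):
    """Compose a docker build command that is safe to run with shell=True:
    every token, including --build-arg KEY=VALUE pairs, is shell-escaped."""
    parts = ["docker", "build", "--network=host",
             "-t", run_image, "--pull", "-f", dockerfile]
    for key, value in build_args.items():
        parts += ["--build-arg", f"{key}={value}"]
    parts.append(docker_context)
    return " ".join(shlex.quote(p) for p in parts)
```

Passing the list form directly to subprocess without a shell would avoid quoting altogether, at the cost of changing how Console.sh is invoked.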
```python
base_port = 43000 + ((master_port + job_id) % 1000)
candidate_ports = [base_port + i for i in range(0, 16)]
deadline = time.time() + timeout_s
rank_int = int(node_rank)
```
_sync_after_local_image_ready() passes node_rank through as a string and _tcp_image_ready_barrier() later does int(node_rank) without guarding. If NODE_RANK/RANK is unset/malformed, this will raise and abort the run. Consider reusing _get_node_rank() (int) here or wrapping the conversion with a fallback similar to nnodes parsing.
Suggested change:
```diff
-rank_int = int(node_rank)
+try:
+    rank_int = int(node_rank)
+except Exception:
+    rank_int = 0
```
```python
master_addr = os.environ.get("MASTER_ADDR", "127.0.0.1")
job_id_raw = os.environ.get("SLURM_JOB_ID", "0")
try:
    job_id = int(job_id_raw)
except Exception:
    job_id = 0
token = f"JOB{job_id}"
master_port_raw = os.environ.get("MASTER_PORT", "29500")
try:
    master_port = int(master_port_raw)
except Exception:
    master_port = 29500
base_port = 43000 + ((master_port + job_id) % 1000)
candidate_ports = [base_port + i for i in range(0, 16)]
```
Barrier token is currently derived only from SLURM_JOB_ID (token = f"JOB{job_id}") and the server binds to 0.0.0.0 on a predictable port range. On shared clusters this makes it feasible for other users/processes to spoof READY/GO messages and interfere with synchronization. Consider allowing an explicit secret token via an env var (generated once in the job script and exported to all ranks) and/or binding specifically to the MASTER_ADDR interface.
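The spoofing concern could be addressed by generating a per-job secret once on the submission side and exporting it to all ranks. A sketch, assuming a hypothetical `MAD_BARRIER_TOKEN` env var (not part of the current code):

```python
import os
import secrets

def barrier_token() -> str:
    """Prefer an explicit per-job secret; fall back to the predictable
    SLURM_JOB_ID-derived token used today."""
    explicit = os.environ.get("MAD_BARRIER_TOKEN")
    if explicit:
        return explicit
    return f"JOB{os.environ.get('SLURM_JOB_ID', '0')}"

# The job script would generate the secret once before srun, e.g.:
#   export MAD_BARRIER_TOKEN=$(python -c "import secrets; print(secrets.token_hex(16))")
# so every rank sees the same unguessable value.
```

Binding the rank-0 listener to the MASTER_ADDR interface instead of 0.0.0.0 would further narrow exposure on shared clusters.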
```python
self._ensure_local_image_available(
    run_image=run_image,
    build_info=build_info,
    model_info=model_info,
)
# Ensure all nodes reach this point before entering container run,
# otherwise workers may start while node 0 is still building / loading.
self._sync_after_local_image_ready(run_image=run_image)
```
When MAD_DOCKER_BUILDS tar is missing, _ensure_local_image_available() already calls _sync_after_local_image_ready(), and run_models_from_manifest() calls the same barrier again immediately after. This creates a redundant second rendezvous in the slow-path and increases the chance of a flaky timeout. Consider having a single barrier location (either inside _ensure_local_image_available or in the caller) and calling it only once per image.
Suggested change:
```diff
-self._ensure_local_image_available(
-    run_image=run_image,
-    build_info=build_info,
-    model_info=model_info,
-)
-# Ensure all nodes reach this point before entering container run,
-# otherwise workers may start while node 0 is still building / loading.
-self._sync_after_local_image_ready(run_image=run_image)
+# _ensure_local_image_available() is responsible for any required
+# image-readiness synchronization, so avoid a second rendezvous here.
+self._ensure_local_image_available(
+    run_image=run_image,
+    build_info=build_info,
+    model_info=model_info,
+)
```
Summary

Closes the multi-node coordination gap in the SLURM path. The upstream refactor made madengine multi-node observable (per-rank log staging, login-node collect_results, partial manifest restore). It did not yet coordinate which node owns the image, when workers can enter docker run, whose CSV the aggregator should pick, and which manifest fields a worker needs to reconstruct the builder's environment.

This PR adds that coordination, plus fixes for a handful of single-node issues that only hurt under scale-out. Single-node behavior is unchanged: every new path is gated on MAD_DOCKER_BUILDS, NNODES > 1, manifest presence, or the failure flow.

What's new

Multi-node coordination

- Shared image tar cache (MAD_DOCKER_BUILDS): rank 0 docker saves, workers docker load. No per-node rebuilds or registry re-pulls.
- TCP rendezvous barrier so no worker enters docker run against a half-loaded image. No shared FS required.
- Best-CSV selection in collect_results (ranks candidates by non-empty performance rows). Multi-node perf validation defers the empty-CSV verdict to the login-node step instead of failing the first rank.
- Manifest restore now hydrates docker_mounts, docker_build_arg, docker_gpus, gpu_vendor, guest_os; runtime-detected values keep priority.

Bug fix on the restore path

- docker_env_vars restore no longer overwrites os.environ-sourced values (e.g. MAD_SECRETS_HFTOKEN) with unexpanded ${VAR} literals from an old manifest. Fixes HF 401 on Primus / gated models on rerun.

Single-node issues exposed by scale-out

- madengine --version/--help are side-effect-free (MAD_SETUP_MODEL_DIR is skipped; SLURM preflight probes run under env -u MODEL_DIR). No more GBs × N hosts on every job.
- "Duplicate mount point" fix for -v in additional_docker_run_options.
- Extended env pass-through (xP, yD, PROXY_TYPE, PD_SYNC_ROOT, BARRIER_TIMEOUT_S, REQUIRE_RDMA, MODEL_NAME, MODEL_DIR, …).
- Failure diagnostics: ps, sockets (ss/netstat/lsof), and log tails of /run_logs and /myworkspace/<model_dir> captured on docker exec failure; rocEnvTool console timeouts added to the benign-pattern list; failure-path artifact preservation for perf_*.csv, perf-*.csv, benchmark_*_CONCURRENCY.log.

Validation

End-to-end on a real SLURM cluster (oci_cx7), three RDMA-enabled workloads: MLPerf Llama-3.1 8B training (2 × 8 GPUs, 8 IB HCAs), Primus Megatron-LM Llama-3.1 8B training (2 × 8 GPUs), vLLM disaggregated inference Llama-3.1 8B (3 × 8 GPUs, UCX RDMA-CM). New integration tests cover the shared-tar-cache paths (primary-saves, existing-tar-is-loaded, worker-waits-for-primary).