
add elastic scheduling and make the system more stable #969

Merged
helloyongyang merged 1 commit into ModelTC:main from zhtshr:zht_dev
Mar 31, 2026

Conversation

Contributor

@zhtshr commented Mar 31, 2026

This pull request introduces several significant enhancements and new features across the disaggregated service framework, primarily focusing on improved RDMA atomic operation support, dynamic instance management in the controller, and monitoring extensibility. The most impactful changes include full support for RDMA atomic verbs (fetch-and-add and compare-and-swap), controller logic for dynamic GPU/instance lifecycle management, and new hooks for reporting custom metrics. Additionally, there are improvements to configuration flexibility and command-line overrides.

RDMA atomic operations and server/client enhancements:

  • Added true remote atomic fetch-and-add (rdma_faa) and compare-and-swap (rdma_cas) operations to RDMAClient, including support for the corresponding RDMA opcodes and access flags. Both client and server now register memory regions with REMOTE_ATOMIC access, and QP attributes are updated accordingly. This enables real atomic operations over RDMA, replacing previous best-effort shims.

  • Updated example/test code to demonstrate usage of the new atomic RDMA operations, including writing, reading, fetch-and-add, and compare-and-swap.
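For readers unfamiliar with the two verbs, their semantics can be sketched with a small in-memory model. This is an illustration only — `AtomicRegion` and its method names are hypothetical stand-ins, not the RDMAClient API:

```python
import threading

class AtomicRegion:
    """In-memory stand-in for a remotely registered 64-bit word, modelling
    the semantics of the RDMA atomic verbs (IBV_WR_ATOMIC_FETCH_AND_ADD and
    IBV_WR_ATOMIC_CMP_AND_SWP). Illustrative only."""

    def __init__(self, value: int = 0):
        self._value = value
        self._lock = threading.Lock()  # stands in for the NIC's atomicity guarantee

    def faa(self, add: int) -> int:
        """Fetch-and-add: atomically store old + add, return the old value."""
        with self._lock:
            old = self._value
            self._value = (old + add) & 0xFFFFFFFFFFFFFFFF  # 64-bit wraparound
            return old

    def cas(self, compare: int, swap: int) -> int:
        """Compare-and-swap: store swap only if the current value equals
        compare; always return the value observed before the operation."""
        with self._lock:
            old = self._value
            if old == compare:
                self._value = swap
            return old

region = AtomicRegion(10)
assert region.faa(5) == 10       # returns the old value...
assert region.faa(0) == 15       # ...and the add was applied
assert region.cas(15, 42) == 15  # matched: value is now 42
assert region.cas(15, 99) == 42  # mismatch: value unchanged, old value returned
```

Both verbs return the prior value, which is what makes them usable for building remote counters and locks without the previous best-effort shims.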

Controller instance lifecycle and scheduling:

  • Implemented dynamic GPU/instance management in the controller, including:

    • Per-GPU scheduling, cooldown/reuse logic, and idle pool management.
    • Methods to create and reclaim encoder/transformer/decoder service instances as subprocesses, with robust port/state checks and error handling.
    • Support for launching subprocesses with correct CUDA device visibility and configuration.
  • Added helper methods for mapping between instance addresses and monitor nodes, and for recursively converting configuration objects to plain Python types.
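The recursive conversion helper could look roughly like this — a minimal sketch, where `to_plain` is a hypothetical name and the real controller method may handle additional node types:

```python
from collections.abc import Mapping

def to_plain(obj):
    """Recursively convert a configuration object into plain Python types
    (dict / list / scalar) so it can be serialized or handed to a subprocess.
    Hypothetical sketch of the helper described above."""
    if isinstance(obj, Mapping):
        return {key: to_plain(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_plain(item) for item in obj]
    if hasattr(obj, "__dict__") and not isinstance(obj, type):
        # attribute-style config node: convert its fields
        return {key: to_plain(value) for key, value in vars(obj).items()}
    return obj  # already a plain scalar (int, str, None, ...)
```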

Monitoring and metrics extensibility:

  • Added support for registering an extra metrics provider in the Monitor class, allowing injection of custom metrics into the monitoring output in a thread-safe manner.
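A minimal sketch of the hook: the provider is swapped under a lock, and a failing provider never breaks the built-in metrics. The real Monitor class does much more, and the method names here are assumptions:

```python
import threading

class Monitor:
    """Minimal sketch of the extra-metrics hook described above."""

    def __init__(self):
        self._provider_lock = threading.Lock()
        self._extra_metrics_provider = None

    def register_extra_metrics_provider(self, provider):
        """provider: zero-argument callable returning a dict of metrics."""
        with self._provider_lock:
            self._extra_metrics_provider = provider

    def collect(self) -> dict:
        metrics = {"gpu_utilization": 0.0}  # built-in metrics would go here
        with self._provider_lock:
            provider = self._extra_metrics_provider
        if provider is not None:
            try:
                metrics.update(provider())  # inject the custom metrics
            except Exception:
                pass  # a misbehaving provider must not break monitoring
        return metrics
```

A service would call something like `monitor.register_extra_metrics_provider(lambda: {"queue_total_pending": pending})` to surface its queue depth.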

Configuration and CLI improvements:

  • Added a ranks field to the disaggregation config for explicit rank/GPU count configuration.
  • Introduced an --engine_rank command-line argument to override engine rank for service roles, and logic to apply this override in run_service.py.
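The override flow can be sketched as follows. The `--engine_rank` flag name is from this PR; the config shape and everything else is illustrative:

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--engine_rank", type=int, default=None,
                    help="Override the engine rank for this service role")

# Simulate invoking run_service.py with an explicit override.
args = parser.parse_args(["--engine_rank", "2"])

config = {"engine_rank": 0}       # value loaded from the disaggregation config
if args.engine_rank is not None:  # the CLI argument wins over the config file
    config["engine_rank"] = args.engine_rank

print(config["engine_rank"])  # → 2
```

Defaulting the flag to `None` (rather than 0) is what lets the code distinguish "not passed" from "explicitly rank 0".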

Other improvements:

  • Added missing imports and typing annotations in the controller for robustness.

These changes collectively enable more robust, flexible, and scalable operation of the disaggregated video service platform.


@gemini-code-assist bot left a comment


Code Review

This pull request introduces automated instance management and auto-scaling to the disaggregated service architecture. Key changes include a new controller for dynamic lifecycle management of service instances, hardware-supported RDMA atomic operations, and enhanced monitoring with internal queue metrics. Feedback identifies a potential deadlock in the controller's locking strategy, recommending reentrant locks, and points out an indexing mismatch in output file naming that could break test scripts. Suggestions were also made to improve subprocess logging and make backpressure thresholds configurable.

self._rdma_handshake_thread_request: Thread | None = None
self._rdma_handshake_thread_phase1: Thread | None = None
self._rdma_handshake_thread_phase2: Thread | None = None
self._instance_lock = Lock()


Severity: high

The _instance_lock must be a reentrant lock (RLock). The _monitor_callback acquires this lock and then calls reclaim_instance or create_instance, both of which attempt to acquire the same lock. Using a non-reentrant Lock will cause a deadlock in the monitor thread.

Suggested change:
- self._instance_lock = Lock()
+ self._instance_lock = RLock()
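The deadlock scenario is easy to reproduce in miniature: with a plain `Lock`, the nested acquire below would block forever, while `RLock` lets the owning thread re-enter. The names mirror the review comment, but the classes are illustrative:

```python
import threading

class Controller:
    def __init__(self):
        self._instance_lock = threading.RLock()  # reentrant: same thread may re-acquire
        self.reclaimed = []

    def _monitor_callback(self, instance_address):
        with self._instance_lock:                # first acquisition
            self.reclaim_instance(instance_address)

    def reclaim_instance(self, instance_address):
        with self._instance_lock:                # nested acquisition by the same thread
            self.reclaimed.append(instance_address)

controller = Controller()
controller._monitor_callback("10.0.0.1:9000")   # completes instead of deadlocking
print(controller.reclaimed)  # → ['10.0.0.1:9000']
```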

Comment on lines +497 to +519
with self._instance_lock:
    service_instance_count = sum(1 for meta in self._managed_instances.values() if meta.get("instance_type") == service_type)

queues_empty_for_service = bool(low_metric.get("all_queues_empty", False)) and int(low_metric.get("queue_total_pending", -1)) == 0

if low_utilization < scale_in_threshold and service_instance_count > 1 and queues_empty_for_service and now - last_scale_ts[service_type] >= scale_cooldown_seconds:
    try:
        target_instance_address = self._instance_address_from_monitor_node(low_monitor_address)
        self.reclaim_instance(service_type, target_instance_address)
        last_scale_ts[service_type] = now
        self.logger.info(
            "Auto-scale in triggered: service=%s low_gpu_utilization=%.2f reclaimed_instance=%s",
            service_type,
            low_utilization,
            target_instance_address,
        )
    except Exception as exc:
        self.logger.warning(
            "Auto-scale in skipped for service=%s low_gpu_utilization=%.2f reason=%s",
            service_type,
            low_utilization,
            exc,
        )


Severity: high

The check for service_instance_count > 1 and the call to reclaim_instance should be atomic to prevent race conditions. With _instance_lock changed to an RLock, the entire scale-in logic should be moved inside the lock block.

                with self._instance_lock:
                    service_instance_count = sum(1 for meta in self._managed_instances.values() if meta.get("instance_type") == service_type)
                    queues_empty_for_service = bool(low_metric.get("all_queues_empty", False)) and int(low_metric.get("queue_total_pending", -1)) == 0

                    if low_utilization < scale_in_threshold and service_instance_count > 1 and queues_empty_for_service and now - last_scale_ts[service_type] >= scale_cooldown_seconds:
                        try:
                            target_instance_address = self._instance_address_from_monitor_node(low_monitor_address)
                            self.reclaim_instance(service_type, target_instance_address)
                            last_scale_ts[service_type] = now
                            self.logger.info(
                                "Auto-scale in triggered: service=%s low_gpu_utilization=%.2f reclaimed_instance=%s",
                                service_type,
                                low_utilization,
                                target_instance_address,
                            )
                        except Exception as exc:
                            self.logger.warning(
                                "Auto-scale in skipped for service=%s low_gpu_utilization=%.2f reason=%s",
                                service_type,
                                low_utilization,
                                exc,
                            )

if base_save_path:
    save_path = Path(base_save_path)
-   request_config["save_path"] = str(save_path.with_name(f"{save_path.stem}{i + 1}{save_path.suffix}"))
+   request_config["save_path"] = str(save_path.with_name(f"{save_path.stem}{i}{save_path.suffix}"))


Severity: high

There is an indexing mismatch for the output video files. The controller now uses 0-indexing (i), but the test script scripts/disagg/run_wan_t2v_service.sh still expects 1-indexed files. This will cause the test script to wait indefinitely for the last file.

Suggested change:
- request_config["save_path"] = str(save_path.with_name(f"{save_path.stem}{i}{save_path.suffix}"))
+ request_config["save_path"] = str(save_path.with_name(f"{save_path.stem}{i + 1}{save_path.suffix}"))

import time
from collections.abc import Mapping
from pathlib import Path
from threading import Event, Lock, Thread


Severity: medium

Import RLock to support reentrant locking, which is necessary for the instance management logic in the monitor callback.

Suggested change:
- from threading import Event, Lock, Thread
+ from threading import Event, Lock, RLock, Thread

]
env = os.environ.copy()
env["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
process = subprocess.Popen(cmd, env=env)


Severity: medium

Subprocesses are launched without redirecting their output. This causes logs from all service instances to be interleaved in the controller's output, making it difficult to monitor or debug individual instances. Consider redirecting stdout and stderr to instance-specific log files.
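One way to implement this suggestion, sketched under the assumption that the launch path looks roughly like the excerpt above — `launch_instance`, the log directory, and the file naming are all hypothetical:

```python
import os
import subprocess
from pathlib import Path

def launch_instance(cmd, gpu_id, log_dir="logs"):
    """Launch a service instance pinned to one GPU, redirecting its output
    to an instance-specific log file instead of the controller's stdout."""
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    log_file = open(Path(log_dir) / f"instance_gpu{gpu_id}.log", "ab")
    # stderr is folded into stdout so each instance has a single ordered log
    process = subprocess.Popen(cmd, env=env, stdout=log_file, stderr=subprocess.STDOUT)
    return process, log_file  # keep the handle open for the process lifetime
```

The controller would close the returned file handle when it reclaims the instance.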

self.logger.exception("Failed to connect phase1 request RDMA buffer, will retry")

- if self._phase1_rdma_buffer is not None:
+ if self._phase1_rdma_buffer is not None and len(req_queue) + len(waiting_queue) < 2:


Severity: medium

The backpressure limit 2 is hardcoded. Consider making this threshold configurable to allow tuning based on GPU capacity and workload requirements.
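A minimal sketch of the suggested change — the threshold read from config with the current value as the default. The key name `phase1_backpressure_limit` is hypothetical:

```python
def can_accept_phase1(req_queue, waiting_queue, config) -> bool:
    """Backpressure check with a configurable threshold. Defaulting to 2
    preserves the current hardcoded behaviour."""
    limit = int(config.get("phase1_backpressure_limit", 2))
    return len(req_queue) + len(waiting_queue) < limit

assert can_accept_phase1([], ["req"], {}) is True        # 1 < 2 (default)
assert can_accept_phase1(["req"], ["req"], {}) is False  # 2 < 2 fails
assert can_accept_phase1(["req"], ["req"], {"phase1_backpressure_limit": 4}) is True
```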

@helloyongyang merged commit da8ea2f into ModelTC:main on Mar 31, 2026
1 check passed
