add elastic scheduling and make the system more stable #969
helloyongyang merged 1 commit into ModelTC:main from
Conversation
Code Review
This pull request introduces automated instance management and auto-scaling to the disaggregated service architecture. Key changes include a new controller for dynamic lifecycle management of service instances, hardware-supported RDMA atomic operations, and enhanced monitoring with internal queue metrics. Feedback identifies a potential deadlock in the controller's locking strategy, recommending reentrant locks, and points out an indexing mismatch in output file naming that could break test scripts. Suggestions were also made to improve subprocess logging and make backpressure thresholds configurable.
```python
self._rdma_handshake_thread_request: Thread | None = None
self._rdma_handshake_thread_phase1: Thread | None = None
self._rdma_handshake_thread_phase2: Thread | None = None
self._instance_lock = Lock()
```
The _instance_lock must be a reentrant lock (RLock). The _monitor_callback acquires this lock and then calls reclaim_instance or create_instance, both of which attempt to acquire the same lock. Using a non-reentrant Lock will cause a deadlock in the monitor thread.
```diff
- self._instance_lock = Lock()
+ self._instance_lock = RLock()
```
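The difference is easy to reproduce in isolation. The sketch below (illustrative names, not from this PR) acquires a lock twice from the same thread, mirroring the `_monitor_callback` → `reclaim_instance`/`create_instance` call chain; a plain `Lock` fails the nested acquisition where an `RLock` succeeds:

```python
from threading import Lock, RLock

def try_reentrant_acquire(lock):
    """Acquire the lock twice from the same thread, as _monitor_callback
    would when it calls reclaim_instance/create_instance under the lock."""
    with lock:
        # Second acquisition from the same thread; non-blocking, so a
        # plain Lock reports failure instead of hanging the demo.
        acquired = lock.acquire(blocking=False)
        if acquired:
            lock.release()
        return acquired

print(try_reentrant_acquire(Lock()))   # False: plain Lock is not reentrant
print(try_reentrant_acquire(RLock()))  # True: RLock counts nested acquires
```

With `blocking=True` the `Lock` case would deadlock exactly as described in the comment above, which is why the monitor thread hangs.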
```python
with self._instance_lock:
    service_instance_count = sum(1 for meta in self._managed_instances.values() if meta.get("instance_type") == service_type)

queues_empty_for_service = bool(low_metric.get("all_queues_empty", False)) and int(low_metric.get("queue_total_pending", -1)) == 0

if low_utilization < scale_in_threshold and service_instance_count > 1 and queues_empty_for_service and now - last_scale_ts[service_type] >= scale_cooldown_seconds:
    try:
        target_instance_address = self._instance_address_from_monitor_node(low_monitor_address)
        self.reclaim_instance(service_type, target_instance_address)
        last_scale_ts[service_type] = now
        self.logger.info(
            "Auto-scale in triggered: service=%s low_gpu_utilization=%.2f reclaimed_instance=%s",
            service_type,
            low_utilization,
            target_instance_address,
        )
    except Exception as exc:
        self.logger.warning(
            "Auto-scale in skipped for service=%s low_gpu_utilization=%.2f reason=%s",
            service_type,
            low_utilization,
            exc,
        )
```
The check for service_instance_count > 1 and the call to reclaim_instance should be atomic to prevent race conditions. With _instance_lock changed to an RLock, the entire scale-in logic should be moved inside the lock block.
```python
with self._instance_lock:
    service_instance_count = sum(1 for meta in self._managed_instances.values() if meta.get("instance_type") == service_type)
    queues_empty_for_service = bool(low_metric.get("all_queues_empty", False)) and int(low_metric.get("queue_total_pending", -1)) == 0
    if low_utilization < scale_in_threshold and service_instance_count > 1 and queues_empty_for_service and now - last_scale_ts[service_type] >= scale_cooldown_seconds:
        try:
            target_instance_address = self._instance_address_from_monitor_node(low_monitor_address)
            self.reclaim_instance(service_type, target_instance_address)
            last_scale_ts[service_type] = now
            self.logger.info(
                "Auto-scale in triggered: service=%s low_gpu_utilization=%.2f reclaimed_instance=%s",
                service_type,
                low_utilization,
                target_instance_address,
            )
        except Exception as exc:
            self.logger.warning(
                "Auto-scale in skipped for service=%s low_gpu_utilization=%.2f reason=%s",
                service_type,
                low_utilization,
                exc,
            )
```

```diff
  if base_save_path:
      save_path = Path(base_save_path)
-     request_config["save_path"] = str(save_path.with_name(f"{save_path.stem}{i + 1}{save_path.suffix}"))
+     request_config["save_path"] = str(save_path.with_name(f"{save_path.stem}{i}{save_path.suffix}"))
```
There is an indexing mismatch for the output video files. The controller now uses 0-indexing (i), but the test script scripts/disagg/run_wan_t2v_service.sh still expects 1-indexed files. This will cause the test script to wait indefinitely for the last file.
```diff
- request_config["save_path"] = str(save_path.with_name(f"{save_path.stem}{i}{save_path.suffix}"))
+ request_config["save_path"] = str(save_path.with_name(f"{save_path.stem}{i + 1}{save_path.suffix}"))
```
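To make the off-by-one concrete, here is a small sketch (the helper name and example paths are illustrative, not from the PR) of how `Path.with_name` builds the per-request filenames and why the indexing choice matters to a script waiting on the last file:

```python
from pathlib import Path

def indexed_save_path(base_save_path: str, i: int, one_indexed: bool = True) -> str:
    """Derive the per-request output path from a base path, e.g.
    'out/video.mp4' -> 'out/video4.mp4' for i=3 with 1-indexing."""
    p = Path(base_save_path)
    idx = i + 1 if one_indexed else i
    return str(p.with_name(f"{p.stem}{idx}{p.suffix}"))

# With 4 requests (i = 0..3), 1-indexing yields video1..video4; a script
# polling for video4 never sees it under 0-indexing (video0..video3).
print(indexed_save_path("out/video.mp4", 3))                    # out/video4.mp4
print(indexed_save_path("out/video.mp4", 3, one_indexed=False)) # out/video3.mp4
```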
```python
import time
from collections.abc import Mapping
from pathlib import Path
from threading import Event, Lock, Thread
```
```python
]
env = os.environ.copy()
env["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
process = subprocess.Popen(cmd, env=env)
```
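As the review summary notes, subprocess logging could be improved: the child's output is currently inherited rather than captured. A minimal sketch of one way to do that (the function and log-file names are hypothetical, not the PR's API) redirects each instance's stdout/stderr to a per-instance log file while still pinning the GPU:

```python
import os
import subprocess
import sys

def launch_instance(cmd, gpu_id, log_path):
    """Launch a service instance pinned to one GPU, capturing its output
    in a per-instance log file instead of mixing it into the parent's
    console (illustrative sketch)."""
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    log_file = open(log_path, "wb")  # caller is responsible for closing
    return subprocess.Popen(cmd, env=env, stdout=log_file, stderr=subprocess.STDOUT)

# Usage: the child writes its GPU pin into its own log file.
proc = launch_instance(
    [sys.executable, "-c", "import os; print(os.environ['CUDA_VISIBLE_DEVICES'])"],
    gpu_id=3,
    log_path="instance_gpu3.log",
)
proc.wait()
```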
```python
self.logger.exception("Failed to connect phase1 request RDMA buffer, will retry")
```

```diff
- if self._phase1_rdma_buffer is not None:
+ if self._phase1_rdma_buffer is not None and len(req_queue) + len(waiting_queue) < 2:
```
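The review summary suggests lifting the hard-coded limit of 2 into configuration. A sketch of the same backpressure check with a configurable threshold (the function name and signature are illustrative, not the PR's API):

```python
def should_accept_request(phase1_rdma_buffer, req_queue, waiting_queue,
                          max_pending: int = 2) -> bool:
    """Backpressure check: only pull new work while the combined queue
    depth is below a configurable limit (hard-coded to 2 in the diff)."""
    if phase1_rdma_buffer is None:
        return False
    return len(req_queue) + len(waiting_queue) < max_pending

# The limit can then come from config rather than a literal in the loop:
print(should_accept_request(object(), [1], [], max_pending=2))      # True
print(should_accept_request(object(), [1, 2], [1], max_pending=2))  # False
```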
This pull request introduces several significant enhancements and new features across the disaggregated service framework, primarily focusing on improved RDMA atomic operation support, dynamic instance management in the controller, and monitoring extensibility. The most impactful changes include full support for RDMA atomic verbs (fetch-and-add and compare-and-swap), controller logic for dynamic GPU/instance lifecycle management, and new hooks for reporting custom metrics. Additionally, there are improvements to configuration flexibility and command-line overrides.
RDMA atomic operations and server/client enhancements:

Added true remote atomic fetch-and-add (`rdma_faa`) and compare-and-swap (`rdma_cas`) operations to `RDMAClient`, including support for the corresponding RDMA opcodes and access flags. Both client and server now register memory regions with `REMOTE_ATOMIC` access, and QP attributes are updated accordingly. This enables real atomic operations over RDMA, replacing previous best-effort shims. [1] [2] [3] [4] [5] [6]

Updated example/test code to demonstrate usage of the new atomic RDMA operations, including writing, reading, fetch-and-add, and compare-and-swap.
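RDMA atomic verbs execute on the remote NIC against an 8-byte word. The pure-Python model below (a local sketch of the semantics only, not the PR's RDMA code; the class name is invented) shows the return-value contract such verbs conventionally follow: both return the value the word held before the operation:

```python
from threading import Lock

class AtomicU64:
    """Local stand-in for the remote 8-byte word that fetch-and-add and
    compare-and-swap target; a real NIC performs these updates atomically
    in remote memory, this class only models the semantics."""
    def __init__(self, value: int = 0):
        self._lock = Lock()
        self._value = value

    def fetch_add(self, delta: int) -> int:
        with self._lock:
            old = self._value
            self._value = (self._value + delta) % (1 << 64)  # 64-bit wraparound
            return old  # FAA returns the pre-add value

    def compare_swap(self, expected: int, desired: int) -> int:
        with self._lock:
            old = self._value
            if old == expected:
                self._value = desired
            return old  # CAS returns the original value; caller compares

word = AtomicU64(10)
print(word.fetch_add(5))        # 10 (old value); word is now 15
print(word.compare_swap(15, 0)) # 15 (matched, swapped to 0)
print(word.compare_swap(15, 7)) # 0  (no match, word unchanged)
```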
Controller instance lifecycle and scheduling:
Implemented dynamic GPU/instance management in the controller, including:
Added helper methods for mapping between instance addresses and monitor nodes, and for recursively converting configuration objects to plain Python types.
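The recursive config-conversion helper described above can be sketched roughly as follows (the name `to_plain` and the exact dispatch order are assumptions, not the PR's code):

```python
from collections.abc import Mapping

def to_plain(obj):
    """Recursively convert config objects (mappings, sequences, objects
    with __dict__) into plain dicts/lists/scalars, e.g. for JSON
    serialization or logging."""
    if isinstance(obj, Mapping):
        return {key: to_plain(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return [to_plain(item) for item in obj]
    if hasattr(obj, "__dict__"):
        return {key: to_plain(value) for key, value in vars(obj).items()}
    return obj  # scalars (int, float, str, None, ...) pass through

# Hypothetical config object standing in for the real disaggregation config:
class DisaggConfig:
    def __init__(self):
        self.ranks = 4
        self.nested = {"scale_in_threshold": 0.1}

print(to_plain(DisaggConfig()))  # {'ranks': 4, 'nested': {'scale_in_threshold': 0.1}}
```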
Monitoring and metrics extensibility:
New hooks on the `Monitor` class allow injection of custom metrics into the monitoring output in a thread-safe manner. [1] [2]

Configuration and CLI improvements:
Added a `ranks` field to the disaggregation config for explicit rank/GPU count configuration, a `--engine_rank` command-line argument to override engine rank for service roles, and logic to apply this override in `run_service.py`. [1] [2]

Other improvements:
These changes collectively enable more robust, flexible, and scalable operation of the disaggregated video service platform.