-
Notifications
You must be signed in to change notification settings - Fork 336
feat: release tokio runtime on driver/executor exit #4734
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,7 +91,7 @@ use std::collections::HashMap; | |
| use std::path::PathBuf; | ||
| use std::time::{Duration, Instant}; | ||
| use std::{sync::Arc, task::Poll}; | ||
| use tokio::runtime::Runtime; | ||
| use tokio::runtime::{Handle, Runtime}; | ||
| use tokio::sync::mpsc; | ||
|
|
||
| use crate::execution::memory_pools::{ | ||
|
|
@@ -117,7 +117,7 @@ use std::sync::OnceLock; | |
| #[cfg(feature = "jemalloc")] | ||
| use tikv_jemalloc_ctl::{epoch, stats}; | ||
|
|
||
| static TOKIO_RUNTIME: OnceLock<Runtime> = OnceLock::new(); | ||
| static TOKIO_RUNTIME: Mutex<Option<Runtime>> = Mutex::new(None); | ||
|
|
||
| #[cfg(feature = "jemalloc")] | ||
| fn log_jemalloc_usage() { | ||
|
|
@@ -211,12 +211,39 @@ fn build_runtime(default_worker_threads: Option<usize>) -> Runtime { | |
| /// Initialize the global Tokio runtime with the given default worker thread count. | ||
| /// If the runtime is already initialized, this is a no-op. | ||
| pub fn init_runtime(default_worker_threads: usize) { | ||
| TOKIO_RUNTIME.get_or_init(|| build_runtime(Some(default_worker_threads))); | ||
| let mut guard = TOKIO_RUNTIME.lock(); | ||
| if guard.is_none() { | ||
| *guard = Some(build_runtime(Some(default_worker_threads))); | ||
| } | ||
| } | ||
|
|
||
| /// Returns a handle to the global Tokio runtime, lazily initializing it if needed. | ||
| /// | ||
| /// A [`Handle`] is returned (rather than a `&'static Runtime`) so that the runtime | ||
| /// can be torn down via [`release_runtime`]. The handle is cheap to clone and can be | ||
| /// used with `spawn` / `block_on` just like a `Runtime`. | ||
| pub fn get_runtime() -> Handle { | ||
| let mut guard = TOKIO_RUNTIME.lock(); | ||
| guard | ||
| .get_or_insert_with(|| build_runtime(None)) | ||
| .handle() | ||
| .clone() | ||
| } | ||
|
|
||
| /// Function to get a handle to the global Tokio runtime | ||
| pub fn get_runtime() -> &'static Runtime { | ||
| TOKIO_RUNTIME.get_or_init(|| build_runtime(None)) | ||
| /// Tears down the global Tokio runtime, if it has been initialized. | ||
| /// | ||
| /// The runtime is moved out of the global slot and shut down in the background so the | ||
| /// calling (JNI) thread is not blocked waiting for worker threads to finish. Any handles | ||
| /// previously returned by [`get_runtime`] will start failing their spawns once the runtime | ||
| /// is gone, so this must only be called when no native execution is in flight. | ||
| /// | ||
| /// Must not be called from within the runtime's own worker threads, otherwise the shutdown | ||
| /// would deadlock/panic. | ||
| pub fn release_runtime() { | ||
| let runtime = TOKIO_RUNTIME.lock().take(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice catch on the root cause. The runtime lived in a One question on |
||
| if let Some(runtime) = runtime { | ||
| runtime.shutdown_background(); | ||
| } | ||
| } | ||
|
|
||
| /// Returns a short name for an OpStruct variant. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,8 +28,8 @@ import org.apache.spark.internal.Logging | |
| import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR} | ||
| import org.apache.spark.sql.internal.StaticSQLConf | ||
|
|
||
| import org.apache.comet.{CometSparkSessionExtensions, NativeBase} | ||
| import org.apache.comet.CometConf.{COMET_METRICS_ENABLED, COMET_ONHEAP_ENABLED} | ||
| import org.apache.comet.CometSparkSessionExtensions | ||
|
|
||
| /** | ||
| * Comet driver plugin. This class is loaded by Spark's plugin framework. It will be instantiated | ||
|
|
@@ -95,6 +95,10 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl | |
| override def shutdown(): Unit = { | ||
| logInfo("CometDriverPlugin shutdown") | ||
|
|
||
| if (NativeBase.isLoaded) { | ||
| NativeBase.release() | ||
| } | ||
|
|
||
| super.shutdown() | ||
| } | ||
|
|
||
|
|
@@ -148,12 +152,32 @@ object CometDriverPlugin extends Logging { | |
| } | ||
| } | ||
|
|
||
| class CometExecutorPlugin extends ExecutorPlugin with Logging { | ||
|
|
||
| override def init(ctx: PluginContext, extraConf: ju.Map[String, String]): Unit = { | ||
| logInfo("CometExecutorPlugin init") | ||
|
|
||
| super.init(ctx, extraConf) | ||
| } | ||
|
|
||
| override def shutdown(): Unit = { | ||
| logInfo("CometExecutorPlugin shutdown") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In local mode both plugins live in the same JVM and share the one native |
||
|
|
||
| if (NativeBase.isLoaded) { | ||
| NativeBase.release() | ||
| } | ||
|
|
||
| super.shutdown() | ||
| } | ||
|
|
||
| } | ||
|
|
||
| /** | ||
| * The Comet plugin for Spark. To enable this plugin, set the config "spark.plugins" to | ||
| * `org.apache.spark.CometPlugin` | ||
| */ | ||
| class CometPlugin extends SparkPlugin with Logging { | ||
| override def driverPlugin(): DriverPlugin = new CometDriverPlugin | ||
|
|
||
| override def executorPlugin(): ExecutorPlugin = null | ||
| override def executorPlugin(): ExecutorPlugin = new CometExecutorPlugin | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
docs/source/contributor-guide/development.mdis now stale in two spots andcould be updated separately once this lands:
Lazy<Runtime>static. It isOnceLocktoday andbecomes
Mutex<Option<Runtime>>here, torn down on plugin shutdown.AttachGuarddetaches the thread when dropped. Theattachment is actually cached in thread-local storage and only released when
the worker thread exits. That detail is exactly why the runtime has to be shut
down for the JVM to exit, so it is worth correcting.