Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/ensure-jars-have-correct-contents.sh
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$"
allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
allowed_expr+="|^org/apache/spark/CometPlugin.class$"
allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
allowed_expr+="|^org/apache/spark/CometExecutorPlugin.*$"
allowed_expr+="|^org/apache/spark/CometSource.*$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$"
Expand Down
39 changes: 33 additions & 6 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use std::{sync::Arc, task::Poll};
use tokio::runtime::Runtime;
use tokio::runtime::{Handle, Runtime};
use tokio::sync::mpsc;

use crate::execution::memory_pools::{
Expand All @@ -117,7 +117,7 @@ use std::sync::OnceLock;
#[cfg(feature = "jemalloc")]
use tikv_jemalloc_ctl::{epoch, stats};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs/source/contributor-guide/development.md is now stale in two spots and
could be updated separately once this lands:

  • Line 87 calls the runtime a Lazy<Runtime> static. It is OnceLock today and
    becomes Mutex<Option<Runtime>> here, torn down on plugin shutdown.
  • Line 60 says the AttachGuard detaches the thread when dropped. The
    attachment is actually cached in thread-local storage and only released when
    the worker thread exits. That detail is exactly why the runtime has to be shut
    down for the JVM to exit, so it is worth correcting.

static TOKIO_RUNTIME: OnceLock<Runtime> = OnceLock::new();
static TOKIO_RUNTIME: Mutex<Option<Runtime>> = Mutex::new(None);

#[cfg(feature = "jemalloc")]
fn log_jemalloc_usage() {
Expand Down Expand Up @@ -211,12 +211,39 @@ fn build_runtime(default_worker_threads: Option<usize>) -> Runtime {
/// Initialize the global Tokio runtime with the given default worker thread count.
/// If the runtime is already initialized, this is a no-op.
pub fn init_runtime(default_worker_threads: usize) {
TOKIO_RUNTIME.get_or_init(|| build_runtime(Some(default_worker_threads)));
let mut guard = TOKIO_RUNTIME.lock();
if guard.is_none() {
*guard = Some(build_runtime(Some(default_worker_threads)));
}
}

/// Returns a handle to the global Tokio runtime, lazily initializing it if needed.
///
/// A [`Handle`] is returned (rather than a `&'static Runtime`) so that the runtime
/// can be torn down via [`release_runtime`]. The handle is cheap to clone and can be
/// used with `spawn` / `block_on` just like a `Runtime`.
pub fn get_runtime() -> Handle {
let mut guard = TOKIO_RUNTIME.lock();
guard
.get_or_insert_with(|| build_runtime(None))
.handle()
.clone()
}

/// Function to get a handle to the global Tokio runtime
pub fn get_runtime() -> &'static Runtime {
TOKIO_RUNTIME.get_or_init(|| build_runtime(None))
/// Tears down the global Tokio runtime, if it has been initialized.
///
/// The runtime is moved out of the global slot and shut down in the background so the
/// calling (JNI) thread is not blocked waiting for worker threads to finish. Any handles
/// previously returned by [`get_runtime`] will start failing their spawns once the runtime
/// is gone, so this must only be called when no native execution is in flight.
///
/// Must not be called from within the runtime's own worker threads, otherwise the shutdown
/// would deadlock/panic.
pub fn release_runtime() {
let runtime = TOKIO_RUNTIME.lock().take();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch on the root cause. The runtime lived in a static, and Rust never
drops statics, so the worker threads never exited. Moving to
Mutex<Option<Runtime>> + take() is what finally lets the runtime drop. That
is the real fix.

One question on shutdown_background(). It signals the workers and returns
without joining them, so the JVM-exit only unblocks asynchronously as each
worker runs its thread-local detach on the way out. Did you consider
shutdown_timeout(...) instead? It would make teardown deterministic, and the
plugin shutdown thread is not latency sensitive. Not blocking, just curious
about the rationale so we can capture it in the doc comment.

if let Some(runtime) = runtime {
runtime.shutdown_background();
}
}

/// Returns a short name for an OpStruct variant.
Expand Down
28 changes: 18 additions & 10 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,24 @@ impl IcebergScanExec {

let task_stream = futures::stream::iter(tasks.into_iter().map(Ok)).boxed();

// iceberg-rust's ArrowReader spawns IO/CPU work onto an iceberg::Runtime. execute() runs
// on the JVM-called thread outside any tokio context, so Runtime::current() would panic;
// build it from Comet's global runtime, which is where the stream is later polled.
let reader =
iceberg::arrow::ArrowReaderBuilder::new(file_io, IcebergRuntime::new(get_runtime()))
.with_batch_size(batch_size)
.with_data_file_concurrency_limit(self.data_file_concurrency_limit)
.with_row_selection_enabled(true)
.with_metadata_size_hint(512 * 1024) // Same as DataFusion's default
.build();
// iceberg-rust's ArrowReader spawns IO/CPU work onto an iceberg::Runtime, which only needs
// a tokio handle. execute() runs on the JVM-called thread outside any tokio context, so we
// enter Comet's global runtime to capture its handle (this is where the stream is later
// polled). Capturing the handle rather than borrowing the runtime keeps it tear-downable
// via release_runtime.
let iceberg_runtime = {
let handle = get_runtime();
let _guard = handle.enter();
IcebergRuntime::try_current().map_err(|e| {
DataFusionError::Execution(format!("Failed to build Iceberg runtime: {e}"))
})?
};
let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io, iceberg_runtime)
.with_batch_size(batch_size)
.with_data_file_concurrency_limit(self.data_file_concurrency_limit)
.with_row_selection_enabled(true)
.with_metadata_size_hint(512 * 1024) // Same as DataFusion's default
.build();

// Pass all tasks to iceberg-rust at once to utilize its flatten_unordered
// parallelization, avoiding overhead of single-task streams
Expand Down
6 changes: 6 additions & 0 deletions native/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_init(
})
}

#[no_mangle]
/// Releases the global Tokio runtime used by Comet native execution.
pub extern "system" fn Java_org_apache_comet_NativeBase_release(_e: EnvUnowned, _class: JClass) {
execution::jni_api::release_runtime();
}

const LOG_PATTERN: &str = "{d(%y/%m/%d %H:%M:%S)} {l} {f}: {m}{n}";

/// JNI method to check if a specific feature is enabled in the native Rust code.
Expand Down
3 changes: 3 additions & 0 deletions spark/src/main/java/org/apache/comet/NativeBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ private static String resourceName() {
*/
static native void init(String logConfPath, String logLevel);

/** Release native resources */
public static native void release();

/**
* Check if a specific feature is enabled in the native library.
*
Expand Down
28 changes: 26 additions & 2 deletions spark/src/main/scala/org/apache/spark/Plugins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
import org.apache.spark.sql.internal.StaticSQLConf

import org.apache.comet.{CometSparkSessionExtensions, NativeBase}
import org.apache.comet.CometConf.{COMET_METRICS_ENABLED, COMET_ONHEAP_ENABLED}
import org.apache.comet.CometSparkSessionExtensions

/**
* Comet driver plugin. This class is loaded by Spark's plugin framework. It will be instantiated
Expand Down Expand Up @@ -95,6 +95,10 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
override def shutdown(): Unit = {
logInfo("CometDriverPlugin shutdown")

if (NativeBase.isLoaded) {
NativeBase.release()
}

super.shutdown()
}

Expand Down Expand Up @@ -148,12 +152,32 @@ object CometDriverPlugin extends Logging {
}
}

class CometExecutorPlugin extends ExecutorPlugin with Logging {

override def init(ctx: PluginContext, extraConf: ju.Map[String, String]): Unit = {
logInfo("CometExecutorPlugin init")

super.init(ctx, extraConf)
}

override def shutdown(): Unit = {
logInfo("CometExecutorPlugin shutdown")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In local mode both plugins live in the same JVM and share the one native
runtime, so both shutdown() paths call NativeBase.release(). It works
because release_runtime() does take(), which makes the second call a no-op.
Could we add a short comment noting that the double release is expected and
safe? It is not obvious when reading either plugin on its own. The executor
plugin shuts down before the driver plugin in local mode, so the executor wins
the take().


if (NativeBase.isLoaded) {
NativeBase.release()
}

super.shutdown()
}

}

/**
* The Comet plugin for Spark. To enable this plugin, set the config "spark.plugins" to
* `org.apache.spark.CometPlugin`
*/
class CometPlugin extends SparkPlugin with Logging {
override def driverPlugin(): DriverPlugin = new CometDriverPlugin

override def executorPlugin(): ExecutorPlugin = null
override def executorPlugin(): ExecutorPlugin = new CometExecutorPlugin
}
Loading