diff --git a/Cargo.lock b/Cargo.lock index 607bc0e754..6240e5eb2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3369,6 +3369,7 @@ dependencies = [ "strum", "tempfile", "tokio", + "tracing", "typed-builder", "typetag", "url", diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index a7e0171337..78e52376fd 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -33,7 +33,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, - TableCommit, TableCreation, TableIdent, + Runtime, TableCommit, TableCreation, TableIdent, }; use iceberg_storage_opendal::OpenDalStorageFactory; @@ -58,6 +58,7 @@ pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; pub struct GlueCatalogBuilder { config: GlueCatalogConfig, storage_factory: Option>, + runtime: Runtime, } impl Default for GlueCatalogBuilder { @@ -71,6 +72,7 @@ impl Default for GlueCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: Runtime::default(), } } } @@ -83,6 +85,11 @@ impl CatalogBuilder for GlueCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = runtime; + self + } + fn load( mut self, name: impl Into, @@ -129,7 +136,7 @@ impl CatalogBuilder for GlueCatalogBuilder { )); } - GlueCatalog::new(self.config, self.storage_factory).await + GlueCatalog::new(self.config, self.storage_factory, self.runtime).await } } } @@ -151,6 +158,7 @@ pub struct GlueCatalog { config: GlueCatalogConfig, client: GlueClient, file_io: FileIO, + runtime: Runtime, } impl Debug for GlueCatalog { @@ -166,6 +174,7 @@ impl GlueCatalog { async fn new( config: GlueCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await; let mut file_io_props = config.props.clone(); @@ -215,6 +224,7 @@ impl GlueCatalog { config, client: GlueClient(client), file_io, + runtime, }) } /// Get the catalogs `FileIO` @@ -273,6 +283,7 @@ impl GlueCatalog { NamespaceIdent::new(db_name), table_name.to_owned(), )) + .runtime(self.runtime.clone()) .build()?; Ok((table, version_id)) @@ -612,6 +623,7 @@ impl Catalog for GlueCatalog { .metadata_location(metadata_location_str) .metadata(metadata) .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) + .runtime(self.runtime.clone()) .build() } @@ -846,6 +858,7 @@ impl Catalog for GlueCatalog { .metadata_location(metadata_location) .metadata(metadata) .file_io(self.file_io()) + .runtime(self.runtime.clone()) .build()?) } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 4a030c1104..3e192b6582 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -31,7 +31,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, - TableCommit, TableCreation, TableIdent, + Runtime, TableCommit, TableCreation, TableIdent, }; use volo_thrift::MaybeException; @@ -56,6 +56,7 @@ pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; pub struct HmsCatalogBuilder { config: HmsCatalogConfig, storage_factory: Option>, + runtime: Runtime, } impl Default for HmsCatalogBuilder { @@ -69,6 +70,7 @@ impl Default for HmsCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: Runtime::default(), } } } @@ -81,6 +83,11 @@ impl CatalogBuilder for HmsCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = runtime; + self + } + fn load( mut self, name: impl Into, @@ -133,7 +140,7 @@ impl CatalogBuilder for HmsCatalogBuilder { "Catalog warehouse is required", )) } else { - HmsCatalog::new(self.config, self.storage_factory) + HmsCatalog::new(self.config, self.storage_factory, self.runtime) } }; @@ -169,6 +176,7 @@ pub struct HmsCatalog { config: HmsCatalogConfig, client: HmsClient, file_io: FileIO, + runtime: Runtime, } impl Debug for HmsCatalog { @@ -184,6 +192,7 @@ impl HmsCatalog { fn new( config: HmsCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { let address = config .address @@ -223,6 +232,7 @@ impl HmsCatalog { config, client: HmsClient(client), file_io, + runtime, }) } /// Get the catalogs `FileIO` @@ -529,6 +539,7 @@ impl Catalog for HmsCatalog { .metadata_location(metadata_location_str) .metadata(metadata) .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) + .runtime(self.runtime.clone()) .build() } @@ -567,6 +578,7 @@ impl Catalog for HmsCatalog { NamespaceIdent::new(db_name), table.name.clone(), )) + .runtime(self.runtime.clone()) .build() } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 7d5df24d52..45fba3f637 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -26,8 +26,8 @@ use async_trait::async_trait; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; use iceberg::table::Table; use iceberg::{ - Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, - TableCreation, TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, Runtime, + TableCommit, TableCreation, TableIdent, }; use itertools::Itertools; use reqwest::header::{ @@ -62,6 +62,7 @@ const PATH_V1: &str = "v1"; pub struct RestCatalogBuilder { config: RestCatalogConfig, storage_factory: Option>, + runtime: Runtime, } impl Default for RestCatalogBuilder { @@ -75,6 +76,7 @@ impl Default for RestCatalogBuilder { client: None, }, storage_factory: None, + runtime: Runtime::default(), } } } @@ -87,6 +89,11 @@ impl CatalogBuilder for RestCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = runtime; + self + } + fn load( mut self, name: impl Into, @@ -123,7 +130,11 @@ impl CatalogBuilder for RestCatalogBuilder { "Catalog uri is required", )) } else { - Ok(RestCatalog::new(self.config, self.storage_factory)) + Ok(RestCatalog::new( + self.config, + self.storage_factory, + self.runtime, + )) } }; @@ -351,15 +362,21 @@ pub struct RestCatalog { ctx: OnceCell, /// Storage factory for creating FileIO instances. storage_factory: Option>, + runtime: Runtime, } impl RestCatalog { /// Creates a `RestCatalog` from a [`RestCatalogConfig`]. - fn new(config: RestCatalogConfig, storage_factory: Option>) -> Self { + fn new( + config: RestCatalogConfig, + storage_factory: Option>, + runtime: Runtime, + ) -> Self { Self { user_config: config, ctx: OnceCell::new(), storage_factory, + runtime, } } @@ -790,7 +807,8 @@ impl Catalog for RestCatalog { let table_builder = Table::builder() .identifier(table_ident.clone()) .file_io(file_io) - .metadata(response.metadata); + .metadata(response.metadata) + .runtime(self.runtime.clone()); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -846,7 +864,8 @@ impl Catalog for RestCatalog { let table_builder = Table::builder() .identifier(table_ident.clone()) .file_io(file_io) - .metadata(response.metadata); + .metadata(response.metadata) + .runtime(self.runtime.clone()); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -982,6 +1001,7 @@ impl Catalog for RestCatalog { .file_io(file_io) .metadata(response.metadata) .metadata_location(metadata_location.clone()) + .runtime(self.runtime.clone()) .build() } @@ -1054,6 +1074,7 @@ impl Catalog for RestCatalog { .file_io(file_io) .metadata(response.metadata) .metadata_location(response.metadata_location) + .runtime(self.runtime.clone()) .build() } } @@ -1099,6 +1120,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); assert_eq!( @@ -1173,6 +1195,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1220,6 +1243,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1244,6 +1268,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1275,6 +1300,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1306,6 +1332,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1337,6 +1364,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1450,6 +1478,7 @@ mod tests { .props(props) .build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let token = catalog.context().await.unwrap().client.token().await; @@ -1497,6 +1526,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let _namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1527,6 +1557,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1578,6 +1609,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1677,6 +1709,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1730,6 +1763,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let namespaces = catalog @@ -1773,6 +1807,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let namespaces = catalog @@ -1806,6 +1841,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); assert!( @@ -1834,6 +1870,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); catalog @@ -1874,6 +1911,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let tables = catalog @@ -1942,6 +1980,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let tables = catalog @@ -2073,6 +2112,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let tables = catalog @@ -2117,6 +2157,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); catalog @@ -2146,6 +2187,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); assert!( @@ -2177,6 +2219,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); catalog @@ -2211,6 +2254,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let table = catalog @@ -2328,6 +2372,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let table = catalog @@ -2364,6 +2409,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let table_creation = TableCreation::builder() @@ -2513,6 +2559,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let table_creation = TableCreation::builder() @@ -2582,6 +2629,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let table1 = { @@ -2725,6 +2773,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let table1 = { @@ -2789,6 +2838,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let table_ident = TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string()); @@ -2840,6 +2890,7 @@ mod tests { let catalog = RestCatalog::new( RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), + Runtime::default(), ); let table_ident = diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index b88bd77d29..f1c006059a 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -32,7 +32,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, - TableCommit, TableCreation, TableIdent, + Runtime, TableCommit, TableCreation, TableIdent, }; use iceberg_storage_opendal::OpenDalStorageFactory; @@ -71,6 +71,7 @@ struct S3TablesCatalogConfig { pub struct S3TablesCatalogBuilder { config: S3TablesCatalogConfig, storage_factory: Option>, + runtime: Runtime, } /// Default builder for [`S3TablesCatalog`]. @@ -85,6 +86,7 @@ impl Default for S3TablesCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: Runtime::default(), } } } @@ -132,6 +134,11 @@ impl CatalogBuilder for S3TablesCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = runtime; + self + } + fn load( mut self, name: impl Into, @@ -172,7 +179,7 @@ impl CatalogBuilder for S3TablesCatalogBuilder { "Table bucket ARN is required", )) } else { - S3TablesCatalog::new(self.config, self.storage_factory).await + S3TablesCatalog::new(self.config, self.storage_factory, self.runtime).await } } } @@ -184,6 +191,7 @@ pub struct S3TablesCatalog { config: S3TablesCatalogConfig, s3tables_client: aws_sdk_s3tables::Client, file_io: FileIO, + runtime: Runtime, } impl S3TablesCatalog { @@ -191,6 +199,7 @@ impl S3TablesCatalog { async fn new( config: S3TablesCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { let s3tables_client = if let Some(client) = config.client.clone() { client @@ -214,6 +223,7 @@ impl S3TablesCatalog { config, s3tables_client, file_io, + runtime, }) } @@ -246,6 +256,7 @@ impl S3TablesCatalog { .metadata(metadata) .metadata_location(metadata_location) .file_io(self.file_io.clone()) + .runtime(self.runtime.clone()) .build()?; Ok((table, resp.version_token)) } @@ -544,6 +555,7 @@ impl Catalog for S3TablesCatalog { .metadata_location(metadata_location_str) .metadata(metadata) .file_io(self.file_io.clone()) + .runtime(self.runtime.clone()) .build()?; Ok(table) } @@ -726,7 +738,9 @@ mod tests { props: HashMap::new(), }; - Ok(Some(S3TablesCatalog::new(config, None).await?)) + Ok(Some( + S3TablesCatalog::new(config, None, Runtime::default()).await?, + )) } #[tokio::test] diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 7e468e7e37..86bd1a0e26 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -26,7 +26,7 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, - TableCommit, TableCreation, TableIdent, + Runtime, TableCommit, TableCreation, TableIdent, }; use sqlx::any::{AnyPoolOptions, AnyQueryResult, AnyRow, install_default_drivers}; use sqlx::{Any, AnyPool, Row, Transaction}; @@ -67,6 +67,7 @@ static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each con pub struct SqlCatalogBuilder { config: SqlCatalogConfig, storage_factory: Option>, + runtime: Runtime, } impl Default for SqlCatalogBuilder { @@ -80,6 +81,7 @@ impl Default for SqlCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: Runtime::default(), } } } @@ -143,6 +145,11 @@ impl CatalogBuilder for SqlCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = runtime; + self + } + fn load( mut self, name: impl Into, @@ -190,7 +197,7 @@ impl CatalogBuilder for SqlCatalogBuilder { )) } else { self.config.name = name; - SqlCatalog::new(self.config, self.storage_factory).await + SqlCatalog::new(self.config, self.storage_factory, self.runtime).await } } } @@ -221,6 +228,7 @@ pub struct SqlCatalog { warehouse_location: String, fileio: FileIO, sql_bind_style: SqlBindStyle, + runtime: Runtime, } #[derive(Debug, PartialEq, strum::EnumString, strum::Display)] @@ -237,6 +245,7 @@ impl SqlCatalog { async fn new( config: SqlCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { let factory = storage_factory.ok_or_else(|| { Error::new( @@ -303,6 +312,7 @@ impl SqlCatalog { warehouse_location: config.warehouse_location, fileio, sql_bind_style: config.sql_bind_style, + runtime, }) } @@ -810,6 +820,7 @@ impl Catalog for SqlCatalog { .identifier(identifier.clone()) .metadata_location(tbl_metadata_location) .metadata(metadata) + .runtime(self.runtime.clone()) .build()?) } @@ -880,6 +891,7 @@ impl Catalog for SqlCatalog { .metadata_location(tbl_metadata_location_str) .identifier(tbl_ident) .metadata(tbl_metadata) + .runtime(self.runtime.clone()) .build()?) } @@ -951,6 +963,7 @@ impl Catalog for SqlCatalog { .metadata_location(metadata_location) .metadata(metadata) .file_io(self.fileio.clone()) + .runtime(self.runtime.clone()) .build()?) } diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index aa1d0cd4a5..9428f18c16 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -74,7 +74,8 @@ serde_json = { workspace = true } serde_repr = { workspace = true } serde_with = { workspace = true } strum = { workspace = true, features = ["derive"] } -tokio = { workspace = true, optional = false, features = ["sync"] } +tokio = { workspace = true, optional = false, features = ["sync", "rt-multi-thread"] } +tracing = { workspace = true } typed-builder = { workspace = true } typetag = { workspace = true } url = { workspace = true } diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index ae97534d83..5147d74f1b 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -30,6 +30,7 @@ use crate::delete_vector::DeleteVector; use crate::expr::Predicate::AlwaysTrue; use crate::expr::{Predicate, Reference}; use crate::io::FileIO; +use crate::runtime::Runtime; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{ DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor, @@ -45,6 +46,7 @@ pub(crate) struct CachingDeleteFileLoader { /// Shared filter state to allow caching loaded deletes across multiple /// calls to `load_deletes` (e.g., across multiple file scan tasks). delete_filter: DeleteFilter, + runtime: Runtime, } // Intermediate context during processing of a delete file task. @@ -76,11 +78,16 @@ enum ParsedDeleteFileContext { #[allow(unused_variables)] impl CachingDeleteFileLoader { - pub(crate) fn new(file_io: FileIO, concurrency_limit_data_files: usize) -> Self { + pub(crate) fn new( + file_io: FileIO, + concurrency_limit_data_files: usize, + runtime: Runtime, + ) -> Self { CachingDeleteFileLoader { basic_delete_file_loader: BasicDeleteFileLoader::new(file_io), concurrency_limit_data_files, - delete_filter: DeleteFilter::default(), + delete_filter: DeleteFilter::new(runtime.clone()), + runtime, } } @@ -171,7 +178,7 @@ impl CachingDeleteFileLoader { let del_filter = self.delete_filter.clone(); let concurrency_limit_data_files = self.concurrency_limit_data_files; let basic_delete_file_loader = self.basic_delete_file_loader.clone(); - crate::runtime::spawn(async move { + self.runtime.io().spawn(async move { let result = async move { let mut del_filter = del_filter; let basic_delete_file_loader = basic_delete_file_loader.clone(); @@ -726,7 +733,8 @@ mod tests { let table_location = tmp_dir.path(); let file_io = FileIO::new_with_fs(); - let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = + CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default()); let file_scan_tasks = setup(table_location); @@ -946,7 +954,8 @@ mod tests { }; // Load the deletes - should handle both types without error - let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = + CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default()); let delete_filter = delete_file_loader .load_deletes(&file_scan_task.deletes, file_scan_task.schema_ref()) .await @@ -1017,7 +1026,8 @@ mod tests { let table_location = tmp_dir.path(); let file_io = FileIO::new_with_fs(); - let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = + CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default()); let file_scan_tasks = setup(table_location); diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 6369938ce2..79373e994c 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -24,6 +24,7 @@ use tokio::sync::oneshot::Receiver; use crate::delete_vector::DeleteVector; use crate::expr::Predicate::AlwaysTrue; use crate::expr::{Bind, BoundPredicate, Predicate}; +use crate::runtime::Runtime; use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; use crate::spec::DataContentType; use crate::{Error, ErrorKind, Result}; @@ -53,9 +54,10 @@ struct DeleteFileFilterState { positional_deletes: HashMap, } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub(crate) struct DeleteFilter { state: Arc>, + runtime: Runtime, } /// Action to take when trying to start loading a positional delete file @@ -71,6 +73,14 @@ pub(crate) enum PosDelLoadAction { } impl DeleteFilter { + /// Create a new DeleteFilter with the given runtime. + pub(crate) fn new(runtime: Runtime) -> Self { + Self { + state: Arc::new(RwLock::new(DeleteFileFilterState::default())), + runtime, + } + } + /// Retrieve a delete vector for the data file associated with a given file scan task pub(crate) fn get_delete_vector( &self, @@ -245,7 +255,7 @@ impl DeleteFilter { let state = self.state.clone(); let delete_file_path = delete_file_path.to_string(); - crate::runtime::spawn(async move { + self.runtime.cpu().spawn(async move { let eq_del = eq_del.await.unwrap(); { let mut state = state.write().unwrap(); @@ -292,7 +302,8 @@ pub(crate) mod tests { let table_location = tmp_dir.path(); let file_io = FileIO::new_with_fs(); - let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = + CachingDeleteFileLoader::new(file_io.clone(), 10, Runtime::default()); let file_scan_tasks = setup(table_location); @@ -503,7 +514,7 @@ pub(crate) mod tests { case_sensitive: true, }; - let filter = DeleteFilter::default(); + let filter = DeleteFilter::new(Runtime::default()); // ---------- insert equality delete predicate ---------- let pred = Reference::new("id").equal_to(Datum::long(10)); diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 700ba69262..a02b63f037 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -56,6 +56,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; +use crate::runtime::Runtime; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; use crate::util::available_parallelism; @@ -141,11 +142,12 @@ pub struct ArrowReaderBuilder { row_group_filtering_enabled: bool, row_selection_enabled: bool, parquet_read_options: ParquetReadOptions, + runtime: Runtime, } impl ArrowReaderBuilder { /// Create a new ArrowReaderBuilder - pub fn new(file_io: FileIO) -> Self { + pub fn new(file_io: FileIO, runtime: Runtime) -> Self { let num_cpus = available_parallelism().get(); ArrowReaderBuilder { @@ -155,6 +157,7 @@ impl ArrowReaderBuilder { row_group_filtering_enabled: true, row_selection_enabled: false, parquet_read_options: ParquetReadOptions::builder().build(), + runtime, } } @@ -217,6 +220,7 @@ impl ArrowReaderBuilder { delete_file_loader: CachingDeleteFileLoader::new( self.file_io.clone(), self.concurrency_limit_data_files, + self.runtime, ), concurrency_limit_data_files: self.concurrency_limit_data_files, row_group_filtering_enabled: self.row_group_filtering_enabled, @@ -1978,6 +1982,7 @@ mod tests { use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Predicate, Reference}; use crate::io::FileIO; + use crate::runtime::Runtime; use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream}; use crate::spec::{ DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type, @@ -2163,7 +2168,7 @@ message schema { let (file_io, schema, table_location, _temp_dir) = setup_kleene_logic(data_for_col_a, DataType::Utf8); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let result_data = test_perform_read(predicate, schema, table_location, reader).await; @@ -2185,7 +2190,7 @@ message schema { let (file_io, schema, table_location, _temp_dir) = setup_kleene_logic(data_for_col_a, DataType::Utf8); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let result_data = test_perform_read(predicate, schema, table_location, reader).await; @@ -2249,7 +2254,7 @@ message schema { let (file_io, schema, table_location, _temp_dir) = setup_kleene_logic(data_for_col_a, DataType::LargeUtf8); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); for (predicate, expected) in predicates { println!("testing predicate {predicate}"); @@ -2594,7 +2599,7 @@ message schema { ); let file_io = FileIO::new_with_fs(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); // Task 1: read only the first row group let task1 = FileScanTask { @@ -2742,7 +2747,7 @@ message schema { writer.close().unwrap(); // Read the old Parquet file using the NEW schema (with column 'b') - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { file_size_in_bytes: std::fs::metadata(format!("{table_location}/old_file.parquet")) @@ -2912,7 +2917,7 @@ message schema { // Step 3: Read the data file with the delete applied let file_io = FileIO::new_with_fs(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let task = FileScanTask { file_size_in_bytes: std::fs::metadata(&data_file_path).unwrap().len(), @@ -3131,7 +3136,7 @@ message schema { ); let file_io = FileIO::new_with_fs(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { @@ -3344,7 +3349,7 @@ message schema { let rg1_length = row_group_1.compressed_size() as u64; let file_io = FileIO::new_with_fs(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { @@ -3455,7 +3460,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -3556,7 +3561,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -3646,7 +3651,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -3750,7 +3755,7 @@ message schema { } writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -3883,7 +3888,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -3983,7 +3988,7 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { @@ -4093,7 +4098,7 @@ message schema { let predicate = Reference::new("id").less_than(Datum::int(5)); // Enable both row_group_filtering and row_selection - triggered the panic - let reader = ArrowReaderBuilder::new(file_io) + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()) .with_row_group_filtering_enabled(true) .with_row_selection_enabled(true) .build(); @@ -4187,7 +4192,7 @@ message schema { } // Read with concurrency=1 (fast-path) - let reader = ArrowReaderBuilder::new(file_io) + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()) .with_data_file_concurrency_limit(1) .build(); @@ -4409,7 +4414,7 @@ message schema { writer.close().unwrap(); // Read the Parquet file with partition spec and data - let reader = ArrowReaderBuilder::new(file_io).build(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::default()).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { file_size_in_bytes: std::fs::metadata(format!("{table_location}/data.parquet")) diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 8fa5c479c3..f3eab7dd9d 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -27,6 +27,7 @@ use itertools::Itertools; use super::namespace_state::NamespaceState; use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory}; +use crate::runtime::Runtime; use crate::spec::{TableMetadata, TableMetadataBuilder}; use crate::table::Table; use crate::{ @@ -45,6 +46,7 @@ const LOCATION: &str = "location"; pub struct MemoryCatalogBuilder { config: MemoryCatalogConfig, storage_factory: Option>, + runtime: Runtime, } impl Default for MemoryCatalogBuilder { @@ -56,6 +58,7 @@ impl Default for MemoryCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + runtime: Runtime::default(), } } } @@ -68,6 +71,11 @@ impl CatalogBuilder for MemoryCatalogBuilder { self } + fn with_runtime(mut self, runtime: Runtime) -> Self { + self.runtime = runtime; + self + } + fn load( mut self, name: impl Into, @@ -100,7 +108,7 @@ impl CatalogBuilder for MemoryCatalogBuilder { "Catalog warehouse is required", )) } else { - MemoryCatalog::new(self.config, self.storage_factory) + MemoryCatalog::new(self.config, self.storage_factory, self.runtime) } }; @@ -121,6 +129,7 @@ pub struct MemoryCatalog { root_namespace_state: Mutex, file_io: FileIO, warehouse_location: String, + runtime: Runtime, } impl MemoryCatalog { @@ -128,6 +137,7 @@ impl MemoryCatalog { fn new( config: MemoryCatalogConfig, storage_factory: Option>, + runtime: Runtime, ) -> Result { // Use provided factory or default to MemoryStorageFactory let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory)); @@ -136,6 +146,7 @@ impl MemoryCatalog { root_namespace_state: Mutex::new(NamespaceState::default()), file_io: FileIOBuilder::new(factory).with_props(config.props).build(), warehouse_location: config.warehouse, + runtime, }) } @@ -153,6 +164,7 @@ impl MemoryCatalog { .metadata(metadata) .metadata_location(metadata_location.to_string()) .file_io(self.file_io.clone()) + .runtime(self.runtime.clone()) .build() } } @@ -307,6 +319,7 @@ impl Catalog for MemoryCatalog { .metadata_location(metadata_location.to_string()) .metadata(metadata) .identifier(table_ident) + .runtime(self.runtime.clone()) .build() } @@ -378,6 +391,7 @@ impl Catalog for MemoryCatalog { .metadata_location(metadata_location) .metadata(metadata) .identifier(table_ident.clone()) + .runtime(self.runtime.clone()) .build() } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index f296cf2260..10f340132e 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -40,6 +40,7 @@ use typed_builder::TypedBuilder; use uuid::Uuid; use crate::io::StorageFactory; +use crate::runtime::Runtime; use crate::spec::{ EncryptedKey, FormatVersion, PartitionStatisticsFile, Schema, SchemaId, Snapshot, SnapshotReference, SortOrder, StatisticsFile, TableMetadata, TableMetadataBuilder, @@ -152,6 +153,13 @@ pub trait CatalogBuilder: Default + Debug + Send + Sync { /// ``` fn with_storage_factory(self, storage_factory: Arc) -> Self; + /// Set a custom tokio Runtime to use for spawning async tasks. + /// + /// When a Runtime is provided, the catalog will propagate it to all tables + /// it creates. Tasks such as scan planning and delete file processing + /// will be spawned on this runtime. + fn with_runtime(self, runtime: Runtime) -> Self; + /// Create a new catalog instance. fn load( self, diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 4f6fd28483..07394bb855 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -23,7 +23,7 @@ use futures::StreamExt; use futures::channel::mpsc::{Sender, channel}; use tokio::sync::Notify; -use crate::runtime::spawn; +use crate::runtime::Runtime; use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, DataFile, Struct}; @@ -53,7 +53,7 @@ struct PopulatedDeleteFileIndex { impl DeleteFileIndex { /// create a new `DeleteFileIndex` along with the sender that populates it with delete files - pub(crate) fn new() -> (DeleteFileIndex, Sender) { + pub(crate) fn new(runtime: Runtime) -> (DeleteFileIndex, Sender) { // TODO: what should the channel limit be? let (tx, rx) = channel(10); let notify = Arc::new(Notify::new()); @@ -62,7 +62,7 @@ impl DeleteFileIndex { ))); let delete_file_stream = rx.boxed(); - spawn({ + runtime.io().spawn({ let state = state.clone(); async move { let delete_files: Vec = diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index ae0708146b..4e346460f5 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -90,6 +90,7 @@ pub mod transaction; pub mod transform; mod runtime; +pub use runtime::{Runtime, RuntimeHandle}; pub mod arrow; pub(crate) mod delete_file_index; diff --git a/crates/iceberg/src/runtime/mod.rs b/crates/iceberg/src/runtime/mod.rs index 61aa623f58..5b0554763b 100644 --- a/crates/iceberg/src/runtime/mod.rs +++ b/crates/iceberg/src/runtime/mod.rs @@ -17,12 +17,16 @@ // This module contains the async runtime abstraction for iceberg. +use std::fmt; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::{Context, Poll}; use tokio::task; +use tracing::warn; +/// Wrapper around tokio's JoinHandle that panics on task failure. pub struct JoinHandle(task::JoinHandle); impl Unpin for JoinHandle {} @@ -39,37 +43,245 @@ impl Future for JoinHandle { } } -#[allow(dead_code)] -pub fn spawn(f: F) -> JoinHandle -where - F: std::future::Future + Send + 'static, - F::Output: Send + 'static, -{ - JoinHandle(task::spawn(f)) +/// Handle to a single tokio runtime. Holds an optional `Arc` to keep the +/// runtime alive when we own it, and a `Handle` for spawning. +#[derive(Clone)] +pub struct RuntimeHandle { + /// Keeps the tokio runtime alive when we own it (`Runtime::new`). + /// `None` when borrowing an existing runtime via `Handle::try_current()`. + _owned: Option>, + handle: tokio::runtime::Handle, } -#[allow(dead_code)] -pub fn spawn_blocking(f: F) -> JoinHandle -where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, -{ - JoinHandle(task::spawn_blocking(f)) +impl fmt::Debug for RuntimeHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RuntimeHandle").finish() + } +} + +impl RuntimeHandle { + /// Create a handle that owns the given tokio runtime. + fn new(runtime: Arc) -> Self { + let handle = runtime.handle().clone(); + Self { + _owned: Some(runtime), + handle, + } + } + + /// Create a handle that borrows an existing tokio runtime via its handle. + fn from_handle(handle: tokio::runtime::Handle) -> Self { + Self { + _owned: None, + handle, + } + } + + /// Spawn an async task. + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + JoinHandle(self.handle.spawn(future)) + } + + /// Spawn a blocking task. + pub fn spawn_blocking(&self, f: F) -> JoinHandle + where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, + { + JoinHandle(self.handle.spawn_blocking(f)) + } +} + +/// Iceberg's runtime abstraction. +/// +/// Contains separate handles for IO-bound and CPU-bound work. When constructed +/// with a single tokio runtime, both `io()` and `cpu()` route to the same one. +/// Use `new_with_split` to provide dedicated runtimes for each category. +/// +/// Cloning is cheap (Arc clones internally). +#[derive(Clone)] +pub struct Runtime { + io: RuntimeHandle, + cpu: RuntimeHandle, +} + +impl fmt::Debug for Runtime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Runtime").finish() + } +} + +impl Runtime { + /// Create a Runtime backed by a single tokio runtime for all work. + pub fn new(runtime: Arc) -> Self { + let handle = RuntimeHandle::new(runtime); + Self { + io: handle.clone(), + cpu: handle, + } + } + + /// Create a Runtime with separate tokio runtimes for IO and CPU work. + pub fn new_with_split( + io_runtime: Arc, + cpu_runtime: Arc, + ) -> Self { + Self { + io: RuntimeHandle::new(io_runtime), + cpu: RuntimeHandle::new(cpu_runtime), + } + } + + /// Handle for IO-bound work (network fetches, file reads). + pub fn io(&self) -> &RuntimeHandle { + &self.io + } + + /// Handle for CPU-bound work (decoding, predicate eval, projection). + pub fn cpu(&self) -> &RuntimeHandle { + &self.cpu + } +} + +impl Default for Runtime { + fn default() -> Self { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let rh = RuntimeHandle::from_handle(handle); + return Self { + io: rh.clone(), + cpu: rh, + }; + } + + warn!( + "No tokio runtime found. Creating a new multi-thread runtime for iceberg. \ + Consider providing an explicit Runtime via CatalogBuilder::with_runtime() \ + or TableBuilder::runtime() to avoid unexpected resource usage." + ); + + let rt = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to build default tokio runtime"), + ); + Self::new(rt) + } } #[cfg(test)] mod tests { use super::*; - #[tokio::test] - async fn test_tokio_spawn() { - let handle = spawn(async { 1 + 1 }); - assert_eq!(handle.await, 2); + fn test_runtime() -> Runtime { + let tokio_rt = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to build tokio runtime"), + ); + Runtime::new(tokio_rt) + } + + fn block_on(rt: &Runtime, f: F) -> F::Output { + rt.io._owned.as_ref().unwrap().block_on(f) + } + + #[test] + fn test_runtime_default_creates_working_runtime() { + let rt = Runtime::default(); + let handle = rt.io().spawn(async { 1 + 1 }); + let result = block_on(&rt, handle); + assert_eq!(result, 2); + } + + #[test] + fn test_runtime_spawn_io() { + let rt = test_runtime(); + let handle = rt.io().spawn(async { 1 + 1 }); + let result = block_on(&rt, handle); + assert_eq!(result, 2); + } + + #[test] + fn test_runtime_spawn_cpu() { + let rt = test_runtime(); + let handle = rt.cpu().spawn(async { 3 + 4 }); + let result = block_on(&rt, handle); + assert_eq!(result, 7); + } + + #[test] + fn test_runtime_spawn_blocking() { + let rt = test_runtime(); + let handle = rt.cpu().spawn_blocking(|| 1 + 1); + let result = block_on(&rt, handle); + assert_eq!(result, 2); + } + + #[test] + fn test_runtime_new_with_custom_runtime() { + let tokio_rt = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to build tokio runtime"), + ); + let rt = Runtime::new(tokio_rt); + let handle = rt.io().spawn(async { 42 }); + let result = block_on(&rt, handle); + assert_eq!(result, 42); + } + + #[test] + fn test_runtime_single_shares_handle() { + let rt = test_runtime(); + // When built with a single runtime, io and cpu point to the same handle + assert!(Arc::ptr_eq( + rt.io._owned.as_ref().unwrap(), + rt.cpu._owned.as_ref().unwrap() + )); + } + + #[test] + fn test_runtime_split_uses_separate_handles() { + let io_rt = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let cpu_rt = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let rt = Runtime::new_with_split(io_rt, cpu_rt); + assert!(!Arc::ptr_eq( + rt.io._owned.as_ref().unwrap(), + rt.cpu._owned.as_ref().unwrap() + )); + } + + #[test] + fn test_runtime_clone_shares_arc() { + let rt = test_runtime(); + let rt2 = rt.clone(); + assert!(Arc::ptr_eq( + rt.io._owned.as_ref().unwrap(), + rt2.io._owned.as_ref().unwrap() + )); } - #[tokio::test] - async fn test_tokio_spawn_blocking() { - let handle = spawn_blocking(|| 1 + 1); - assert_eq!(handle.await, 2); + #[test] + fn test_runtime_debug() { + let rt = test_runtime(); + let debug_str = format!("{:?}", rt); + assert!(debug_str.contains("Runtime")); } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 4a1e27bdc1..f83ad7b723 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -37,7 +37,7 @@ use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluato use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name}; -use crate::runtime::spawn; +use crate::runtime::Runtime; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; use crate::util::available_parallelism; @@ -210,6 +210,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + runtime: self.table.runtime().clone(), }); }; current_snapshot_id.clone() @@ -303,6 +304,7 @@ impl<'a> TableScanBuilder<'a> { concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, + runtime: self.table.runtime().clone(), }) } } @@ -331,6 +333,8 @@ pub struct TableScan { row_group_filtering_enabled: bool, row_selection_enabled: bool, + + runtime: Runtime, } impl TableScan { @@ -352,7 +356,7 @@ impl TableScan { // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); - let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); + let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(self.runtime.clone()); let manifest_list = plan_context.get_manifest_list().await?; @@ -367,9 +371,13 @@ impl TableScan { )?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); + let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); + let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); + + let rt = self.runtime.clone(); // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s - spawn(async move { + rt.io().spawn(async move { let result = futures::stream::iter(manifest_file_contexts) .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move { ctx.fetch_manifest_and_stream_manifest_entries().await @@ -381,61 +389,85 @@ impl TableScan { } }); - let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); - let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); - // Process the delete file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_delete_ctx_rx - .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_delete_manifest_entry(manifest_entry_context, tx).await - }) - .await - }, - ) - .await; + { + let rt = rt.clone(); + let rt_inner = rt.clone(); + rt.cpu() + .spawn(async move { + let result = manifest_entry_delete_ctx_rx + .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| { + let rt_inner = rt_inner.clone(); + async move { + rt_inner + .cpu() + .spawn(async move { + Self::process_delete_manifest_entry( + manifest_entry_context, + tx, + ) + .await + }) + .await + } + }, + ) + .await; - if let Err(error) = result { - let _ = channel_for_delete_manifest_entry_error - .send(Err(error)) - .await; - } - }) - .await; + if let Err(error) = result { + let _ = channel_for_delete_manifest_entry_error + .send(Err(error)) + .await; + } + }) + .await; + } // Process the data file [`ManifestEntry`] stream in parallel - spawn(async move { - let result = manifest_entry_data_ctx_rx - .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) - .try_for_each_concurrent( - concurrency_limit_manifest_entries, - |(manifest_entry_context, tx)| async move { - spawn(async move { - Self::process_data_manifest_entry(manifest_entry_context, tx).await - }) - .await - }, - ) - .await; + { + let rt_inner = rt.clone(); + rt.cpu().spawn(async move { + let result = manifest_entry_data_ctx_rx + .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| { + let rt_inner = rt_inner.clone(); + async move { + rt_inner + .cpu() + .spawn(async move { + Self::process_data_manifest_entry( + manifest_entry_context, + tx, + ) + .await + }) + .await + } + }, + ) + .await; - if let Err(error) = result { - let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; - } - }); + if let Err(error) = result { + let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; + } + }); + } Ok(file_scan_task_rx.boxed()) } /// Returns an [`ArrowRecordBatchStream`]. pub async fn to_arrow(&self) -> Result { - let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) - .with_data_file_concurrency_limit(self.concurrency_limit_data_files) - .with_row_group_filtering_enabled(self.row_group_filtering_enabled) - .with_row_selection_enabled(self.row_selection_enabled); + let mut arrow_reader_builder = + ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone()) + .with_data_file_concurrency_limit(self.concurrency_limit_data_files) + .with_row_group_filtering_enabled(self.row_group_filtering_enabled) + .with_row_selection_enabled(self.row_selection_enabled); if let Some(batch_size) = self.batch_size { arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); @@ -1360,14 +1392,22 @@ pub mod tests { .unwrap(); assert_eq!(plan_task.len(), 2); - let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); + let reader = ArrowReaderBuilder::new( + fixture.table.file_io().clone(), + fixture.table.runtime().clone(), + ) + .build(); let batch_stream = reader .clone() .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .unwrap(); let batch_1: Vec<_> = batch_stream.try_collect().await.unwrap(); - let reader = ArrowReaderBuilder::new(fixture.table.file_io().clone()).build(); + let reader = ArrowReaderBuilder::new( + fixture.table.file_io().clone(), + fixture.table.runtime().clone(), + ) + .build(); let batch_stream = reader .read(Box::pin(stream::iter(vec![Ok(plan_task.remove(0))]))) .unwrap(); diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 56ddbd51ba..8f0e588a34 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -23,6 +23,7 @@ use crate::arrow::ArrowReaderBuilder; use crate::inspect::MetadataTable; use crate::io::FileIO; use crate::io::object_cache::ObjectCache; +use crate::runtime::Runtime; use crate::scan::TableScanBuilder; use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -36,6 +37,7 @@ pub struct TableBuilder { readonly: bool, disable_cache: bool, cache_size_bytes: Option, + runtime: Runtime, } impl TableBuilder { @@ -48,6 +50,7 @@ impl TableBuilder { readonly: false, disable_cache: false, cache_size_bytes: None, + runtime: Runtime::default(), } } @@ -95,6 +98,12 @@ impl TableBuilder { self } + /// Set the Runtime for this table to use when spawning tasks. + pub fn runtime(mut self, runtime: Runtime) -> Self { + self.runtime = runtime; + self + } + /// build the Table pub fn build(self) -> Result { let Self { @@ -105,6 +114,7 @@ impl TableBuilder { readonly, disable_cache, cache_size_bytes, + runtime, } = self; let Some(file_io) = file_io else { @@ -146,6 +156,7 @@ impl TableBuilder { identifier, readonly, object_cache, + runtime, }) } } @@ -159,6 +170,7 @@ pub struct Table { identifier: TableIdent, readonly: bool, object_cache: Arc, + runtime: Runtime, } impl Table { @@ -230,6 +242,11 @@ impl Table { MetadataTable::new(self) } + /// Returns a reference to the Runtime. + pub(crate) fn runtime(&self) -> &Runtime { + &self.runtime + } + /// Returns the flag indicating whether the `Table` is readonly or not pub fn readonly(&self) -> bool { self.readonly @@ -242,7 +259,7 @@ impl Table { /// Create a reader for the table. pub fn reader_builder(&self) -> ArrowReaderBuilder { - ArrowReaderBuilder::new(self.file_io.clone()) + ArrowReaderBuilder::new(self.file_io.clone(), self.runtime.clone()) } } @@ -326,7 +343,7 @@ impl StaticTable { /// Create a reader for the table. pub fn reader_builder(&self) -> ArrowReaderBuilder { - ArrowReaderBuilder::new(self.0.file_io.clone()) + ArrowReaderBuilder::new(self.0.file_io.clone(), self.0.runtime.clone()) } }