diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index f87a30265a17b..6f605d7203bf8 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -24,7 +24,7 @@ use crate::{
     apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
 };
 use arrow::array::{RecordBatch, RecordBatchOptions};
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, SchemaBuilder};
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::utils::reassign_expr_columns;
@@ -213,6 +213,7 @@ impl FileOpener for ParquetOpener {
             self.projection
                 .project_schema(self.table_schema.table_schema())?,
         );
+        let virtual_columns = Arc::clone(self.table_schema.virtual_columns());
 
         // Build a combined map for replacing column references with literal values.
         // This includes:
@@ -348,7 +349,10 @@ impl FileOpener for ParquetOpener {
             // unnecessary I/O. We decide later if it is needed to evaluate the
             // pruning predicates. Thus default to not requesting it from the
             // underlying reader.
-            let mut options = ArrowReaderOptions::new().with_page_index(false);
+            let mut options = ArrowReaderOptions::new()
+                .with_page_index(false)
+                .with_virtual_columns(virtual_columns.to_vec())?;
+
             #[cfg(feature = "parquet_encryption")]
             if let Some(fd_val) = file_decryption_properties {
                 options = options.with_file_decryption_properties(Arc::clone(&fd_val));
@@ -461,9 +465,18 @@ impl FileOpener for ParquetOpener {
 
             // Filter pushdown: evaluate predicates during scan
             if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
+                // Projection (and therefore predicate pushdown) is not supported for
+                // virtual columns, so we must remove the virtual columns from the
+                // schema used for building the row filter. Virtual columns always
+                // sit at the end of the schema, so remove from the back.
+                let mut schema_builder =
+                    SchemaBuilder::from(physical_file_schema.as_ref());
+                for i in 0..virtual_columns.len() {
+                    schema_builder.remove(physical_file_schema.fields().len() - i - 1);
+                }
+                let pushdown_schema = Arc::new(schema_builder.finish());
+
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
-                    &physical_file_schema,
+                    &pushdown_schema,
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
@@ -601,8 +614,18 @@ impl FileOpener for ParquetOpener {
             // metrics from the arrow reader itself
             let arrow_reader_metrics = ArrowReaderMetrics::enabled();
 
+            // The projection mask should only include physical parquet columns. Any
+            // projection column index >= the number of root parquet columns must refer
+            // to a virtual or partition column, since those come after the physical
+            // columns in the table schema.
+            let parquet_num_columns =
+                builder.parquet_schema().root_schema().get_fields().len();
             let indices = projection.column_indices();
-            let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
+            let parquet_indices: Vec<usize> = indices
+                .iter()
+                .filter(|&idx| idx < &parquet_num_columns)
+                .copied()
+                .collect();
+            let mask = ProjectionMask::roots(builder.parquet_schema(), parquet_indices);
 
             let stream = builder
                 .with_projection(mask)
@@ -610,13 +633,17 @@ impl FileOpener for ParquetOpener {
                 .with_metrics(arrow_reader_metrics.clone())
                 .build()?;
 
+            // The reader's stream.schema() doesn't include virtual columns, so we add them.
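+            // Virtual columns always follow the file columns in the table schema,
+            // so append them in that same order here.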
+            let mut schema_builder = SchemaBuilder::from(stream.schema().as_ref());
+            schema_builder.extend(virtual_columns.iter().cloned());
+            let stream_schema = Arc::new(schema_builder.finish());
+
             let files_ranges_pruned_statistics =
                 file_metrics.files_ranges_pruned_statistics.clone();
             let predicate_cache_inner_records =
                 file_metrics.predicate_cache_inner_records.clone();
             let predicate_cache_records = file_metrics.predicate_cache_records.clone();
-            let stream_schema = Arc::clone(stream.schema());
 
             // Check if we need to replace the schema to handle things like differing nullability or metadata.
             // See note below about file vs. output schema.
             let replace_schema = !stream_schema.eq(&output_schema);
@@ -1018,7 +1045,8 @@ mod test {
     use super::{ConstantColumns, constant_columns_from_stats};
     use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, opener::ParquetOpener};
-    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use arrow::array::{Array, AsArray, StringArray};
+    use arrow::datatypes::{DataType, Field, Int32Type, Int64Type, Schema, SchemaRef};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::{
         ColumnStatistics, DataFusionError, ScalarValue, Statistics, record_batch,
@@ -1038,7 +1066,7 @@ mod test {
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
     use futures::{Stream, StreamExt};
     use object_store::{ObjectStore, memory::InMemory, path::Path};
-    use parquet::arrow::ArrowWriter;
+    use parquet::arrow::{ArrowWriter, RowNumber};
     use parquet::file::properties::WriterProperties;
 
     /// Builder for creating [`ParquetOpener`] instances with sensible defaults for tests.
@@ -1159,16 +1187,16 @@ mod test {
         let table_schema = self.table_schema.expect(
             "ParquetOpenerBuilder: table_schema must be set via with_schema() or with_table_schema()",
         );
-        let file_schema = Arc::clone(table_schema.file_schema());
+        let full_schema = table_schema.table_schema();
 
         let projection = if let Some(projection) = self.projection {
             projection
         } else if let Some(indices) = self.projection_indices {
-            ProjectionExprs::from_indices(&indices, &file_schema)
+            // Use table_schema (which includes virtual columns) for projection
+            ProjectionExprs::from_indices(&indices, full_schema)
         } else {
-            // Default: project all columns
-            let all_indices: Vec<usize> = (0..file_schema.fields().len()).collect();
-            ProjectionExprs::from_indices(&all_indices, &file_schema)
+            let all_indices: Vec<usize> = (0..full_schema.fields().len()).collect();
+            ProjectionExprs::from_indices(&all_indices, full_schema)
         };
 
         ParquetOpener {
@@ -2004,4 +2032,649 @@ mod test {
             "Reverse scan with non-contiguous row groups should correctly map RowSelection"
         );
     }
+
+    /// Options for reading parquet files in tests
+    #[derive(Default)]
+    struct ReadOptions {
+        projection: Option<Vec<usize>>,
+        partition_values: Option<Vec<ScalarValue>>,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+    }
+
+    /// Writes a batch to parquet and reads it back with the given options.
+    /// Returns a single RecordBatch.
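+    ///
+    /// Panics if the scan yields anything other than exactly one batch.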
+    async fn read_parquet(
+        batch: arrow::record_batch::RecordBatch,
+        table_schema: TableSchema,
+        options: ReadOptions,
+    ) -> arrow::record_batch::RecordBatch {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        let data_size =
+            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
+
+        let mut file = PartitionedFile::new("test.parquet".to_string(), data_size as u64);
+        if let Some(partition_values) = options.partition_values {
+            file.partition_values = partition_values;
+        }
+
+        let mut builder = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_table_schema(table_schema.clone());
+        if let Some(projection) = options.projection {
+            builder = builder.with_projection_indices(&projection);
+        }
+        if let Some(predicate) = options.predicate {
+            builder = builder
+                .with_predicate(predicate)
+                .with_pushdown_filters(true);
+        }
+        let opener = builder.build();
+        let mut stream = opener.open(file).unwrap().await.unwrap();
+
+        let mut batches = vec![];
+        while let Some(Ok(batch)) = stream.next().await {
+            batches.push(batch);
+        }
+        assert_eq!(batches.len(), 1, "Expected exactly one batch");
+        batches.into_iter().next().unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_virtual_columns() {
+        let parquet_data =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let table_schema = TableSchema::new_with_virtual_columns(
+            Arc::clone(&parquet_data.schema()),
+            vec![row_number_field],
+            vec![],
+        );
+        let batch =
+            read_parquet(parquet_data, table_schema, ReadOptions::default()).await;
+
+        let output_schema = batch.schema();
+        assert_eq!(
+            output_schema.fields().len(),
+            2,
+            "Output should have 2 columns (a and row_index)"
+        );
+        assert_eq!(output_schema.field(0).name(), "a");
+        assert_eq!(output_schema.field(1).name(), "row_index");
+
+        let a_values = batch
+            .column(0)
+            .as_primitive::<Int32Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(a_values, vec![1, 2, 3]);
+
+        let row_index_values = batch
+            .column(1)
+            .as_primitive::<Int64Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(row_index_values, vec![0, 1, 2]);
+    }
+
+    #[tokio::test]
+    async fn test_virtual_columns_with_projections() {
+        let parquet_data =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let table_schema = TableSchema::new_with_virtual_columns(
+            Arc::clone(&parquet_data.schema()),
+            vec![row_number_field],
+            vec![],
+        );
+
+        // Project only the virtual column (index 1)
+        let batch = read_parquet(
+            parquet_data,
+            table_schema,
+            ReadOptions {
+                projection: Some(vec![1]),
+                ..Default::default()
+            },
+        )
+        .await;
+
+        let output_schema = batch.schema();
+        assert_eq!(
+            output_schema.fields().len(),
+            1,
+            "Output should have 1 column (row_index)"
+        );
+        assert_eq!(output_schema.field(0).name(), "row_index");
+
+        let row_index_values = batch
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(row_index_values, vec![0, 1, 2]);
+    }
+
+    #[tokio::test]
+    async fn test_virtual_columns_with_partition_columns() {
+        let parquet_data =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let partition_col =
+            Arc::new(Field::new("region", DataType::Utf8, false));
+        let table_schema = TableSchema::new_with_virtual_columns(
+            Arc::clone(&parquet_data.schema()),
+            vec![row_number_field],
+            vec![partition_col],
+        );
+
+        // Project all columns: file column (0), virtual column (1), partition column (2)
+        let batch = read_parquet(
+            parquet_data,
+            table_schema,
+            ReadOptions {
+                partition_values: Some(vec![ScalarValue::Utf8(Some(
+                    "europe".to_string(),
+                ))]),
+                ..Default::default()
+            },
+        )
+        .await;
+
+        let output_schema = batch.schema();
+        assert_eq!(
+            output_schema.fields().len(),
+            3,
+            "Output should have 3 columns (a, row_index, region)"
+        );
+        assert_eq!(output_schema.field(0).name(), "a");
+        assert_eq!(output_schema.field(1).name(), "row_index");
+        assert_eq!(output_schema.field(2).name(), "region");
+
+        let a_values = batch
+            .column(0)
+            .as_primitive::<Int32Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(a_values, vec![1, 2, 3], "File column 'a' values");
+
+        let row_index_values = batch
+            .column(1)
+            .as_primitive::<Int64Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(
+            row_index_values,
+            vec![0, 1, 2],
+            "Virtual column 'row_index' values"
+        );
+
+        let region_values = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap()
+            .iter()
+            .map(|v| v.unwrap().to_string())
+            .collect::<Vec<_>>();
+        assert_eq!(
+            region_values,
+            vec!["europe", "europe", "europe"],
+            "Partition column 'region' values"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_partition_and_virtual_columns_only() {
+        let parquet_data =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let partition_col = Arc::new(Field::new("year", DataType::Int32, false));
+        let table_schema = TableSchema::new_with_virtual_columns(
+            Arc::clone(&parquet_data.schema()),
+            vec![row_number_field],
+            vec![partition_col],
+        );
+
+        let batch = read_parquet(
+            parquet_data,
+            table_schema,
+            ReadOptions {
+                projection: Some(vec![1, 2]),
+                partition_values: Some(vec![ScalarValue::Int32(Some(2026))]),
+                ..Default::default()
+            },
+        )
+        .await;
+
+        let output_schema = batch.schema();
+        assert_eq!(
+            output_schema.fields().len(),
+            2,
+            "Output should have 2 columns (row_index, year)"
+        );
+        assert_eq!(output_schema.field(0).name(), "row_index");
+        assert_eq!(output_schema.field(1).name(), "year");
+
+        let row_index_values = batch
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(
+            row_index_values,
+            vec![0, 1, 2],
+            "Virtual column 'row_index' values"
+        );
+
+        let year_values = batch
+            .column(1)
+            .as_primitive::<Int32Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(
+            year_values,
+            vec![2026, 2026, 2026],
+            "Partition column 'year' values"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_nested_schema_projections() {
+        use arrow::array::{ArrayRef, Int32Array, StructArray};
+        // Create nested schema: a: {b: int32, c: {d: int32, e: int32}}
+        let inner_struct_fields = vec![
+            Field::new("d", DataType::Int32, false),
+            Field::new("e", DataType::Int32, false),
+        ];
+        let inner_struct_type = DataType::Struct(inner_struct_fields.clone().into());
+
+        let outer_struct_fields = vec![
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", inner_struct_type.clone(), false),
+        ];
+        let outer_struct_type = DataType::Struct(outer_struct_fields.clone().into());
+
+        let a_struct = {
+            let d_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
+            let e_array: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 300]));
+            let c_struct = StructArray::from(vec![
+                (Arc::new(inner_struct_fields[0].clone()), d_array),
+                (Arc::new(inner_struct_fields[1].clone()), e_array),
+            ]);
+            let b_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+            let c_array: ArrayRef = Arc::new(c_struct);
+            Arc::new(StructArray::from(vec![
+                (Arc::new(outer_struct_fields[0].clone()), b_array),
+                (Arc::new(outer_struct_fields[1].clone()), c_array),
+            ]))
+        };
+
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+
+        // File schema: x: int32, a: {b: int32, c: {d: int32, e: int32}}, y: int32
+        // Table schema: x, a, y, row_index (indices 0, 1, 2, 3)
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("a", outer_struct_type.clone(), false),
+            Field::new("y", DataType::Int32, false),
+        ]));
+
+        let x_array: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 300]));
+        let y_array: ArrayRef = Arc::new(Int32Array::from(vec![1000, 2000, 3000]));
+
+        let parquet_data = arrow::record_batch::RecordBatch::try_new(
+            file_schema.clone(),
+            vec![x_array, a_struct, y_array],
+        )
+        .unwrap();
+
+        let table_schema = TableSchema::new_with_virtual_columns(
+            file_schema,
+            vec![row_number_field],
+            vec![],
+        );
+
+        // Test 1: Read all columns including row_index
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions::default(),
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 4);
+            assert_eq!(batch.schema().field(0).name(), "x");
+            assert_eq!(batch.schema().field(1).name(), "a");
+            assert_eq!(batch.schema().field(2).name(), "y");
+            assert_eq!(batch.schema().field(3).name(), "row_index");
+
+            let row_index_values = batch
+                .column(3)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+
+            let a_col = batch.column(1).as_struct();
+            let b_values = a_col
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(b_values, vec![1, 2, 3]);
+        }
+
+        // Test 2: Project nested struct and row_index only
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![1, 3]),
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 2);
+            assert_eq!(batch.schema().field(0).name(), "a");
+            assert_eq!(batch.schema().field(1).name(), "row_index");
+
+            let row_index_values = batch
+                .column(1)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+
+            let a_col = batch.column(0).as_struct();
+            let c_col = a_col.column(1).as_struct();
+            let d_values = c_col
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(d_values, vec![10, 20, 30]);
+        }
+
+        // Test 3: Project only primitive columns with row_index (skip nested struct)
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![0, 2, 3]), // x, y, row_index - skip 'a'
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 3);
+            assert_eq!(batch.schema().field(0).name(), "x");
+            assert_eq!(batch.schema().field(1).name(), "y");
+            assert_eq!(batch.schema().field(2).name(), "row_index");
+
+            let x_values = batch
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            let y_values = batch
+                .column(1)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            let row_index_values = batch
+                .column(2)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(x_values, vec![100, 200, 300]);
+            assert_eq!(y_values, vec![1000, 2000, 3000]);
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+        }
+
+        // Test 4: Project only the nested column (without row_index)
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![1]),
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 1);
+            assert_eq!(batch.schema().field(0).name(), "a");
+
+            let a_col = batch.column(0).as_struct();
+            let b_values = a_col
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(b_values, vec![1, 2, 3]);
+        }
+
+        // Test 5: Project columns in different order with row_index
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![3, 2, 0, 1]), // row_index, y, x, a (reordered)
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 4);
+            assert_eq!(batch.schema().field(0).name(), "row_index");
+            assert_eq!(batch.schema().field(1).name(), "y");
+            assert_eq!(batch.schema().field(2).name(), "x");
+            assert_eq!(batch.schema().field(3).name(), "a");
+
+            let row_index_values = batch
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            let y_values = batch
+                .column(1)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            let x_values = batch
+                .column(2)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+            assert_eq!(y_values, vec![1000, 2000, 3000]);
+            assert_eq!(x_values, vec![100, 200, 300]);
+
+            let a_col = batch.column(3).as_struct();
+            let b_values = a_col
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(b_values, vec![1, 2, 3]);
+        }
+
+        // Test 6: Project only row_index
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![3]),
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 1);
+            assert_eq!(batch.schema().field(0).name(), "row_index");
+
+            let row_index_values = batch
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_predicate_with_virtual_columns() {
+        let parquet_data = record_batch!((
+            "a",
+            Int32,
+            vec![Some(10), Some(20), Some(30), Some(40), Some(50)]
+        ))
+        .unwrap();
+
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let table_schema = TableSchema::new_with_virtual_columns(
+            parquet_data.schema(),
+            vec![row_number_field],
+            vec![],
+        );
+
+        // Test 1: Filter on file column (a > 20) with virtual column in schema
+        {
+            let expr = col("a").gt(lit(20));
+            let predicate = logical2physical(&expr, table_schema.table_schema());
+
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    predicate: Some(predicate),
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            let a_values = batch
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(a_values, vec![30, 40, 50]);
+
+            let row_index_values = batch
+                .column(1)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![2, 3, 4]);
+        }
+
+        // Test 2: Filter on virtual column does not have predicate pushdown
+        {
+            let expr = col("row_index").eq(lit(2));
+            let predicate = logical2physical(&expr, table_schema.table_schema());
+
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    predicate: Some(predicate),
+                    projection: Some(vec![0, 1]), // a and row_index
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            let a_values = batch
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(a_values, vec![10, 20, 30, 40, 50]);
+
+            let row_index_values = batch
+                .column(1)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1, 2, 3, 4]);
+        }
+
+        // Test 3: Project only virtual column with predicate on file column
+        {
+            let expr = col("a").lt(lit(30));
+            let predicate = logical2physical(&expr, table_schema.table_schema());
+
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    predicate: Some(predicate),
+                    projection: Some(vec![1]), // Only row_index
+                    ..Default::default()
+                },
+            )
+            .await;
+            assert_eq!(batch.num_columns(), 1);
+            assert_eq!(batch.schema().field(0).name(), "row_index");
+
+            let row_index_values = batch
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1]);
+        }
+    }
 }
diff --git a/datafusion/datasource/src/table_schema.rs b/datafusion/datasource/src/table_schema.rs
index a45cdbaaea076..ce895bef014b9 100644
--- a/datafusion/datasource/src/table_schema.rs
+++ b/datafusion/datasource/src/table_schema.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Helper struct to manage table schemas with partition columns
+//! Helper struct to manage table schemas with partition columns and virtual columns
 
 use arrow::datatypes::{FieldRef, SchemaBuilder, SchemaRef};
 use std::sync::Arc;
@@ -23,10 +23,12 @@ use std::sync::Arc;
 /// Helper to hold table schema information for partitioned data sources.
 ///
 /// When reading partitioned data (such as Hive-style partitioning), a table's schema
-/// consists of two parts:
+/// consists of multiple parts:
 /// 1. **File schema**: The schema of the actual data files on disk
 /// 2. **Partition columns**: Columns that are encoded in the directory structure,
 ///    not stored in the files themselves
+/// 3. **Virtual columns**: Columns computed during read (e.g., row numbers),
+///    not stored in the files
 ///
 /// # Example: Partitioned Table
 ///
@@ -38,14 +40,16 @@ use std::sync::Arc;
 ///
 /// In this case:
 /// - **File schema**: The schema of `data.parquet` files (e.g., `[user_id, amount]`)
+/// - **Virtual columns**: Computed columns like `row_index` for row numbers
 /// - **Partition columns**: `[date, region]` extracted from the directory path
-/// - **Table schema**: The full schema combining both (e.g., `[user_id, amount, date, region]`)
+/// - **Table schema**: The full schema combining all three (e.g., `[user_id, amount, row_index, date, region]`)
 ///
 /// # When to Use
 ///
 /// Use `TableSchema` when:
 /// - Reading partitioned data sources (Parquet, CSV, etc. with Hive-style partitioning)
 /// - You need to efficiently access different schema representations without reconstructing them
+/// - You need virtual columns like row numbers computed during read
 /// - You want to avoid repeatedly concatenating file and partition schemas
 ///
 /// For non-partitioned data or when working with a single schema representation,
@@ -57,12 +61,19 @@ use std::sync::Arc;
 /// to any representation without repeated allocations or reconstructions.
 #[derive(Debug, Clone)]
 pub struct TableSchema {
-    /// The schema of the data files themselves, without partition columns.
+    /// The schema of the data files themselves, without virtual columns or partition columns.
     ///
     /// For example, if your Parquet files contain `[user_id, amount]`,
     /// this field holds that schema.
     file_schema: SchemaRef,
 
+    /// Virtual columns that are computed during read.
+    ///
+    /// These columns are not stored in the data files but are generated during
+    /// query execution. Examples include a 0-based row index generated via
+    /// Parquet's `RowNumber` extension type.
+    virtual_columns: Arc<Vec<FieldRef>>,
+
     /// Columns that are derived from the directory structure (partitioning scheme).
     ///
     /// For Hive-style partitioning like `/date=2025-10-10/region=us-west/`,
@@ -72,10 +83,11 @@ pub struct TableSchema {
     /// row during query execution based on the file's location.
     table_partition_cols: Arc<Vec<FieldRef>>,
 
-    /// The complete table schema: file_schema columns followed by partition columns.
+    /// The complete table schema: file_schema columns followed by virtual columns
+    /// and partition columns.
     ///
-    /// This is pre-computed during construction by concatenating `file_schema`
-    /// and `table_partition_cols`, so it can be returned as a cheap reference.
+    /// This is pre-computed during construction by concatenating `file_schema`,
+    /// `virtual_columns`, and `table_partition_cols`, so it can be returned as a cheap reference.
     table_schema: SchemaRef,
 }
 
@@ -121,12 +133,63 @@ impl TableSchema {
         builder.extend(table_partition_cols.iter().cloned());
         Self {
             file_schema,
+            virtual_columns: Arc::new(vec![]),
             table_partition_cols: Arc::new(table_partition_cols),
             table_schema: Arc::new(builder.finish()),
         }
     }
 
+    /// Create a new TableSchema from a file schema, virtual columns, and partition columns.
+    ///
+    /// The table schema is automatically computed by appending the virtual columns
+    /// and partition columns to the file schema.
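+    /// Virtual columns always sit between the file columns and the partition
+    /// columns in the resulting table schema.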
+    ///
+    /// # Arguments
+    ///
+    /// * `file_schema` - Schema of the data files (without virtual or partition columns)
+    /// * `virtual_columns` - Virtual columns to append to each row
+    /// * `table_partition_cols` - Partition columns to append after the virtual columns
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow::datatypes::{Schema, Field, DataType};
+    /// # use datafusion_datasource::TableSchema;
+    /// let file_schema = Arc::new(Schema::new(vec![
+    ///     Field::new("user_id", DataType::Int64, false),
+    /// ]));
+    ///
+    /// let virtual_cols = vec![
+    ///     Arc::new(Field::new("row_index", DataType::Int64, false)),
+    /// ];
+    ///
+    /// let partition_cols = vec![
+    ///     Arc::new(Field::new("date", DataType::Utf8, false)),
+    ///     Arc::new(Field::new("region", DataType::Utf8, false)),
+    /// ];
+    ///
+    /// let table_schema = TableSchema::new_with_virtual_columns(file_schema, virtual_cols, partition_cols);
+    ///
+    /// // Table schema will have 4 columns: user_id, row_index, date, region
+    /// assert_eq!(table_schema.table_schema().fields().len(), 4);
+    /// ```
+    pub fn new_with_virtual_columns(
+        file_schema: SchemaRef,
+        virtual_columns: Vec<FieldRef>,
+        table_partition_cols: Vec<FieldRef>,
+    ) -> Self {
+        let mut builder = SchemaBuilder::from(file_schema.as_ref());
+        builder.extend(virtual_columns.iter().cloned());
+        builder.extend(table_partition_cols.iter().cloned());
+        Self {
+            file_schema,
+            virtual_columns: Arc::new(virtual_columns),
             table_partition_cols: Arc::new(table_partition_cols),
             table_schema: Arc::new(builder.finish()),
         }
     }
 
-    /// Create a new TableSchema with no partition columns.
+    /// Create a new TableSchema with no partition columns or virtual columns.
     ///
     /// You should prefer calling [`TableSchema::new`] if you have partition columns at
     /// construction time since it avoids re-computing the table schema.
@@ -149,19 +212,51 @@ impl TableSchema {
             );
             table_partition_cols.extend(partition_cols);
         }
+        self.recompute_table_schema();
+        self
+    }
+
+    /// Add virtual columns to an existing TableSchema, returning a new instance.
+    ///
+    /// Virtual columns are computed during read (e.g., row numbers) and are not
+    /// stored in the data files.
+    pub fn with_virtual_columns(mut self, virtual_cols: Vec<FieldRef>) -> Self {
+        if self.virtual_columns.is_empty() {
+            self.virtual_columns = Arc::new(virtual_cols);
+        } else {
+            // Append to existing virtual columns
+            let virtual_columns = Arc::get_mut(&mut self.virtual_columns).expect(
+                "Expected to be the sole owner of virtual_columns since this function accepts mut self",
+            );
+            virtual_columns.extend(virtual_cols);
+        }
+        self.recompute_table_schema();
+        self
+    }
+
+    /// Recompute the table schema from the file schema, virtual columns, and partition columns.
+    fn recompute_table_schema(&mut self) {
         let mut builder = SchemaBuilder::from(self.file_schema.as_ref());
+        builder.extend(self.virtual_columns.iter().cloned());
         builder.extend(self.table_partition_cols.iter().cloned());
         self.table_schema = Arc::new(builder.finish());
-        self
     }
 
-    /// Get the file schema (without partition columns).
+    /// Get the file schema (without virtual columns or partition columns).
     ///
     /// This is the schema of the actual data files on disk.
     pub fn file_schema(&self) -> &SchemaRef {
         &self.file_schema
     }
 
+    /// Get the virtual columns.
+    ///
+    /// These are columns computed during read (e.g., row index) that
+    /// will be appended to each row during query execution.
+    pub fn virtual_columns(&self) -> &Arc<Vec<FieldRef>> {
+        &self.virtual_columns
+    }
+
     /// Get the table partition columns.
     ///
     /// These are the columns derived from the directory structure that
@@ -170,10 +265,10 @@ impl TableSchema {
         &self.table_partition_cols
     }
 
-    /// Get the full table schema (file schema + partition columns).
+    /// Get the full table schema (file schema + virtual columns + partition columns).
     ///
     /// This is the complete schema that will be seen by queries, combining
-    /// both the columns from the files and the partition columns.
+    /// the file columns, virtual columns, and partition columns.
     pub fn table_schema(&self) -> &SchemaRef {
         &self.table_schema
     }
@@ -276,4 +371,45 @@ mod tests {
             &expected_schema
         );
     }
+
+    #[test]
+    fn test_virtual_columns() {
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("user_id", DataType::Int64, false),
+            Field::new("amount", DataType::Float64, false),
+        ]));
+
+        let virtual_cols =
+            vec![Arc::new(Field::new("row_index", DataType::Int64, false))];
+
+        let partition_cols = vec![Arc::new(Field::new("date", DataType::Utf8, false))];
+
+        let table_schema = TableSchema::new_with_virtual_columns(
+            file_schema.clone(),
+            virtual_cols.clone(),
+            partition_cols.clone(),
+        );
+
+        // Verify file schema
+        assert_eq!(table_schema.file_schema().as_ref(), file_schema.as_ref());
+
+        // Verify virtual columns
+        assert_eq!(table_schema.virtual_columns().len(), 1);
+        assert_eq!(table_schema.virtual_columns()[0].name(), "row_index");
+
+        // Verify partition columns
+        assert_eq!(table_schema.table_partition_cols().len(), 1);
+        assert_eq!(table_schema.table_partition_cols()[0].name(), "date");
+
+        // Verify full table schema has all columns in correct order:
+        // file_schema + virtual_columns + partition_columns
+        let expected_fields = vec![
+            Field::new("user_id", DataType::Int64, false),
+            Field::new("amount", DataType::Float64, false),
+            Field::new("row_index", DataType::Int64, false),
+            Field::new("date", DataType::Utf8, false),
+        ];
+        let expected_schema = Schema::new(expected_fields);
+        assert_eq!(table_schema.table_schema().as_ref(), &expected_schema);
+    }
 }