diff --git a/Cargo.lock b/Cargo.lock index 297b566f46..d00fce338e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1127,6 +1127,20 @@ name = "bytemuck" version = "1.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" +dependencies = [ + "bytemuck_derive", +] + +[[package]] +name = "bytemuck_derive" +version = "1.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "byteorder" @@ -3056,6 +3070,7 @@ version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b" dependencies = [ + "bytemuck", "cfg-if", "crunchy", "num-traits", @@ -3361,6 +3376,7 @@ dependencies = [ "reqwest", "roaring", "serde", + "serde_arrow", "serde_bytes", "serde_derive", "serde_json", @@ -4130,6 +4146,21 @@ dependencies = [ "twox-hash", ] +[[package]] +name = "marrow" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5240d6977234968ff9ad254bfa73aa397fb51e41dcb22b1eb85835e9295485b" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "bytemuck", + "half", + "serde", +] + [[package]] name = "md-5" version = "0.10.6" @@ -5893,6 +5924,21 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_arrow" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2784e59a0315568e850cb01ddadf458f8c09e28d8cfc4880c2cc08f5dc3444e0" +dependencies = [ + "arrow-array", + "arrow-schema", + "bytemuck", + "chrono", + "half", + "marrow", + "serde", +] + [[package]] name = "serde_bytes" version = "0.11.19" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index aa1d0cd4a5..18729176dc 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -91,6 +91,7 @@ rand = { workspace = true } regex = { workspace = true } tempfile = { workspace = true } minijinja = { workspace = true } +serde_arrow = { version = "0.14", features = ["arrow-58"] } [package.metadata.cargo-machete] # These dependencies are added to ensure minimal dependency version diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 700ba69262..9ccf1ac3d7 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -1100,7 +1100,7 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result Result usize { + if ty.is_primitive() { + 1 + } else { + ty.get_fields().iter().map(|f| leaf_count(f)).sum() + } +} + +/// Builds a mapping from fallback field IDs to leaf column indices for Parquet files +/// without embedded field IDs. Returns entries only for primitive top-level fields. +/// +/// Must use top-level field positions (not leaf column positions) to stay consistent +/// with `add_fallback_field_ids_to_arrow_schema`, which assigns ordinal IDs to +/// top-level Arrow fields. Using leaf positions instead would produce wrong indices +/// when nested types (struct/list/map) expand into multiple leaf columns. +/// +/// Mirrors iceberg-java's ParquetSchemaUtil.addFallbackIds() which iterates +/// fileSchema.getFields() assigning ordinal IDs to top-level fields. fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap { let mut column_map = HashMap::new(); + let mut leaf_idx = 0; - // 1-indexed to match iceberg-java's convention - for (idx, _field) in parquet_schema.columns().iter().enumerate() { - let field_id = (idx + 1) as i32; - column_map.insert(field_id, idx); + for (top_pos, field) in parquet_schema.root_schema().get_fields().iter().enumerate() { + let field_id = (top_pos + 1) as i32; + if field.is_primitive() { + column_map.insert(field_id, leaf_idx); + } + leaf_idx += leaf_count(field); } column_map @@ -1409,7 +1431,7 @@ impl PredicateConverter<'_> { return Err(Error::new( ErrorKind::DataInvalid, format!( - "Leave column `{}` in predicates isn't a root column in Parquet schema.", + "Leaf column `{}` in predicates isn't a root column in Parquet schema.", reference.field().name ), )); @@ -1423,7 +1445,7 @@ impl PredicateConverter<'_> { .ok_or(Error::new( ErrorKind::DataInvalid, format!( - "Leave column `{}` in predicates cannot be found in the required column indices.", + "Leaf column `{}` in predicates cannot be found in the required column indices.", reference.field().name ), ))?; @@ -4667,4 +4689,204 @@ message schema { assert_eq!(result[1], expected_1); assert_eq!(result[2], expected_2); } + + /// Regression for : + /// predicate on a column after nested types in a migrated file (no field IDs). + /// Schema has struct, list, and map columns before the predicate target (`id`), + /// exercising the fallback field ID mapping across all nested type variants. + #[tokio::test] + async fn test_predicate_on_migrated_file_with_nested_types() { + use serde::{Deserialize, Serialize}; + use serde_arrow::schema::{SchemaLike, TracingOptions}; + + #[derive(Serialize, Deserialize)] + struct Person { + name: String, + age: i32, + } + + #[derive(Serialize, Deserialize)] + struct Row { + person: Person, + people: Vec, + props: std::collections::BTreeMap, + id: i32, + } + + let rows = vec![ + Row { + person: Person { + name: "Alice".into(), + age: 30, + }, + people: vec![Person { + name: "Alice".into(), + age: 30, + }], + props: [("k1".into(), "v1".into())].into(), + id: 1, + }, + Row { + person: Person { + name: "Bob".into(), + age: 25, + }, + people: vec![Person { + name: "Bob".into(), + age: 25, + }], + props: [("k2".into(), "v2".into())].into(), + id: 2, + }, + Row { + person: Person { + name: "Carol".into(), + age: 40, + }, + people: vec![Person { + name: "Carol".into(), + age: 40, + }], + props: [("k3".into(), "v3".into())].into(), + id: 3, + }, + ]; + + let tracing_options = TracingOptions::default() + .map_as_struct(false) + .strings_as_large_utf8(false) + .sequence_as_large_list(false); + let fields = Vec::::from_type::(tracing_options).unwrap(); + let arrow_schema = Arc::new(ArrowSchema::new(fields.clone())); + let batch = serde_arrow::to_record_batch(&fields, &rows).unwrap(); + + // Fallback field IDs: person=1, people=2, props=3, id=4 + let iceberg_schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required( + 1, + "person", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::required( + 5, + "name", + Type::Primitive(PrimitiveType::String), + ) + .into(), + NestedField::required(6, "age", Type::Primitive(PrimitiveType::Int)) + .into(), + ])), + ) + .into(), + NestedField::required( + 2, + "people", + Type::List(crate::spec::ListType { + element_field: NestedField::required( + 7, + "element", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::required( + 8, + "name", + Type::Primitive(PrimitiveType::String), + ) + .into(), + NestedField::required( + 9, + "age", + Type::Primitive(PrimitiveType::Int), + ) + .into(), + ])), + ) + .into(), + }), + ) + .into(), + NestedField::required( + 3, + "props", + Type::Map(crate::spec::MapType { + key_field: NestedField::required( + 10, + "key", + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::required( + 11, + "value", + Type::Primitive(PrimitiveType::String), + ) + .into(), + }), + ) + .into(), + NestedField::required(4, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_path = format!("{table_location}/1.parquet"); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let file = File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema, Some(props)).unwrap(); + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + + let predicate = Reference::new("id").greater_than(Datum::int(1)); + + let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true) + .build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(), + start: 0, + length: 0, + record_count: None, + data_file_path: file_path, + data_file_format: DataFileFormat::Parquet, + schema: iceberg_schema.clone(), + project_field_ids: vec![4], + predicate: Some(predicate.bind(iceberg_schema, true).unwrap()), + deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let ids: Vec = result + .iter() + .flat_map(|b| { + b.column(0) + .as_primitive::() + .values() + .iter() + .copied() + }) + .collect(); + assert_eq!(ids, vec![2, 3]); + } }