From 18b88b17ff6f5693462b1a94274a5d0f4f0e9d9e Mon Sep 17 00:00:00 2001 From: Xander Date: Thu, 9 Apr 2026 18:08:27 -0700 Subject: [PATCH] Adding delete file footers --- .../base_writer/equality_delete_writer.rs | 94 +++++++++++++++++++ crates/iceberg/src/writer/file_writer/mod.rs | 2 + .../src/writer/file_writer/parquet_writer.rs | 14 ++- .../src/writer/file_writer/rolling_writer.rs | 6 ++ 4 files changed, 115 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 9bc2bf9840..607380902b 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -33,6 +33,9 @@ use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileW use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; +const DELETE_TYPE_KEY: &str = "delete-type"; +const DELETE_FIELD_IDS_KEY: &str = "delete-field-ids"; + /// Builder for `EqualityDeleteWriter`. 
#[derive(Debug)] pub struct EqualityDeleteFileWriterBuilder< @@ -55,6 +58,17 @@ where inner: RollingFileWriterBuilder, config: EqualityDeleteWriterConfig, ) -> Self { + let field_ids_str = config + .equality_ids + .iter() + .map(|id| id.to_string()) + .collect::<Vec<_>>() + .join(", "); + + let inner = inner + .with_footer_metadata(DELETE_TYPE_KEY.to_string(), "equality".to_string()) + .with_footer_metadata(DELETE_FIELD_IDS_KEY.to_string(), field_ids_str); + Self { inner, config } } } @@ -804,4 +818,84 @@ mod test { assert_eq!(to_write_projected, expect_batch); Ok(()) } + + #[tokio::test] + async fn test_equality_delete_writer_sets_delete_metadata_in_footer() + -> Result<(), anyhow::Error> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIO::new_with_fs(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + let equality_ids = vec![0_i32, 1]; + let equality_config = + EqualityDeleteWriterConfig::new(equality_ids.clone(), schema.clone()).unwrap(); + let delete_schema = + arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap(); + + let pb = + ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(delete_schema)); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + pb, + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut equality_delete_writer = + EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, equality_config) + .build(None) + .await?; + + // Write some data + let delete_arrow_schema = 
Arc::new(schema_to_arrow_schema(&schema).unwrap()); + let batch = RecordBatch::try_new(delete_arrow_schema, vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef, + Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])) as ArrayRef, + ]) + .unwrap(); + equality_delete_writer.write(batch).await?; + let res = equality_delete_writer.close().await?; + assert_eq!(res.len(), 1); + let data_file = res.into_iter().next().unwrap(); + + // Read back the Parquet footer metadata + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = ParquetRecordBatchReaderBuilder::try_new(input_content).unwrap(); + let kv_metadata = reader_builder + .metadata() + .file_metadata() + .key_value_metadata() + .expect("key_value_metadata should be present"); + + // Verify delete-type is set to "equality" + let delete_type = kv_metadata + .iter() + .find(|kv| kv.key == "delete-type") + .expect("delete-type key must be present in footer"); + assert_eq!(delete_type.value.as_deref(), Some("equality")); + + // Verify delete-field-ids matches the equality IDs + let field_ids = kv_metadata + .iter() + .find(|kv| kv.key == "delete-field-ids") + .expect("delete-field-ids key must be present in footer"); + assert_eq!(field_ids.value.as_deref(), Some("0, 1")); + + Ok(()) + } } diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 101919f5b3..53f0863197 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -41,6 +41,8 @@ pub trait FileWriterBuilder: Clone + Send + Sync + 'static { type R: FileWriter; /// Build file writer. fn build(&self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send; + /// Add a key-value metadata entry to the file footer. 
+ fn with_footer_metadata(self, key: String, value: String) -> Self; } /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 840d1a5f16..886d377edc 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -27,7 +27,7 @@ use itertools::Itertools; use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; -use parquet::file::metadata::ParquetMetaData; +use parquet::file::metadata::{KeyValue, ParquetMetaData}; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; @@ -88,6 +88,18 @@ impl FileWriterBuilder for ParquetWriterBuilder { nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode), }) } + + fn with_footer_metadata(mut self, key: String, value: String) -> Self { + let mut kv_metadata = self.props.key_value_metadata().cloned().unwrap_or_default(); + kv_metadata.push(KeyValue::new(key, value)); + + self.props = self + .props + .into_builder() + .set_key_value_metadata(Some(kv_metadata)) + .build(); + self + } } /// A mapping from Parquet column path names to internal field id diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index b0b2d2f191..a60bf6f210 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -102,6 +102,12 @@ where } } + /// Add a key-value metadata entry to the file footer. 
+ pub fn with_footer_metadata(mut self, key: String, value: String) -> Self { + self.inner_builder = self.inner_builder.with_footer_metadata(key, value); + self + } + /// Build a new [`RollingFileWriter`]. pub fn build(&self) -> RollingFileWriter { RollingFileWriter {