94 changes: 94 additions & 0 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -33,6 +33,9 @@ use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

const DELETE_TYPE_KEY: &str = "delete-type";
const DELETE_FIELD_IDS_KEY: &str = "delete-field-ids";

/// Builder for `EqualityDeleteWriter`.
#[derive(Debug)]
pub struct EqualityDeleteFileWriterBuilder<
@@ -55,6 +58,17 @@ where
inner: RollingFileWriterBuilder<B, L, F>,
config: EqualityDeleteWriterConfig,
) -> Self {
let field_ids_str = config
.equality_ids
.iter()
.map(|id| id.to_string())
.collect::<Vec<_>>()
.join(", ");

let inner = inner
.with_footer_metadata(DELETE_TYPE_KEY.to_string(), "equality".to_string())
.with_footer_metadata(DELETE_FIELD_IDS_KEY.to_string(), field_ids_str);

Self { inner, config }
}
}
@@ -804,4 +818,84 @@ mod test {
assert_eq!(to_write_projected, expect_batch);
Ok(())
}

#[tokio::test]
async fn test_equality_delete_writer_sets_delete_metadata_in_footer()
-> Result<(), anyhow::Error> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIO::new_with_fs();
let location_gen = DefaultLocationGenerator::with_data_location(
temp_dir.path().to_str().unwrap().to_string(),
);
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(0, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);

let equality_ids = vec![0_i32, 1];
let equality_config =
EqualityDeleteWriterConfig::new(equality_ids.clone(), schema.clone()).unwrap();
let delete_schema =
arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();

let pb =
ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(delete_schema));
let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
pb,
file_io.clone(),
location_gen,
file_name_gen,
);
let mut equality_delete_writer =
EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, equality_config)
.build(None)
.await?;

// Write some data
let delete_arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
let batch = RecordBatch::try_new(delete_arrow_schema, vec![
Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
Arc::new(arrow_array::StringArray::from(vec!["a", "b", "c"])) as ArrayRef,
])
.unwrap();
equality_delete_writer.write(batch).await?;
let res = equality_delete_writer.close().await?;
assert_eq!(res.len(), 1);
let data_file = res.into_iter().next().unwrap();

// Read back the Parquet footer metadata
let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
let input_content = input_file.read().await.unwrap();
let reader_builder = ParquetRecordBatchReaderBuilder::try_new(input_content).unwrap();
let kv_metadata = reader_builder
.metadata()
.file_metadata()
.key_value_metadata()
.expect("key_value_metadata should be present");

// Verify delete-type is set to "equality"
let delete_type = kv_metadata
.iter()
.find(|kv| kv.key == "delete-type")
.expect("delete-type key must be present in footer");
assert_eq!(delete_type.value.as_deref(), Some("equality"));

// Verify delete-field-ids matches the equality IDs
let field_ids = kv_metadata
.iter()
.find(|kv| kv.key == "delete-field-ids")
.expect("delete-field-ids key must be present in footer");
assert_eq!(field_ids.value.as_deref(), Some("0, 1"));

Ok(())
}
}
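
Since `delete-field-ids` is serialized as a comma-separated string (the test above expects `"0, 1"`), a reader has to parse the footer value back into field ids. A minimal sketch of that round trip; `parse_delete_field_ids` is a hypothetical helper, not part of this PR:

/// Hypothetical helper (not in this PR): parse the "delete-field-ids"
/// footer value, e.g. "0, 1", back into equality field ids.
fn parse_delete_field_ids(value: &str) -> Result<Vec<i32>, std::num::ParseIntError> {
    value.split(',').map(|s| s.trim().parse::<i32>()).collect()
}

#[test]
fn round_trips_field_ids() {
    assert_eq!(parse_delete_field_ids("0, 1").unwrap(), vec![0, 1]);
}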
2 changes: 2 additions & 0 deletions crates/iceberg/src/writer/file_writer/mod.rs
@@ -41,6 +41,8 @@ pub trait FileWriterBuilder<O = DefaultOutput>: Clone + Send + Sync + 'static {
type R: FileWriter<O>;
/// Build file writer.
fn build(&self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
/// Add a key-value metadata entry to the file footer.
fn with_footer_metadata(self, key: String, value: String) -> Self;
}

/// File writer focused on writing record batches to a physical file format (such as Parquet or ORC).
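
Because `with_footer_metadata` consumes `self` and returns `Self`, callers can chain several entries before any file is opened. A sketch against the `ParquetWriterBuilder` implementation below; the schema and properties setup is assumed, and the keys mirror the constants defined in equality_delete_writer.rs above:

// Sketch: chaining footer metadata on a ParquetWriterBuilder (setup assumed).
let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
    .with_footer_metadata("delete-type".to_string(), "equality".to_string())
    .with_footer_metadata("delete-field-ids".to_string(), "0, 1".to_string());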
14 changes: 13 additions & 1 deletion crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -27,7 +27,7 @@ use itertools::Itertools;
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::metadata::{KeyValue, ParquetMetaData};
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics;

@@ -88,6 +88,18 @@ impl FileWriterBuilder for ParquetWriterBuilder {
nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
})
}

fn with_footer_metadata(mut self, key: String, value: String) -> Self {
let mut kv_metadata = self.props.key_value_metadata().cloned().unwrap_or_default();
kv_metadata.push(KeyValue::new(key, value));

self.props = self
.props
.into_builder()
.set_key_value_metadata(Some(kv_metadata))
.build();
self
}
}

/// A mapping from Parquet column path names to internal field id
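
Note that this implementation appends to any key-value metadata already present on the `WriterProperties` rather than replacing it. A minimal sketch of that clone-push-rebuild merge using only the parquet crate; the "existing" key is illustrative:

use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;

fn main() {
    // Properties that already carry a key-value entry...
    let props = WriterProperties::builder()
        .set_key_value_metadata(Some(vec![KeyValue::new(
            "existing".to_string(),
            "kept".to_string(),
        )]))
        .build();

    // ...survive the same merge performed by with_footer_metadata.
    let mut kv = props.key_value_metadata().cloned().unwrap_or_default();
    kv.push(KeyValue::new("delete-type".to_string(), "equality".to_string()));
    let props = props.into_builder().set_key_value_metadata(Some(kv)).build();

    assert_eq!(props.key_value_metadata().map(|m| m.len()), Some(2));
}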
6 changes: 6 additions & 0 deletions crates/iceberg/src/writer/file_writer/rolling_writer.rs
@@ -102,6 +102,12 @@
}
}

/// Add a key-value metadata entry to the file footer.
pub fn with_footer_metadata(mut self, key: String, value: String) -> Self {
self.inner_builder = self.inner_builder.with_footer_metadata(key, value);
self
}

/// Build a new [`RollingFileWriter`].
pub fn build(&self) -> RollingFileWriter<B, L, F> {
RollingFileWriter {
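
The rolling-writer method is a pure pass-through to the inner `FileWriterBuilder`, so footer entries can be attached at whichever layer is most convenient; `EqualityDeleteFileWriterBuilder::new` above attaches its two entries at this layer. A sketch reusing the setup from the test earlier in this diff; the "written-by" key is purely illustrative:

// Sketch (pb, file_io, location_gen, file_name_gen as in the test above):
// metadata attached here is forwarded to the inner ParquetWriterBuilder.
let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
    pb,
    file_io.clone(),
    location_gen,
    file_name_gen,
)
.with_footer_metadata("written-by".to_string(), "compaction-job".to_string());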