From 1e3caa536f79d259f746ab21118a2d224e4c686b Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Mon, 23 Mar 2026 11:53:32 +0100 Subject: [PATCH 1/9] feat: add support for parquet content defined chunking options --- datafusion/common/src/config.rs | 23 ++++ .../common/src/file_options/parquet_writer.rs | 77 ++++++++++++- .../datasource-parquet/src/file_format.rs | 6 +- datafusion/proto-common/Cargo.toml | 2 + .../proto/datafusion_common.proto | 14 +++ datafusion/proto-common/src/from_proto/mod.rs | 104 +++++++++++++++++- .../proto-common/src/generated/pbjson.rs | 88 +++++++++++++++ .../proto-common/src/generated/prost.rs | 30 +++++ datafusion/proto-common/src/to_proto/mod.rs | 15 ++- .../src/generated/datafusion_proto_common.rs | 28 +++++ .../proto/src/logical_plan/file_formats.rs | 6 +- .../test_files/information_schema.slt | 8 ++ 12 files changed, 393 insertions(+), 8 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0551cbbb15ae1..b311d7b960cf6 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -845,6 +845,7 @@ config_namespace! { /// default parquet writer setting pub bloom_filter_ndv: Option, default = None + /// (writing) Controls whether DataFusion will attempt to speed up writing /// parquet files by serializing them in parallel. Each column /// in each row group in each output file are serialized in parallel @@ -872,6 +873,27 @@ config_namespace! { /// writing out already in-memory data, such as from a cached /// data frame. pub maximum_buffered_record_batches_per_stream: usize, default = 2 + + /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing + /// parquet files. When true, the other `cdc_*` options control the chunking + /// behavior. When CDC is enabled, parallel writing is automatically disabled + /// since the chunker state must persist across row groups. 
+ pub enable_content_defined_chunking: bool, default = false + + /// (writing) Minimum chunk size in bytes for content-defined chunking. + /// The rolling hash will not be updated until this size is reached for each chunk. + /// Default is 256 KiB. Only used when `enable_content_defined_chunking` is true. + pub cdc_min_chunk_size: usize, default = 256 * 1024 + + /// (writing) Maximum chunk size in bytes for content-defined chunking. + /// The chunker will create a new chunk whenever the chunk size exceeds this value. + /// Default is 1 MiB. Only used when `enable_content_defined_chunking` is true. + pub cdc_max_chunk_size: usize, default = 1024 * 1024 + + /// (writing) Normalization level for content-defined chunking. + /// Increasing this improves deduplication ratio but increases fragmentation. + /// Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true. + pub cdc_norm_level: i64, default = 0 } } @@ -1826,6 +1848,7 @@ config_field!(usize); config_field!(f64); config_field!(u64); config_field!(u32); +config_field!(i64); impl ConfigField for u8 { fn visit(&self, v: &mut V, key: &str, description: &'static str) { diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a7a1fc6d0bb66..d9d16892917d1 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -95,7 +95,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { global, column_specific_options, key_value_metadata, - crypto: _, + .. 
} = table_parquet_options; let mut builder = global.into_writer_properties_builder()?; @@ -191,6 +191,10 @@ impl ParquetOptions { bloom_filter_on_write, bloom_filter_fpp, bloom_filter_ndv, + enable_content_defined_chunking, + cdc_min_chunk_size, + cdc_max_chunk_size, + cdc_norm_level, // not in WriterProperties enable_page_index: _, @@ -247,6 +251,15 @@ impl ParquetOptions { if let Some(encoding) = encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); } + if *enable_content_defined_chunking { + builder = builder.set_content_defined_chunking(Some( + parquet::file::properties::CdcOptions { + min_chunk_size: *cdc_min_chunk_size, + max_chunk_size: *cdc_max_chunk_size, + norm_level: *cdc_norm_level as i32, + }, + )); + } Ok(builder) } @@ -460,6 +473,10 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + enable_content_defined_chunking: defaults.enable_content_defined_chunking, + cdc_min_chunk_size: defaults.cdc_min_chunk_size, + cdc_max_chunk_size: defaults.cdc_max_chunk_size, + cdc_norm_level: defaults.cdc_norm_level, } } @@ -576,6 +593,21 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + enable_content_defined_chunking: props + .content_defined_chunking() + .is_some(), + cdc_min_chunk_size: props + .content_defined_chunking() + .map(|c| c.min_chunk_size) + .unwrap_or(global_options_defaults.cdc_min_chunk_size), + cdc_max_chunk_size: props + .content_defined_chunking() + .map(|c| c.max_chunk_size) + .unwrap_or(global_options_defaults.cdc_max_chunk_size), + cdc_norm_level: props + .content_defined_chunking() + .map(|c| c.norm_level as i64) + .unwrap_or(global_options_defaults.cdc_norm_level), }, column_specific_options, key_value_metadata, @@ -786,6 +818,49 @@ mod tests { ); } + #[test] + fn test_cdc_enabled_with_custom_options() { + let mut 
opts = TableParquetOptions::default(); + opts.global.enable_content_defined_chunking = true; + opts.global.cdc_min_chunk_size = 128 * 1024; + opts.global.cdc_max_chunk_size = 512 * 1024; + opts.global.cdc_norm_level = 2; + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let cdc = props.content_defined_chunking().expect("CDC should be set"); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); + } + + #[test] + fn test_cdc_disabled_by_default() { + let mut opts = TableParquetOptions::default(); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + assert!(props.content_defined_chunking().is_none()); + } + + #[test] + fn test_cdc_round_trip_through_writer_props() { + let mut opts = TableParquetOptions::default(); + opts.global.enable_content_defined_chunking = true; + opts.global.cdc_min_chunk_size = 64 * 1024; + opts.global.cdc_max_chunk_size = 2 * 1024 * 1024; + opts.global.cdc_norm_level = -1; + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let recovered = session_config_from_writer_props(&props); + + assert_eq!(recovered.global.enable_content_defined_chunking, true); + assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024); + assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024); + assert_eq!(recovered.global.cdc_norm_level, -1); + } + #[test] fn test_bloom_filter_set_ndv_only() { // the TableParquetOptions::default, with only ndv set diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index da35a1a34d441..be922b9349421 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1370,7 +1370,11 @@ impl FileSink for ParquetSink { while 
let Some((path, mut rx)) = file_stream_rx.recv().await { let parquet_props = self.create_writer_props(&runtime, &path).await?; - if !parquet_opts.global.allow_single_file_parallelism { + // CDC requires the sequential writer: the chunker state lives in ArrowWriter + // and persists across row groups. The parallel path bypasses ArrowWriter entirely. + if !parquet_opts.global.allow_single_file_parallelism + || parquet_opts.global.enable_content_defined_chunking + { let mut writer = self .create_async_arrow_writer( &path, diff --git a/datafusion/proto-common/Cargo.toml b/datafusion/proto-common/Cargo.toml index 46dae36ba40ed..9de3494ec1e21 100644 --- a/datafusion/proto-common/Cargo.toml +++ b/datafusion/proto-common/Cargo.toml @@ -37,10 +37,12 @@ name = "datafusion_proto_common" [features] default = [] json = ["serde", "pbjson"] +parquet = ["datafusion-common/parquet", "dep:parquet"] [dependencies] arrow = { workspace = true } datafusion-common = { workspace = true } +parquet = { workspace = true, optional = true } pbjson = { workspace = true, optional = true } prost = { workspace = true } serde = { version = "1.0", optional = true } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..388bac272536f 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -603,6 +603,20 @@ message ParquetOptions { oneof max_predicate_cache_size_opt { uint64 max_predicate_cache_size = 33; } + + bool content_defined_chunking = 35; // default = false + + oneof cdc_min_chunk_size_opt { + uint64 cdc_min_chunk_size = 36; + } + + oneof cdc_max_chunk_size_opt { + uint64 cdc_max_chunk_size = 37; + } + + oneof cdc_norm_level_opt { + int32 cdc_norm_level = 38; + } } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index c616eadf295f2..9ec0f4b1cafe0 100644 --- 
a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1089,6 +1089,16 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + enable_content_defined_chunking: value.content_defined_chunking, + cdc_min_chunk_size: value.cdc_min_chunk_size_opt.map(|opt| match opt { + protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => v as usize, + }).unwrap_or(ParquetOptions::default().cdc_min_chunk_size), + cdc_max_chunk_size: value.cdc_max_chunk_size_opt.map(|opt| match opt { + protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => v as usize, + }).unwrap_or(ParquetOptions::default().cdc_max_chunk_size), + cdc_norm_level: value.cdc_norm_level_opt.map(|opt| match opt { + protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => v as i64, + }).unwrap_or(ParquetOptions::default().cdc_norm_level), }) } } @@ -1151,7 +1161,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { column_specific_options.insert(column_name.clone(), options.try_into()?); } } - Ok(TableParquetOptions { + let opts = TableParquetOptions { global: value .global .as_ref() @@ -1159,9 +1169,9 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { .unwrap() .unwrap(), column_specific_options, - key_value_metadata: Default::default(), - crypto: Default::default(), - }) + ..Default::default() + }; + Ok(opts) } } @@ -1261,3 +1271,89 @@ pub(crate) fn csv_writer_options_from_proto( .with_null(writer_options.null_value.clone()) .with_double_quote(writer_options.double_quote)) } + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_common::config::{ParquetOptions, TableParquetOptions}; + + fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions { + let proto: 
crate::protobuf_common::ParquetOptions = + (&opts).try_into().expect("to_proto"); + ParquetOptions::try_from(&proto).expect("from_proto") + } + + fn table_parquet_options_proto_round_trip( + opts: TableParquetOptions, + ) -> TableParquetOptions { + let proto: crate::protobuf_common::TableParquetOptions = + (&opts).try_into().expect("to_proto"); + TableParquetOptions::try_from(&proto).expect("from_proto") + } + + #[test] + fn test_parquet_options_cdc_disabled_round_trip() { + let opts = ParquetOptions::default(); + assert!(!opts.enable_content_defined_chunking); + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!(opts, recovered); + } + + #[test] + fn test_parquet_options_cdc_enabled_round_trip() { + let opts = ParquetOptions { + enable_content_defined_chunking: true, + cdc_min_chunk_size: 128 * 1024, + cdc_max_chunk_size: 512 * 1024, + cdc_norm_level: 2, + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!(recovered.enable_content_defined_chunking, true); + assert_eq!(recovered.cdc_min_chunk_size, 128 * 1024); + assert_eq!(recovered.cdc_max_chunk_size, 512 * 1024); + assert_eq!(recovered.cdc_norm_level, 2); + } + + #[test] + fn test_parquet_options_cdc_negative_norm_level_round_trip() { + let opts = ParquetOptions { + enable_content_defined_chunking: true, + cdc_norm_level: -3, + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts); + assert_eq!(recovered.cdc_norm_level, -3); + } + + #[test] + fn test_table_parquet_options_cdc_round_trip() { + let mut opts = TableParquetOptions::default(); + opts.global.enable_content_defined_chunking = true; + opts.global.cdc_min_chunk_size = 64 * 1024; + opts.global.cdc_max_chunk_size = 2 * 1024 * 1024; + opts.global.cdc_norm_level = -1; + + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + assert_eq!(recovered.global.enable_content_defined_chunking, true); + 
assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024); + assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024); + assert_eq!(recovered.global.cdc_norm_level, -1); + } + + #[test] + fn test_table_parquet_options_cdc_disabled_round_trip() { + let opts = TableParquetOptions::default(); + assert!(!opts.global.enable_content_defined_chunking); + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + assert_eq!(recovered.global.enable_content_defined_chunking, false); + assert_eq!( + recovered.global.cdc_min_chunk_size, + ParquetOptions::default().cdc_min_chunk_size + ); + assert_eq!( + recovered.global.cdc_max_chunk_size, + ParquetOptions::default().cdc_max_chunk_size + ); + } +} diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..c639baf94cb5c 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -5728,6 +5728,18 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } + if self.content_defined_chunking { + len += 1; + } + if self.cdc_min_chunk_size_opt.is_some() { + len += 1; + } + if self.cdc_max_chunk_size_opt.is_some() { + len += 1; + } + if self.cdc_norm_level_opt.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", &self.enable_page_index)?; @@ -5893,6 +5905,34 @@ impl serde::Serialize for ParquetOptions { } } } + if self.content_defined_chunking { + struct_ser.serialize_field("contentDefinedChunking", &self.content_defined_chunking)?; + } + if let Some(v) = self.cdc_min_chunk_size_opt.as_ref() { + match v { + parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + 
struct_ser.serialize_field("cdcMinChunkSize", ToString::to_string(&v).as_str())?; + } + } + } + if let Some(v) = self.cdc_max_chunk_size_opt.as_ref() { + match v { + parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("cdcMaxChunkSize", ToString::to_string(&v).as_str())?; + } + } + } + if let Some(v) = self.cdc_norm_level_opt.as_ref() { + match v { + parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => { + struct_ser.serialize_field("cdcNormLevel", v)?; + } + } + } struct_ser.end() } } @@ -5964,6 +6004,14 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "coerceInt96", "max_predicate_cache_size", "maxPredicateCacheSize", + "content_defined_chunking", + "contentDefinedChunking", + "cdc_min_chunk_size", + "cdcMinChunkSize", + "cdc_max_chunk_size", + "cdcMaxChunkSize", + "cdc_norm_level", + "cdcNormLevel", ]; #[allow(clippy::enum_variant_names)] @@ -6000,6 +6048,10 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterNdv, CoerceInt96, MaxPredicateCacheSize, + ContentDefinedChunking, + CdcMinChunkSize, + CdcMaxChunkSize, + CdcNormLevel, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6053,6 +6105,10 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), + "contentDefinedChunking" | "content_defined_chunking" => Ok(GeneratedField::ContentDefinedChunking), + "cdcMinChunkSize" | "cdc_min_chunk_size" => Ok(GeneratedField::CdcMinChunkSize), + "cdcMaxChunkSize" | "cdc_max_chunk_size" => Ok(GeneratedField::CdcMaxChunkSize), + "cdcNormLevel" | "cdc_norm_level" => Ok(GeneratedField::CdcNormLevel), _ => 
Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6104,6 +6160,10 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; let mut max_predicate_cache_size_opt__ = None; + let mut content_defined_chunking__ = None; + let mut cdc_min_chunk_size_opt__ = None; + let mut cdc_max_chunk_size_opt__ = None; + let mut cdc_norm_level_opt__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::EnablePageIndex => { @@ -6312,6 +6372,30 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); } + GeneratedField::ContentDefinedChunking => { + if content_defined_chunking__.is_some() { + return Err(serde::de::Error::duplicate_field("contentDefinedChunking")); + } + content_defined_chunking__ = Some(map_.next_value()?); + } + GeneratedField::CdcMinChunkSize => { + if cdc_min_chunk_size_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("cdcMinChunkSize")); + } + cdc_min_chunk_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(x.0)); + } + GeneratedField::CdcMaxChunkSize => { + if cdc_max_chunk_size_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("cdcMaxChunkSize")); + } + cdc_max_chunk_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(x.0)); + } + GeneratedField::CdcNormLevel => { + if cdc_norm_level_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("cdcNormLevel")); + } + cdc_norm_level_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| 
parquet_options::CdcNormLevelOpt::CdcNormLevel(x.0)); + } } } Ok(ParquetOptions { @@ -6347,6 +6431,10 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, max_predicate_cache_size_opt: max_predicate_cache_size_opt__, + content_defined_chunking: content_defined_chunking__.unwrap_or_default(), + cdc_min_chunk_size_opt: cdc_min_chunk_size_opt__, + cdc_max_chunk_size_opt: cdc_max_chunk_size_opt__, + cdc_norm_level_opt: cdc_norm_level_opt__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..1ba7fe702665a 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -872,6 +872,21 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + /// default = false + #[prost(bool, tag = "35")] + pub content_defined_chunking: bool, + #[prost(oneof = "parquet_options::CdcMinChunkSizeOpt", tags = "36")] + pub cdc_min_chunk_size_opt: ::core::option::Option< + parquet_options::CdcMinChunkSizeOpt, + >, + #[prost(oneof = "parquet_options::CdcMaxChunkSizeOpt", tags = "37")] + pub cdc_max_chunk_size_opt: ::core::option::Option< + parquet_options::CdcMaxChunkSizeOpt, + >, + #[prost(oneof = "parquet_options::CdcNormLevelOpt", tags = "38")] + pub cdc_norm_level_opt: ::core::option::Option< + parquet_options::CdcNormLevelOpt, + >, } /// Nested message and enum types in `ParquetOptions`. 
pub mod parquet_options { @@ -930,6 +945,21 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcMinChunkSizeOpt { + #[prost(uint64, tag = "36")] + CdcMinChunkSize(u64), + } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcMaxChunkSizeOpt { + #[prost(uint64, tag = "37")] + CdcMaxChunkSize(u64), + } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcNormLevelOpt { + #[prost(int32, tag = "38")] + CdcNormLevel(i32), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..87c39e205ce65 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,6 +904,16 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + content_defined_chunking: value.enable_content_defined_chunking, + cdc_min_chunk_size_opt: value.enable_content_defined_chunking.then(|| + protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(value.cdc_min_chunk_size as u64) + ), + cdc_max_chunk_size_opt: value.enable_content_defined_chunking.then(|| + protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(value.cdc_max_chunk_size as u64) + ), + cdc_norm_level_opt: value.enable_content_defined_chunking.then(|| + protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(value.cdc_norm_level as i32) + ), }) } } @@ -963,8 +973,11 @@ impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions { .iter() .filter_map(|(k, v)| 
v.as_ref().map(|v| (k.clone(), v.clone()))) .collect::>(); + + let global: protobuf::ParquetOptions = (&value.global).try_into()?; + Ok(protobuf::TableParquetOptions { - global: Some((&value.global).try_into()?), + global: Some(global), column_specific_options, key_value_metadata, }) diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..8c9af5ec75584 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -838,6 +838,9 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + /// default = false + #[prost(bool, tag = "35")] + pub content_defined_chunking: bool, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -872,6 +875,16 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + #[prost(oneof = "parquet_options::CdcMinChunkSizeOpt", tags = "36")] + pub cdc_min_chunk_size_opt: ::core::option::Option< + parquet_options::CdcMinChunkSizeOpt, + >, + #[prost(oneof = "parquet_options::CdcMaxChunkSizeOpt", tags = "37")] + pub cdc_max_chunk_size_opt: ::core::option::Option< + parquet_options::CdcMaxChunkSizeOpt, + >, + #[prost(oneof = "parquet_options::CdcNormLevelOpt", tags = "38")] + pub cdc_norm_level_opt: ::core::option::Option, } /// Nested message and enum types in `ParquetOptions`. 
pub mod parquet_options { @@ -930,6 +943,21 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcMinChunkSizeOpt { + #[prost(uint64, tag = "36")] + CdcMinChunkSize(u64), + } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcMaxChunkSizeOpt { + #[prost(uint64, tag = "37")] + CdcMaxChunkSize(u64), + } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum CdcNormLevelOpt { + #[prost(int32, tag = "38")] + CdcNormLevel(i32), + } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..c6b638f875732 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -525,6 +525,10 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + enable_content_defined_chunking: Default::default(), + cdc_min_chunk_size: Default::default(), + cdc_max_chunk_size: Default::default(), + cdc_norm_level: Default::default(), } } } @@ -585,7 +589,7 @@ mod parquet { .iter() .map(|(k, v)| (k.clone(), Some(v.clone()))) .collect(), - crypto: Default::default(), + ..Default::default() } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index a334bd25b0ce3..deb8a3ab5f345 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -235,6 +235,9 @@ datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL datafusion.execution.parquet.bloom_filter_on_read true 
datafusion.execution.parquet.bloom_filter_on_write false +datafusion.execution.parquet.cdc_max_chunk_size 1048576 +datafusion.execution.parquet.cdc_min_chunk_size 262144 +datafusion.execution.parquet.cdc_norm_level 0 datafusion.execution.parquet.coerce_int96 NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) @@ -243,6 +246,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 datafusion.execution.parquet.data_pagesize_limit 1048576 datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 +datafusion.execution.parquet.enable_content_defined_chunking false datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL datafusion.execution.parquet.force_filter_selections false @@ -377,6 +381,9 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files +datafusion.execution.parquet.cdc_max_chunk_size 1048576 (writing) Maximum chunk size in bytes for content-defined chunking. The chunker will create a new chunk whenever the chunk size exceeds this value. Default is 1 MiB. Only used when `enable_content_defined_chunking` is true. +datafusion.execution.parquet.cdc_min_chunk_size 262144 (writing) Minimum chunk size in bytes for content-defined chunking. The rolling hash will not be updated until this size is reached for each chunk. Default is 256 KiB. Only used when `enable_content_defined_chunking` is true. 
+datafusion.execution.parquet.cdc_norm_level 0 (writing) Normalization level for content-defined chunking. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true. datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. @@ -385,6 +392,7 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes +datafusion.execution.parquet.enable_content_defined_chunking false (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When true, the other `cdc_*` options control the chunking behavior. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. 
datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. From d63588c5052a92a963ea60fed519584f15fa759b Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Mon, 23 Mar 2026 12:36:52 +0100 Subject: [PATCH 2/9] fix: add CDC fields to datafusion-proto file_formats serialization --- .../proto/src/logical_plan/file_formats.rs | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index c6b638f875732..4dd6b7b4dd051 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -426,6 +426,16 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + content_defined_chunking: global_options.global.enable_content_defined_chunking, + cdc_min_chunk_size_opt: global_options.global.enable_content_defined_chunking.then(|| { + parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(global_options.global.cdc_min_chunk_size as u64) + }), + cdc_max_chunk_size_opt: global_options.global.enable_content_defined_chunking.then(|| 
{ + parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(global_options.global.cdc_max_chunk_size as u64) + }), + cdc_norm_level_opt: global_options.global.enable_content_defined_chunking.then(|| { + parquet_options::CdcNormLevelOpt::CdcNormLevel(global_options.global.cdc_norm_level as i32) + }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -525,10 +535,16 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), - enable_content_defined_chunking: Default::default(), - cdc_min_chunk_size: Default::default(), - cdc_max_chunk_size: Default::default(), - cdc_norm_level: Default::default(), + enable_content_defined_chunking: proto.content_defined_chunking, + cdc_min_chunk_size: proto.cdc_min_chunk_size_opt.as_ref().map(|opt| match opt { + parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => *v as usize, + }).unwrap_or(ParquetOptions::default().cdc_min_chunk_size), + cdc_max_chunk_size: proto.cdc_max_chunk_size_opt.as_ref().map(|opt| match opt { + parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => *v as usize, + }).unwrap_or(ParquetOptions::default().cdc_max_chunk_size), + cdc_norm_level: proto.cdc_norm_level_opt.as_ref().map(|opt| match opt { + parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => *v as i64, + }).unwrap_or(ParquetOptions::default().cdc_norm_level), } } } From 7b61b267831c027bb35fbccf40c1410adb217699 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Wed, 25 Mar 2026 17:26:27 +0100 Subject: [PATCH 3/9] chore: update parquet CDC options and proto serialization for arrow-rs 58.1 --- Cargo.lock | 1 + datafusion/common/src/config.rs | 41 +-- .../common/src/file_options/parquet_writer.rs | 70 +++-- .../datasource-parquet/src/file_format.rs | 2 +- .../proto/datafusion_common.proto | 18 +- 
datafusion/proto-common/src/from_proto/mod.rs | 81 +++--- .../proto-common/src/generated/pbjson.rs | 244 +++++++++++------- .../proto-common/src/generated/prost.rs | 41 +-- datafusion/proto-common/src/to_proto/mod.rs | 15 +- .../src/generated/datafusion_proto_common.rs | 39 +-- .../proto/src/logical_plan/file_formats.rs | 39 ++- .../test_files/information_schema.slt | 10 +- 12 files changed, 302 insertions(+), 299 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 895b3059f50c1..a5c071dc8e557 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2550,6 +2550,7 @@ dependencies = [ "arrow", "datafusion-common", "doc-comment", + "parquet", "pbjson 0.9.0", "prost", "serde", diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b311d7b960cf6..d27862a690510 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -687,6 +687,24 @@ config_namespace! { } } +config_namespace! { + /// Options for content-defined chunking (CDC) when writing parquet files. + /// See [`ParquetOptions::use_content_defined_chunking`]. + pub struct CdcOptions { + /// Minimum chunk size in bytes. The rolling hash will not trigger a split + /// until this many bytes have been accumulated. Default is 256 KiB. + pub min_chunk_size: usize, default = 256 * 1024 + + /// Maximum chunk size in bytes. A split is forced when the accumulated + /// size exceeds this value. Default is 1 MiB. + pub max_chunk_size: usize, default = 1024 * 1024 + + /// Normalization level. Increasing this improves deduplication ratio + /// but increases fragmentation. Recommended range is [-3, 3], default is 0. + pub norm_level: i64, default = 0 + } +} + config_namespace! { /// Options for reading and writing parquet files /// @@ -875,25 +893,10 @@ config_namespace! { pub maximum_buffered_record_batches_per_stream: usize, default = 2 /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing - /// parquet files. 
When true, the other `cdc_*` options control the chunking - /// behavior. When CDC is enabled, parallel writing is automatically disabled - /// since the chunker state must persist across row groups. - pub enable_content_defined_chunking: bool, default = false - - /// (writing) Minimum chunk size in bytes for content-defined chunking. - /// The rolling hash will not be updated until this size is reached for each chunk. - /// Default is 256 KiB. Only used when `enable_content_defined_chunking` is true. - pub cdc_min_chunk_size: usize, default = 256 * 1024 - - /// (writing) Maximum chunk size in bytes for content-defined chunking. - /// The chunker will create a new chunk whenever the chunk size exceeds this value. - /// Default is 1 MiB. Only used when `enable_content_defined_chunking` is true. - pub cdc_max_chunk_size: usize, default = 1024 * 1024 - - /// (writing) Normalization level for content-defined chunking. - /// Increasing this improves deduplication ratio but increases fragmentation. - /// Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true. - pub cdc_norm_level: i64, default = 0 + /// parquet files. When `Some`, CDC is enabled with the given options; when `None` + /// (the default), CDC is disabled. When CDC is enabled, parallel writing is + /// automatically disabled since the chunker state must persist across row groups. 
+ pub use_content_defined_chunking: Option, default = None } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index d9d16892917d1..103719b21315d 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -191,10 +191,7 @@ impl ParquetOptions { bloom_filter_on_write, bloom_filter_fpp, bloom_filter_ndv, - enable_content_defined_chunking, - cdc_min_chunk_size, - cdc_max_chunk_size, - cdc_norm_level, + use_content_defined_chunking, // not in WriterProperties enable_page_index: _, @@ -251,12 +248,12 @@ impl ParquetOptions { if let Some(encoding) = encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); } - if *enable_content_defined_chunking { + if let Some(cdc) = use_content_defined_chunking { builder = builder.set_content_defined_chunking(Some( parquet::file::properties::CdcOptions { - min_chunk_size: *cdc_min_chunk_size, - max_chunk_size: *cdc_max_chunk_size, - norm_level: *cdc_norm_level as i32, + min_chunk_size: cdc.min_chunk_size, + max_chunk_size: cdc.max_chunk_size, + norm_level: cdc.norm_level as i32, }, )); } @@ -401,7 +398,9 @@ mod tests { use super::*; #[cfg(feature = "parquet_encryption")] use crate::config::ConfigFileEncryptionProperties; - use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; + use crate::config::{ + CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions, + }; use crate::parquet_config::DFParquetWriterVersion; use parquet::basic::Compression; use parquet::file::properties::{ @@ -473,10 +472,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, - enable_content_defined_chunking: defaults.enable_content_defined_chunking, - cdc_min_chunk_size: defaults.cdc_min_chunk_size, - cdc_max_chunk_size: defaults.cdc_max_chunk_size, - 
cdc_norm_level: defaults.cdc_norm_level, + use_content_defined_chunking: defaults.use_content_defined_chunking.clone(), } } @@ -593,21 +589,13 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, - enable_content_defined_chunking: props - .content_defined_chunking() - .is_some(), - cdc_min_chunk_size: props - .content_defined_chunking() - .map(|c| c.min_chunk_size) - .unwrap_or(global_options_defaults.cdc_min_chunk_size), - cdc_max_chunk_size: props - .content_defined_chunking() - .map(|c| c.max_chunk_size) - .unwrap_or(global_options_defaults.cdc_max_chunk_size), - cdc_norm_level: props - .content_defined_chunking() - .map(|c| c.norm_level as i64) - .unwrap_or(global_options_defaults.cdc_norm_level), + use_content_defined_chunking: props.content_defined_chunking().map(|c| { + CdcOptions { + min_chunk_size: c.min_chunk_size, + max_chunk_size: c.max_chunk_size, + norm_level: c.norm_level as i64, + } + }), }, column_specific_options, key_value_metadata, @@ -821,10 +809,11 @@ mod tests { #[test] fn test_cdc_enabled_with_custom_options() { let mut opts = TableParquetOptions::default(); - opts.global.enable_content_defined_chunking = true; - opts.global.cdc_min_chunk_size = 128 * 1024; - opts.global.cdc_max_chunk_size = 512 * 1024; - opts.global.cdc_norm_level = 2; + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }); opts.arrow_schema(&Arc::new(Schema::empty())); let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); @@ -846,19 +835,20 @@ mod tests { #[test] fn test_cdc_round_trip_through_writer_props() { let mut opts = TableParquetOptions::default(); - opts.global.enable_content_defined_chunking = true; - opts.global.cdc_min_chunk_size = 64 * 1024; - opts.global.cdc_max_chunk_size = 2 * 1024 * 1024; - opts.global.cdc_norm_level = -1; + 
opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); opts.arrow_schema(&Arc::new(Schema::empty())); let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); let recovered = session_config_from_writer_props(&props); - assert_eq!(recovered.global.enable_content_defined_chunking, true); - assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024); - assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024); - assert_eq!(recovered.global.cdc_norm_level, -1); + let cdc = recovered.global.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); } #[test] diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index be922b9349421..c4faedf571f6d 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1373,7 +1373,7 @@ impl FileSink for ParquetSink { // CDC requires the sequential writer: the chunker state lives in ArrowWriter // and persists across row groups. The parallel path bypasses ArrowWriter entirely. 
if !parquet_opts.global.allow_single_file_parallelism - || parquet_opts.global.enable_content_defined_chunking + || parquet_opts.global.use_content_defined_chunking.is_some() { let mut writer = self .create_async_arrow_writer( diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 388bac272536f..31ece63577b4f 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -604,19 +604,13 @@ message ParquetOptions { uint64 max_predicate_cache_size = 33; } - bool content_defined_chunking = 35; // default = false - - oneof cdc_min_chunk_size_opt { - uint64 cdc_min_chunk_size = 36; - } - - oneof cdc_max_chunk_size_opt { - uint64 cdc_max_chunk_size = 37; - } + CdcOptions content_defined_chunking = 35; +} - oneof cdc_norm_level_opt { - int32 cdc_norm_level = 38; - } +message CdcOptions { + uint64 min_chunk_size = 1; + uint64 max_chunk_size = 2; + int32 norm_level = 3; } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 9ec0f4b1cafe0..3524d80640dba 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -39,7 +39,7 @@ use datafusion_common::{ DataFusionError, JoinSide, ScalarValue, Statistics, TableReference, arrow_datafusion_err, config::{ - CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, + CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }, file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, @@ -1089,16 +1089,14 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), - enable_content_defined_chunking: 
value.content_defined_chunking, - cdc_min_chunk_size: value.cdc_min_chunk_size_opt.map(|opt| match opt { - protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => v as usize, - }).unwrap_or(ParquetOptions::default().cdc_min_chunk_size), - cdc_max_chunk_size: value.cdc_max_chunk_size_opt.map(|opt| match opt { - protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => v as usize, - }).unwrap_or(ParquetOptions::default().cdc_max_chunk_size), - cdc_norm_level: value.cdc_norm_level_opt.map(|opt| match opt { - protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => v as i64, - }).unwrap_or(ParquetOptions::default().cdc_norm_level), + use_content_defined_chunking: value.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + norm_level: cdc.norm_level as i64, + } + }), }) } } @@ -1275,7 +1273,7 @@ pub(crate) fn csv_writer_options_from_proto( #[cfg(test)] mod tests { use super::*; - use datafusion_common::config::{ParquetOptions, TableParquetOptions}; + use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions}; fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions { let proto: crate::protobuf_common::ParquetOptions = @@ -1294,7 +1292,7 @@ mod tests { #[test] fn test_parquet_options_cdc_disabled_round_trip() { let opts = ParquetOptions::default(); - assert!(!opts.enable_content_defined_chunking); + assert!(opts.use_content_defined_chunking.is_none()); let recovered = parquet_options_proto_round_trip(opts.clone()); assert_eq!(opts, recovered); } @@ -1302,58 +1300,57 @@ mod tests { #[test] fn test_parquet_options_cdc_enabled_round_trip() { let opts = ParquetOptions { - enable_content_defined_chunking: true, - cdc_min_chunk_size: 128 * 1024, - 
cdc_max_chunk_size: 512 * 1024, - cdc_norm_level: 2, + use_content_defined_chunking: Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }), ..ParquetOptions::default() }; let recovered = parquet_options_proto_round_trip(opts.clone()); - assert_eq!(recovered.enable_content_defined_chunking, true); - assert_eq!(recovered.cdc_min_chunk_size, 128 * 1024); - assert_eq!(recovered.cdc_max_chunk_size, 512 * 1024); - assert_eq!(recovered.cdc_norm_level, 2); + let cdc = recovered.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); } #[test] fn test_parquet_options_cdc_negative_norm_level_round_trip() { let opts = ParquetOptions { - enable_content_defined_chunking: true, - cdc_norm_level: -3, + use_content_defined_chunking: Some(CdcOptions { + norm_level: -3, + ..CdcOptions::default() + }), ..ParquetOptions::default() }; let recovered = parquet_options_proto_round_trip(opts); - assert_eq!(recovered.cdc_norm_level, -3); + assert_eq!( + recovered.use_content_defined_chunking.unwrap().norm_level, + -3 + ); } #[test] fn test_table_parquet_options_cdc_round_trip() { let mut opts = TableParquetOptions::default(); - opts.global.enable_content_defined_chunking = true; - opts.global.cdc_min_chunk_size = 64 * 1024; - opts.global.cdc_max_chunk_size = 2 * 1024 * 1024; - opts.global.cdc_norm_level = -1; + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); let recovered = table_parquet_options_proto_round_trip(opts.clone()); - assert_eq!(recovered.global.enable_content_defined_chunking, true); - assert_eq!(recovered.global.cdc_min_chunk_size, 64 * 1024); - assert_eq!(recovered.global.cdc_max_chunk_size, 2 * 1024 * 1024); - assert_eq!(recovered.global.cdc_norm_level, -1); + let cdc = recovered.global.use_content_defined_chunking.unwrap(); 
+ assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); } #[test] fn test_table_parquet_options_cdc_disabled_round_trip() { let opts = TableParquetOptions::default(); - assert!(!opts.global.enable_content_defined_chunking); + assert!(opts.global.use_content_defined_chunking.is_none()); let recovered = table_parquet_options_proto_round_trip(opts.clone()); - assert_eq!(recovered.global.enable_content_defined_chunking, false); - assert_eq!( - recovered.global.cdc_min_chunk_size, - ParquetOptions::default().cdc_min_chunk_size - ); - assert_eq!( - recovered.global.cdc_max_chunk_size, - ParquetOptions::default().cdc_max_chunk_size - ); + assert!(recovered.global.use_content_defined_chunking.is_none()); } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index c639baf94cb5c..77a3b71488ece 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -883,6 +883,144 @@ impl<'de> serde::Deserialize<'de> for AvroOptions { deserializer.deserialize_struct("datafusion_common.AvroOptions", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for CdcOptions { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.min_chunk_size != 0 { + len += 1; + } + if self.max_chunk_size != 0 { + len += 1; + } + if self.norm_level != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.CdcOptions", len)?; + if self.min_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("minChunkSize", ToString::to_string(&self.min_chunk_size).as_str())?; + } + if self.max_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + 
#[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("maxChunkSize", ToString::to_string(&self.max_chunk_size).as_str())?; + } + if self.norm_level != 0 { + struct_ser.serialize_field("normLevel", &self.norm_level)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for CdcOptions { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "min_chunk_size", + "minChunkSize", + "max_chunk_size", + "maxChunkSize", + "norm_level", + "normLevel", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + MinChunkSize, + MaxChunkSize, + NormLevel, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "minChunkSize" | "min_chunk_size" => Ok(GeneratedField::MinChunkSize), + "maxChunkSize" | "max_chunk_size" => Ok(GeneratedField::MaxChunkSize), + "normLevel" | "norm_level" => Ok(GeneratedField::NormLevel), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CdcOptions; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.CdcOptions") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut min_chunk_size__ 
= None; + let mut max_chunk_size__ = None; + let mut norm_level__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::MinChunkSize => { + if min_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("minChunkSize")); + } + min_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::MaxChunkSize => { + if max_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("maxChunkSize")); + } + max_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::NormLevel => { + if norm_level__.is_some() { + return Err(serde::de::Error::duplicate_field("normLevel")); + } + norm_level__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(CdcOptions { + min_chunk_size: min_chunk_size__.unwrap_or_default(), + max_chunk_size: max_chunk_size__.unwrap_or_default(), + norm_level: norm_level__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion_common.CdcOptions", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for Column { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -5695,6 +5833,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { len += 1; } + if self.content_defined_chunking.is_some() { + len += 1; + } if self.metadata_size_hint_opt.is_some() { len += 1; } @@ -5728,18 +5869,6 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } - if self.content_defined_chunking { - len += 1; - } - if self.cdc_min_chunk_size_opt.is_some() { - len += 1; - } - if self.cdc_max_chunk_size_opt.is_some() { - len += 1; - } - if self.cdc_norm_level_opt.is_some() { - len += 1; - } let mut struct_ser = serializer.serialize_struct("datafusion_common.ParquetOptions", len)?; if self.enable_page_index { struct_ser.serialize_field("enablePageIndex", 
&self.enable_page_index)?; @@ -5818,6 +5947,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { struct_ser.serialize_field("createdBy", &self.created_by)?; } + if let Some(v) = self.content_defined_chunking.as_ref() { + struct_ser.serialize_field("contentDefinedChunking", v)?; + } if let Some(v) = self.metadata_size_hint_opt.as_ref() { match v { parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => { @@ -5905,34 +6037,6 @@ impl serde::Serialize for ParquetOptions { } } } - if self.content_defined_chunking { - struct_ser.serialize_field("contentDefinedChunking", &self.content_defined_chunking)?; - } - if let Some(v) = self.cdc_min_chunk_size_opt.as_ref() { - match v { - parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => { - #[allow(clippy::needless_borrow)] - #[allow(clippy::needless_borrows_for_generic_args)] - struct_ser.serialize_field("cdcMinChunkSize", ToString::to_string(&v).as_str())?; - } - } - } - if let Some(v) = self.cdc_max_chunk_size_opt.as_ref() { - match v { - parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => { - #[allow(clippy::needless_borrow)] - #[allow(clippy::needless_borrows_for_generic_args)] - struct_ser.serialize_field("cdcMaxChunkSize", ToString::to_string(&v).as_str())?; - } - } - } - if let Some(v) = self.cdc_norm_level_opt.as_ref() { - match v { - parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => { - struct_ser.serialize_field("cdcNormLevel", v)?; - } - } - } struct_ser.end() } } @@ -5984,6 +6088,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "maxRowGroupSize", "created_by", "createdBy", + "content_defined_chunking", + "contentDefinedChunking", "metadata_size_hint", "metadataSizeHint", "compression", @@ -6004,14 +6110,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "coerceInt96", "max_predicate_cache_size", "maxPredicateCacheSize", - "content_defined_chunking", - "contentDefinedChunking", - "cdc_min_chunk_size", - "cdcMinChunkSize", - "cdc_max_chunk_size", - 
"cdcMaxChunkSize", - "cdc_norm_level", - "cdcNormLevel", ]; #[allow(clippy::enum_variant_names)] @@ -6037,6 +6135,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { DataPageRowCountLimit, MaxRowGroupSize, CreatedBy, + ContentDefinedChunking, MetadataSizeHint, Compression, DictionaryEnabled, @@ -6048,10 +6147,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterNdv, CoerceInt96, MaxPredicateCacheSize, - ContentDefinedChunking, - CdcMinChunkSize, - CdcMaxChunkSize, - CdcNormLevel, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6094,6 +6189,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), "createdBy" | "created_by" => Ok(GeneratedField::CreatedBy), + "contentDefinedChunking" | "content_defined_chunking" => Ok(GeneratedField::ContentDefinedChunking), "metadataSizeHint" | "metadata_size_hint" => Ok(GeneratedField::MetadataSizeHint), "compression" => Ok(GeneratedField::Compression), "dictionaryEnabled" | "dictionary_enabled" => Ok(GeneratedField::DictionaryEnabled), @@ -6105,10 +6201,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), - "contentDefinedChunking" | "content_defined_chunking" => Ok(GeneratedField::ContentDefinedChunking), - "cdcMinChunkSize" | "cdc_min_chunk_size" => Ok(GeneratedField::CdcMinChunkSize), - "cdcMaxChunkSize" | "cdc_max_chunk_size" => Ok(GeneratedField::CdcMaxChunkSize), - "cdcNormLevel" | "cdc_norm_level" => Ok(GeneratedField::CdcNormLevel), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6149,6 
+6241,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; let mut created_by__ = None; + let mut content_defined_chunking__ = None; let mut metadata_size_hint_opt__ = None; let mut compression_opt__ = None; let mut dictionary_enabled_opt__ = None; @@ -6160,10 +6253,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; let mut max_predicate_cache_size_opt__ = None; - let mut content_defined_chunking__ = None; - let mut cdc_min_chunk_size_opt__ = None; - let mut cdc_max_chunk_size_opt__ = None; - let mut cdc_norm_level_opt__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::EnablePageIndex => { @@ -6306,6 +6395,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } created_by__ = Some(map_.next_value()?); } + GeneratedField::ContentDefinedChunking => { + if content_defined_chunking__.is_some() { + return Err(serde::de::Error::duplicate_field("contentDefinedChunking")); + } + content_defined_chunking__ = map_.next_value()?; + } GeneratedField::MetadataSizeHint => { if metadata_size_hint_opt__.is_some() { return Err(serde::de::Error::duplicate_field("metadataSizeHint")); @@ -6372,30 +6467,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); } - GeneratedField::ContentDefinedChunking => { - if content_defined_chunking__.is_some() { - return Err(serde::de::Error::duplicate_field("contentDefinedChunking")); - } - content_defined_chunking__ = Some(map_.next_value()?); - } - GeneratedField::CdcMinChunkSize => { - if cdc_min_chunk_size_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("cdcMinChunkSize")); - } - cdc_min_chunk_size_opt__ = 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(x.0)); - } - GeneratedField::CdcMaxChunkSize => { - if cdc_max_chunk_size_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("cdcMaxChunkSize")); - } - cdc_max_chunk_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(x.0)); - } - GeneratedField::CdcNormLevel => { - if cdc_norm_level_opt__.is_some() { - return Err(serde::de::Error::duplicate_field("cdcNormLevel")); - } - cdc_norm_level_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::CdcNormLevelOpt::CdcNormLevel(x.0)); - } } } Ok(ParquetOptions { @@ -6420,6 +6491,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), created_by: created_by__.unwrap_or_default(), + content_defined_chunking: content_defined_chunking__, metadata_size_hint_opt: metadata_size_hint_opt__, compression_opt: compression_opt__, dictionary_enabled_opt: dictionary_enabled_opt__, @@ -6431,10 +6503,6 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, max_predicate_cache_size_opt: max_predicate_cache_size_opt__, - content_defined_chunking: content_defined_chunking__.unwrap_or_default(), - cdc_min_chunk_size_opt: cdc_min_chunk_size_opt__, - cdc_max_chunk_size_opt: cdc_max_chunk_size_opt__, - cdc_norm_level_opt: cdc_norm_level_opt__, }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 1ba7fe702665a..1251a51ab0983 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ 
b/datafusion/proto-common/src/generated/prost.rs @@ -838,6 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: ::core::option::Option, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -872,21 +874,6 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, - /// default = false - #[prost(bool, tag = "35")] - pub content_defined_chunking: bool, - #[prost(oneof = "parquet_options::CdcMinChunkSizeOpt", tags = "36")] - pub cdc_min_chunk_size_opt: ::core::option::Option< - parquet_options::CdcMinChunkSizeOpt, - >, - #[prost(oneof = "parquet_options::CdcMaxChunkSizeOpt", tags = "37")] - pub cdc_max_chunk_size_opt: ::core::option::Option< - parquet_options::CdcMaxChunkSizeOpt, - >, - #[prost(oneof = "parquet_options::CdcNormLevelOpt", tags = "38")] - pub cdc_norm_level_opt: ::core::option::Option< - parquet_options::CdcNormLevelOpt, - >, } /// Nested message and enum types in `ParquetOptions`. 
pub mod parquet_options { @@ -945,21 +932,15 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcMinChunkSizeOpt { - #[prost(uint64, tag = "36")] - CdcMinChunkSize(u64), - } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcMaxChunkSizeOpt { - #[prost(uint64, tag = "37")] - CdcMaxChunkSize(u64), - } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcNormLevelOpt { - #[prost(int32, tag = "38")] - CdcNormLevel(i32), - } +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = "1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 87c39e205ce65..dbe3add2bdb89 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,15 +904,12 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), - content_defined_chunking: value.enable_content_defined_chunking, - cdc_min_chunk_size_opt: value.enable_content_defined_chunking.then(|| - protobuf::parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(value.cdc_min_chunk_size as u64) - ), - cdc_max_chunk_size_opt: value.enable_content_defined_chunking.then(|| - protobuf::parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(value.cdc_max_chunk_size as u64) - ), - cdc_norm_level_opt: 
value.enable_content_defined_chunking.then(|| - protobuf::parquet_options::CdcNormLevelOpt::CdcNormLevel(value.cdc_norm_level as i32) + content_defined_chunking: value.use_content_defined_chunking.as_ref().map(|cdc| + protobuf::CdcOptions { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level as i32, + } ), }) } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 8c9af5ec75584..1251a51ab0983 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -838,9 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, - /// default = false - #[prost(bool, tag = "35")] - pub content_defined_chunking: bool, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: ::core::option::Option, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -875,16 +874,6 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, - #[prost(oneof = "parquet_options::CdcMinChunkSizeOpt", tags = "36")] - pub cdc_min_chunk_size_opt: ::core::option::Option< - parquet_options::CdcMinChunkSizeOpt, - >, - #[prost(oneof = "parquet_options::CdcMaxChunkSizeOpt", tags = "37")] - pub cdc_max_chunk_size_opt: ::core::option::Option< - parquet_options::CdcMaxChunkSizeOpt, - >, - #[prost(oneof = "parquet_options::CdcNormLevelOpt", tags = "38")] - pub cdc_norm_level_opt: ::core::option::Option, } /// Nested message and enum types in `ParquetOptions`. 
pub mod parquet_options { @@ -943,21 +932,15 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcMinChunkSizeOpt { - #[prost(uint64, tag = "36")] - CdcMinChunkSize(u64), - } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcMaxChunkSizeOpt { - #[prost(uint64, tag = "37")] - CdcMaxChunkSize(u64), - } - #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] - pub enum CdcNormLevelOpt { - #[prost(int32, tag = "38")] - CdcNormLevel(i32), - } +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = "1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 4dd6b7b4dd051..4fc8212a73621 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -351,13 +351,13 @@ mod parquet { use super::*; use crate::protobuf::{ - ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions, - ParquetOptions as ParquetOptionsProto, + CdcOptions as CdcOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto, + ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto, TableParquetOptions as TableParquetOptionsProto, parquet_column_options, parquet_options, }; use datafusion_common::config::{ - ParquetColumnOptions, ParquetOptions, TableParquetOptions, + CdcOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }; use datafusion_datasource_parquet::file_format::ParquetFormatFactory; @@ -426,15 +426,12 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { 
parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), - content_defined_chunking: global_options.global.enable_content_defined_chunking, - cdc_min_chunk_size_opt: global_options.global.enable_content_defined_chunking.then(|| { - parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(global_options.global.cdc_min_chunk_size as u64) - }), - cdc_max_chunk_size_opt: global_options.global.enable_content_defined_chunking.then(|| { - parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(global_options.global.cdc_max_chunk_size as u64) - }), - cdc_norm_level_opt: global_options.global.enable_content_defined_chunking.then(|| { - parquet_options::CdcNormLevelOpt::CdcNormLevel(global_options.global.cdc_norm_level as i32) + content_defined_chunking: global_options.global.use_content_defined_chunking.as_ref().map(|cdc| { + CdcOptionsProto { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level as i32, + } }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { @@ -535,16 +532,14 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), - enable_content_defined_chunking: proto.content_defined_chunking, - cdc_min_chunk_size: proto.cdc_min_chunk_size_opt.as_ref().map(|opt| match opt { - parquet_options::CdcMinChunkSizeOpt::CdcMinChunkSize(v) => *v as usize, - }).unwrap_or(ParquetOptions::default().cdc_min_chunk_size), - cdc_max_chunk_size: proto.cdc_max_chunk_size_opt.as_ref().map(|opt| match opt { - parquet_options::CdcMaxChunkSizeOpt::CdcMaxChunkSize(v) => *v as usize, - }).unwrap_or(ParquetOptions::default().cdc_max_chunk_size), - cdc_norm_level: proto.cdc_norm_level_opt.as_ref().map(|opt| match opt { - parquet_options::CdcNormLevelOpt::CdcNormLevel(v) => *v as i64, - 
}).unwrap_or(ParquetOptions::default().cdc_norm_level), + use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + norm_level: cdc.norm_level as i64, + } + }), } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index deb8a3ab5f345..ed92055e0a3ae 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -235,9 +235,6 @@ datafusion.execution.parquet.bloom_filter_fpp NULL datafusion.execution.parquet.bloom_filter_ndv NULL datafusion.execution.parquet.bloom_filter_on_read true datafusion.execution.parquet.bloom_filter_on_write false -datafusion.execution.parquet.cdc_max_chunk_size 1048576 -datafusion.execution.parquet.cdc_min_chunk_size 262144 -datafusion.execution.parquet.cdc_norm_level 0 datafusion.execution.parquet.coerce_int96 NULL datafusion.execution.parquet.column_index_truncate_length 64 datafusion.execution.parquet.compression zstd(3) @@ -246,7 +243,6 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 datafusion.execution.parquet.data_pagesize_limit 1048576 datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 -datafusion.execution.parquet.enable_content_defined_chunking false datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL datafusion.execution.parquet.force_filter_selections false @@ -263,6 +259,7 @@ datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page 
datafusion.execution.parquet.statistics_truncate_length 64 +datafusion.execution.parquet.use_content_defined_chunking NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.perfect_hash_join_min_key_density 0.15 @@ -381,9 +378,6 @@ datafusion.execution.parquet.bloom_filter_fpp NULL (writing) Sets bloom filter f datafusion.execution.parquet.bloom_filter_ndv NULL (writing) Sets bloom filter number of distinct values. If NULL, uses default parquet writer setting datafusion.execution.parquet.bloom_filter_on_read true (reading) Use any available bloom filters when reading parquet files datafusion.execution.parquet.bloom_filter_on_write false (writing) Write bloom filters for all columns when creating parquet files -datafusion.execution.parquet.cdc_max_chunk_size 1048576 (writing) Maximum chunk size in bytes for content-defined chunking. The chunker will create a new chunk whenever the chunk size exceeds this value. Default is 1 MiB. Only used when `enable_content_defined_chunking` is true. -datafusion.execution.parquet.cdc_min_chunk_size 262144 (writing) Minimum chunk size in bytes for content-defined chunking. The rolling hash will not be updated until this size is reached for each chunk. Default is 256 KiB. Only used when `enable_content_defined_chunking` is true. -datafusion.execution.parquet.cdc_norm_level 0 (writing) Normalization level for content-defined chunking. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0. Only used when `enable_content_defined_chunking` is true. datafusion.execution.parquet.coerce_int96 NULL (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. 
This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. datafusion.execution.parquet.column_index_truncate_length 64 (writing) Sets column index truncate length datafusion.execution.parquet.compression zstd(3) (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. @@ -392,7 +386,6 @@ datafusion.execution.parquet.data_page_row_count_limit 20000 (writing) Sets best datafusion.execution.parquet.data_pagesize_limit 1048576 (writing) Sets best effort maximum size of data page in bytes datafusion.execution.parquet.dictionary_enabled true (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting datafusion.execution.parquet.dictionary_page_size_limit 1048576 (writing) Sets best effort maximum dictionary page size, in bytes -datafusion.execution.parquet.enable_content_defined_chunking false (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When true, the other `cdc_*` options control the chunking behavior. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.enable_page_index true (reading) If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. 
These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. @@ -409,6 +402,7 @@ datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding t datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting +datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When Some, CDC is enabled with the given options; when None (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. 
A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. From 9f12d59ae7d9037b7e83b3665d9c97b370965913 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Wed, 25 Mar 2026 21:19:51 +0100 Subject: [PATCH 4/9] fix: remove unused parquet dep from proto-common, regenerate configs.md --- Cargo.lock | 1 - datafusion/proto-common/Cargo.toml | 3 +-- docs/source/user-guide/configs.md | 1 + 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5c071dc8e557..895b3059f50c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2550,7 +2550,6 @@ dependencies = [ "arrow", "datafusion-common", "doc-comment", - "parquet", "pbjson 0.9.0", "prost", "serde", diff --git a/datafusion/proto-common/Cargo.toml b/datafusion/proto-common/Cargo.toml index 9de3494ec1e21..ca5ee1305c57f 100644 --- a/datafusion/proto-common/Cargo.toml +++ b/datafusion/proto-common/Cargo.toml @@ -37,12 +37,11 @@ name = "datafusion_proto_common" [features] default = [] json = ["serde", "pbjson"] -parquet = ["datafusion-common/parquet", "dep:parquet"] +parquet = ["datafusion-common/parquet"] [dependencies] arrow = { workspace = true } datafusion-common = { workspace = true } -parquet = { workspace = true, optional = true } pbjson = { workspace = true, optional = true } prost = { workspace = true } serde = { version = "1.0", optional = true } diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 69627e3cb9148..be42f4a0becb8 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -112,6 +112,7 @@ The following configuration settings are available: | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. 
Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.parquet.use_content_defined_chunking | NULL | (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. 
Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. | From 4ee40e0a732185c9431d4ea4b862dd19a6068b90 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Thu, 26 Mar 2026 09:11:38 +0100 Subject: [PATCH 5/9] fix: use backtick formatting for Some/None in SHOW ALL VERBOSE expected output --- datafusion/sqllogictest/test_files/information_schema.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index ed92055e0a3ae..77ae1d335fb8d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -402,7 +402,7 @@ datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding t datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. 
This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting -datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When Some, CDC is enabled with the given options; when None (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. +datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. 
From c7da1ce6a9041869b81e25b090d4b86169595694 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Fri, 27 Mar 2026 14:01:00 +0100 Subject: [PATCH 6/9] =?UTF-8?q?fix:=20CDC=20review=20fixes=20=E2=80=94=20u?= =?UTF-8?q?se=20i32=20for=20norm=5Flevel,=20validate=20chunk=20sizes,=20cl?= =?UTF-8?q?ean=20up=20proto=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Change `CdcOptions::norm_level` from `i64` to `i32` to match parquet's type, replacing `config_field!(i64)` with `config_field!(i32)` and removing all now-unnecessary `as i32`/`as i64` casts - Add validation in `into_writer_properties_builder` for invalid CDC config: `min_chunk_size == 0` and `max_chunk_size <= min_chunk_size`, returning a proper `DataFusionError::Configuration` instead of panicking in the parquet layer - Add explanatory comments on why proto zero-value handling is asymmetric between chunk sizes (0 is invalid) and `norm_level` (0 is valid default) - Remove extra blank line in `ParquetOptions` config block - Remove unused `parquet` feature from `datafusion-proto-common/Cargo.toml` - Add three validation tests for the new error paths --- datafusion/common/src/config.rs | 5 +-- .../common/src/file_options/parquet_writer.rs | 38 ++++++++++++++++++- datafusion/proto-common/Cargo.toml | 1 - datafusion/proto-common/src/from_proto/mod.rs | 5 ++- datafusion/proto-common/src/to_proto/mod.rs | 2 +- .../proto/src/logical_plan/file_formats.rs | 7 +++- 6 files changed, 48 insertions(+), 10 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index d27862a690510..d80c4efc1225e 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -701,7 +701,7 @@ config_namespace! { /// Normalization level. Increasing this improves deduplication ratio /// but increases fragmentation. Recommended range is [-3, 3], default is 0. 
- pub norm_level: i64, default = 0 + pub norm_level: i32, default = 0 } } @@ -863,7 +863,6 @@ config_namespace! { /// default parquet writer setting pub bloom_filter_ndv: Option, default = None - /// (writing) Controls whether DataFusion will attempt to speed up writing /// parquet files by serializing them in parallel. Each column /// in each row group in each output file are serialized in parallel @@ -1851,7 +1850,7 @@ config_field!(usize); config_field!(f64); config_field!(u64); config_field!(u32); -config_field!(i64); +config_field!(i32); impl ConfigField for u8 { fn visit(&self, v: &mut V, key: &str, description: &'static str) { diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 103719b21315d..eaf5a1642e8e2 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -249,11 +249,22 @@ impl ParquetOptions { builder = builder.set_encoding(parse_encoding_string(encoding)?); } if let Some(cdc) = use_content_defined_chunking { + if cdc.min_chunk_size == 0 { + return Err(DataFusionError::Configuration( + "CDC min_chunk_size must be greater than 0".to_string(), + )); + } + if cdc.max_chunk_size <= cdc.min_chunk_size { + return Err(DataFusionError::Configuration(format!( + "CDC max_chunk_size ({}) must be greater than min_chunk_size ({})", + cdc.max_chunk_size, cdc.min_chunk_size + ))); + } builder = builder.set_content_defined_chunking(Some( parquet::file::properties::CdcOptions { min_chunk_size: cdc.min_chunk_size, max_chunk_size: cdc.max_chunk_size, - norm_level: cdc.norm_level as i32, + norm_level: cdc.norm_level, }, )); } @@ -593,7 +604,7 @@ mod tests { CdcOptions { min_chunk_size: c.min_chunk_size, max_chunk_size: c.max_chunk_size, - norm_level: c.norm_level as i64, + norm_level: c.norm_level, } }), }, @@ -851,6 +862,29 @@ mod tests { assert_eq!(cdc.norm_level, -1); } + #[test] + fn 
test_cdc_validation_zero_min_chunk_size() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 0, + ..CdcOptions::default() + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + assert!(WriterPropertiesBuilder::try_from(&opts).is_err()); + } + + #[test] + fn test_cdc_validation_max_not_greater_than_min() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 512 * 1024, + max_chunk_size: 256 * 1024, + ..CdcOptions::default() + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + assert!(WriterPropertiesBuilder::try_from(&opts).is_err()); + } + #[test] fn test_bloom_filter_set_ndv_only() { // the TableParquetOptions::default, with only ndv set diff --git a/datafusion/proto-common/Cargo.toml b/datafusion/proto-common/Cargo.toml index ca5ee1305c57f..46dae36ba40ed 100644 --- a/datafusion/proto-common/Cargo.toml +++ b/datafusion/proto-common/Cargo.toml @@ -37,7 +37,6 @@ name = "datafusion_proto_common" [features] default = [] json = ["serde", "pbjson"] -parquet = ["datafusion-common/parquet"] [dependencies] arrow = { workspace = true } diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 3524d80640dba..e428475df8ad3 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1092,9 +1092,12 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { use_content_defined_chunking: value.content_defined_chunking.map(|cdc| { let defaults = CdcOptions::default(); CdcOptions { + // proto3 uses 0 as the wire default for uint64; a zero chunk size is + // invalid, so treat it as "field not set" and fall back to the default. 
min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, - norm_level: cdc.norm_level as i64, + // norm_level = 0 is a valid value (and the default), so pass it through directly. + norm_level: cdc.norm_level, } }), }) diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index dbe3add2bdb89..65089f029b866 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -908,7 +908,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { protobuf::CdcOptions { min_chunk_size: cdc.min_chunk_size as u64, max_chunk_size: cdc.max_chunk_size as u64, - norm_level: cdc.norm_level as i32, + norm_level: cdc.norm_level, } ), }) diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 4fc8212a73621..8df0b3f1d9705 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -430,7 +430,7 @@ mod parquet { CdcOptionsProto { min_chunk_size: cdc.min_chunk_size as u64, max_chunk_size: cdc.max_chunk_size as u64, - norm_level: cdc.norm_level as i32, + norm_level: cdc.norm_level, } }), }), @@ -535,9 +535,12 @@ mod parquet { use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| { let defaults = CdcOptions::default(); CdcOptions { + // proto3 uses 0 as the wire default for uint64; a zero chunk size is + // invalid, so treat it as "field not set" and fall back to the default. 
min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, - norm_level: cdc.norm_level as i64, + // norm_level = 0 is a valid value (and the default), so pass it through directly. + norm_level: cdc.norm_level, } }), } From 6b7e9bb02eda789cb1d2eae62d8fca91d9b6975c Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Tue, 31 Mar 2026 11:39:18 +0200 Subject: [PATCH 7/9] feat: add CDC end-to-end slt tests and true/false config option - Add parquet_cdc.slt with 6 end-to-end tests: write parquet files with CDC enabled/disabled, read back and verify correctness across various data types, sizes, and CDC configurations. - Allow setting use_content_defined_chunking to 'true'/'false' to enable with defaults or disable, via a specific `ConfigField` impl for `Option<CdcOptions>`. - CdcOptions uses an inherent default() method instead of the Default trait to avoid conflicting with the blanket `ConfigField` impl for `Option<T>`. --- datafusion/common/src/config.rs | 215 +++++++++++++++- .../sqllogictest/test_files/parquet_cdc.slt | 231 ++++++++++++++++++ 2 files changed, 433 insertions(+), 13 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/parquet_cdc.slt diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index d80c4efc1225e..1562768a85242 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -687,21 +687,141 @@ config_namespace! { } } -config_namespace! { - /// Options for content-defined chunking (CDC) when writing parquet files. - /// See [`ParquetOptions::use_content_defined_chunking`]. - pub struct CdcOptions { - /// Minimum chunk size in bytes. The rolling hash will not trigger a split - /// until this many bytes have been accumulated. Default is 256 KiB. 
- pub min_chunk_size: usize, default = 256 * 1024 +/// Options for content-defined chunking (CDC) when writing parquet files. +/// See [`ParquetOptions::use_content_defined_chunking`]. +/// +/// Can be enabled with default options by setting +/// `use_content_defined_chunking` to `true`, or configured with sub-fields +/// like `use_content_defined_chunking.min_chunk_size`. +#[derive(Debug, Clone, PartialEq)] +pub struct CdcOptions { + /// Minimum chunk size in bytes. The rolling hash will not trigger a split + /// until this many bytes have been accumulated. Default is 256 KiB. + pub min_chunk_size: usize, + + /// Maximum chunk size in bytes. A split is forced when the accumulated + /// size exceeds this value. Default is 1 MiB. + pub max_chunk_size: usize, + + /// Normalization level. Increasing this improves deduplication ratio + /// but increases fragmentation. Recommended range is [-3, 3], default is 0. + pub norm_level: i32, +} + +// Note: `CdcOptions` intentionally does NOT implement `Default` so that the +// blanket `impl ConfigField for Option` does not +// apply. This allows the specific `impl ConfigField for Option` +// below to handle "true"/"false" for enabling/disabling CDC. +// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`. +impl CdcOptions { + /// Returns a new `CdcOptions` with default values. + pub fn default() -> Self { + Self { + min_chunk_size: 256 * 1024, + max_chunk_size: 1024 * 1024, + norm_level: 0, + } + } +} - /// Maximum chunk size in bytes. A split is forced when the accumulated - /// size exceeds this value. Default is 1 MiB. 
-        pub max_chunk_size: usize, default = 1024 * 1024 +impl ConfigField for CdcOptions { + fn set(&mut self, key: &str, value: &str) -> Result<()> { + let (key, rem) = key.split_once('.').unwrap_or((key, "")); + match key { + "min_chunk_size" => self.min_chunk_size.set(rem, value), + "max_chunk_size" => self.max_chunk_size.set(rem, value), + "norm_level" => self.norm_level.set(rem, value), + _ => _config_err!( + "Config value \"{}\" not found on CdcOptions", + key + ), + } + } - /// Normalization level. Increasing this improves deduplication ratio - /// but increases fragmentation. Recommended range is [-3, 3], default is 0. - pub norm_level: i32, default = 0 + fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) { + let key = format!("{key_prefix}.min_chunk_size"); + self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB."); + let key = format!("{key_prefix}.max_chunk_size"); + self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB."); + let key = format!("{key_prefix}.norm_level"); + self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation.
Recommended range is [-3, 3], default is 0."); + } + + fn reset(&mut self, key: &str) -> Result<()> { + let (key, rem) = key.split_once('.').unwrap_or((key, "")); + match key { + "min_chunk_size" => { + if rem.is_empty() { + self.min_chunk_size = CdcOptions::default().min_chunk_size; + Ok(()) + } else { + self.min_chunk_size.reset(rem) + } + } + "max_chunk_size" => { + if rem.is_empty() { + self.max_chunk_size = CdcOptions::default().max_chunk_size; + Ok(()) + } else { + self.max_chunk_size.reset(rem) + } + } + "norm_level" => { + if rem.is_empty() { + self.norm_level = CdcOptions::default().norm_level; + Ok(()) + } else { + self.norm_level.reset(rem) + } + } + _ => _config_err!( + "Config value \"{}\" not found on CdcOptions", + key + ), + } + } +} + +/// `ConfigField` for `Option<CdcOptions>` — allows setting the option to +/// `"true"` (enable with defaults) or `"false"` (disable), in addition to +/// setting individual sub-fields like `min_chunk_size`. +impl ConfigField for Option<CdcOptions> { + fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) { + match self { + Some(s) => s.visit(v, key, description), + None => v.none(key, description), + } + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + if key.is_empty() { + match value.to_ascii_lowercase().as_str() { + "true" => { + *self = Some(CdcOptions::default()); + Ok(()) + } + "false" => { + *self = None; + Ok(()) + } + _ => _config_err!( + "Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'" + ), + } + } else { + self.get_or_insert_with(CdcOptions::default) + .set(key, value) + } + } + + fn reset(&mut self, key: &str) -> Result<()> { + if key.is_empty() { + *self = None; + Ok(()) + } else { + self.get_or_insert_with(CdcOptions::default) + .reset(key) + } } } @@ -3604,4 +3724,73 @@ mod tests { "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0.
Expected one of: 1.0, 2.0" ); } + + #[cfg(feature = "parquet")] + #[test] + fn set_cdc_option_with_boolean_true() { + use crate::config::ConfigOptions; + + let mut config = ConfigOptions::default(); + assert!(config + .execution + .parquet + .use_content_defined_chunking + .is_none()); + + // Setting to "true" should enable CDC with default options + config + .set( + "datafusion.execution.parquet.use_content_defined_chunking", + "true", + ) + .unwrap(); + let cdc = config + .execution + .parquet + .use_content_defined_chunking + .as_ref() + .expect("CDC should be enabled"); + assert_eq!(cdc.min_chunk_size, 256 * 1024); + assert_eq!(cdc.max_chunk_size, 1024 * 1024); + assert_eq!(cdc.norm_level, 0); + + // Setting to "false" should disable CDC + config + .set( + "datafusion.execution.parquet.use_content_defined_chunking", + "false", + ) + .unwrap(); + assert!(config + .execution + .parquet + .use_content_defined_chunking + .is_none()); + } + + #[cfg(feature = "parquet")] + #[test] + fn set_cdc_option_with_subfields() { + use crate::config::ConfigOptions; + + let mut config = ConfigOptions::default(); + + // Setting sub-fields should also enable CDC + config + .set( + "datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size", + "1024", + ) + .unwrap(); + let cdc = config + .execution + .parquet + .use_content_defined_chunking + .as_ref() + .expect("CDC should be enabled"); + assert_eq!(cdc.min_chunk_size, 1024); + // Other fields should be defaults + assert_eq!(cdc.max_chunk_size, 1024 * 1024); + assert_eq!(cdc.norm_level, 0); + } } diff --git a/datafusion/sqllogictest/test_files/parquet_cdc.slt b/datafusion/sqllogictest/test_files/parquet_cdc.slt new file mode 100644 index 0000000000000..f87f05af74a0c --- /dev/null +++ b/datafusion/sqllogictest/test_files/parquet_cdc.slt @@ -0,0 +1,231 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Test parquet content-defined chunking (CDC) end-to-end: +# write parquet files with CDC enabled, then read them back and verify correctness. + +# Create source data +statement ok +CREATE TABLE cdc_source AS VALUES + (1, 'alice', 100.50), + (2, 'bob', 200.75), + (3, 'charlie', 300.25), + (4, 'diana', 400.00), + (5, 'eve', 500.99) + +# +# Test 1: Enable CDC with 'true' (uses default options) +# + +query I +COPY cdc_source TO 'test_files/scratch/parquet_cdc/enabled_true/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) +---- +5 + +statement ok +CREATE EXTERNAL TABLE cdc_enabled_true_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/enabled_true/' + +query ITR rowsort +SELECT * FROM cdc_enabled_true_read +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# Verify filtering works on CDC-written files +query ITR +SELECT * FROM cdc_enabled_true_read WHERE column1 > 3 ORDER BY column1 +---- +4 diana 400 +5 eve 500.99 + +# Verify aggregation works on CDC-written files +query R +SELECT SUM(column3) FROM cdc_enabled_true_read +---- +1502.49 + +# +# Test 2: Disable CDC with 'false' (same as default behavior) +# + +query I +COPY cdc_source TO 'test_files/scratch/parquet_cdc/disabled_false/' +STORED AS PARQUET +OPTIONS ( 
+ 'format.use_content_defined_chunking' 'false' +) +---- +5 + +statement ok +CREATE EXTERNAL TABLE cdc_disabled_false_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/disabled_false/' + +query ITR rowsort +SELECT * FROM cdc_disabled_false_read +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# +# Test 3: Enable CDC with custom sub-field options +# + +query I +COPY cdc_source TO 'test_files/scratch/parquet_cdc/custom_chunks/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking.min_chunk_size' '1024', + 'format.use_content_defined_chunking.max_chunk_size' '4096', + 'format.use_content_defined_chunking.norm_level' '1' +) +---- +5 + +statement ok +CREATE EXTERNAL TABLE cdc_custom_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/custom_chunks/' + +query ITR rowsort +SELECT * FROM cdc_custom_read +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# +# Test 4: Write via external table with CDC enabled +# + +statement ok +CREATE EXTERNAL TABLE cdc_external_write ( + id INT, + name VARCHAR, + value DOUBLE +) +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/external_table/' OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) + +query I +INSERT INTO cdc_external_write SELECT * FROM cdc_source +---- +5 + +query ITR rowsort +SELECT * FROM cdc_external_write +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# +# Test 5: Write larger dataset to exercise CDC chunking logic +# + +statement ok +CREATE TABLE cdc_large_source AS + SELECT + value as id, + CONCAT('name_', CAST(value AS VARCHAR)) as name, + CAST(value AS DOUBLE) * 1.5 as amount, + CASE WHEN value % 2 = 0 THEN true ELSE false END as flag + FROM generate_series(1, 1000) t + +query I +COPY cdc_large_source TO 'test_files/scratch/parquet_cdc/large/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) +---- +1000 + +statement ok 
+CREATE EXTERNAL TABLE cdc_large_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/large/' + +query I +SELECT COUNT(*) FROM cdc_large_read +---- +1000 + +query IR +SELECT MIN(id), MIN(amount) FROM cdc_large_read +---- +1 1.5 + +query IR +SELECT MAX(id), MAX(amount) FROM cdc_large_read +---- +1000 1500 + +query I +SELECT COUNT(*) FROM cdc_large_read WHERE flag = true +---- +500 + +# +# Test 6: CDC with different data types including NULLs +# + +statement ok +CREATE TABLE cdc_types_source AS VALUES + (1::INT, 'text'::VARCHAR, 3.14::DOUBLE, true::BOOLEAN, DATE '2024-01-15', TIMESTAMP '2024-01-15 10:30:00'), + (2::INT, 'more'::VARCHAR, 2.72::DOUBLE, false::BOOLEAN, DATE '2024-06-20', TIMESTAMP '2024-06-20 14:45:00'), + (3::INT, NULL::VARCHAR, NULL::DOUBLE, NULL::BOOLEAN, NULL::DATE, NULL::TIMESTAMP) + +query I +COPY cdc_types_source TO 'test_files/scratch/parquet_cdc/types/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) +---- +3 + +statement ok +CREATE EXTERNAL TABLE cdc_types_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/types/' + +query ITRBDP rowsort +SELECT * FROM cdc_types_read +---- +1 text 3.14 true 2024-01-15 2024-01-15T10:30:00 +2 more 2.72 false 2024-06-20 2024-06-20T14:45:00 +3 NULL NULL NULL NULL NULL From 9e3a683d5050802cfc01b7af1201e53fca1d6476 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Tue, 31 Mar 2026 12:09:15 +0200 Subject: [PATCH 8/9] test: add Rust integration tests for CDC parquet metadata verification Add content_defined_chunking.rs with 2 tests that write parquet files using ArrowWriter with CDC-enabled WriterProperties, then inspect file metadata to verify CDC is wired through correctly: - cdc_data_round_trip: write/read 5000 rows, verify count and range - cdc_affects_page_boundaries: compare column uncompressed sizes between CDC and non-CDC writes to verify CDC changes the page layout Also fix clippy warning on CdcOptions::default() inherent method. 
--- datafusion/common/src/config.rs | 41 ++-- .../tests/parquet/content_defined_chunking.rs | 197 ++++++++++++++++++ datafusion/core/tests/parquet/mod.rs | 1 + datafusion/proto-common/src/from_proto/mod.rs | 1 - 4 files changed, 217 insertions(+), 23 deletions(-) create mode 100644 datafusion/core/tests/parquet/content_defined_chunking.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 1562768a85242..38b18c06fe930 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -715,6 +715,7 @@ pub struct CdcOptions { // Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`. impl CdcOptions { /// Returns a new `CdcOptions` with default values. + #[expect(clippy::should_implement_trait)] pub fn default() -> Self { Self { min_chunk_size: 256 * 1024, @@ -731,10 +732,7 @@ impl ConfigField for CdcOptions { "min_chunk_size" => self.min_chunk_size.set(rem, value), "max_chunk_size" => self.max_chunk_size.set(rem, value), "norm_level" => self.norm_level.set(rem, value), - _ => _config_err!( - "Config value \"{}\" not found on CdcOptions", - key - ), + _ => _config_err!("Config value \"{}\" not found on CdcOptions", key), } } @@ -774,10 +772,7 @@ impl ConfigField for CdcOptions { self.norm_level.reset(rem) } } - _ => _config_err!( - "Config value \"{}\" not found on CdcOptions", - key - ), + _ => _config_err!("Config value \"{}\" not found on CdcOptions", key), } } } @@ -809,8 +804,7 @@ impl ConfigField for Option { ), } } else { - self.get_or_insert_with(CdcOptions::default) - .set(key, value) + self.get_or_insert_with(CdcOptions::default).set(key, value) } } @@ -819,8 +813,7 @@ impl ConfigField for Option { *self = None; Ok(()) } else { - self.get_or_insert_with(CdcOptions::default) - .reset(key) + self.get_or_insert_with(CdcOptions::default).reset(key) } } } @@ -3731,11 +3724,13 @@ mod tests { use crate::config::ConfigOptions; let mut config = ConfigOptions::default(); - assert!(config - 
.execution - .parquet - .use_content_defined_chunking - .is_none()); + assert!( + config + .execution + .parquet + .use_content_defined_chunking + .is_none() + ); // Setting to "true" should enable CDC with default options config @@ -3761,11 +3756,13 @@ mod tests { "false", ) .unwrap(); - assert!(config - .execution - .parquet - .use_content_defined_chunking - .is_none()); + assert!( + config + .execution + .parquet + .use_content_defined_chunking + .is_none() + ); } #[cfg(feature = "parquet")] diff --git a/datafusion/core/tests/parquet/content_defined_chunking.rs b/datafusion/core/tests/parquet/content_defined_chunking.rs new file mode 100644 index 0000000000000..2c6e99a3bc639 --- /dev/null +++ b/datafusion/core/tests/parquet/content_defined_chunking.rs @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for parquet content-defined chunking (CDC). +//! +//! These tests verify that CDC options are correctly wired through to the +//! parquet writer by inspecting file metadata (compressed sizes, page +//! boundaries) on the written files. 
+ +use arrow::array::{Int32Array, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use datafusion_common::config::{CdcOptions, TableParquetOptions}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::ArrowReaderMetadata; +use parquet::file::properties::WriterProperties; +use std::fs::File; +use std::sync::Arc; +use tempfile::NamedTempFile; + +/// Create a RecordBatch with enough data to exercise CDC chunking. +fn make_test_batch(num_rows: usize) -> RecordBatch { + let ids: Vec<i32> = (0..num_rows as i32).collect(); + // ~100 bytes per row to generate enough data for CDC page splits + let payloads: Vec<String> = (0..num_rows) + .map(|i| format!("row-{i:06}-payload-{}", "x".repeat(80))) + .collect(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("payload", DataType::Utf8, false), + ])); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(StringArray::from(payloads)), + ], + ) + .unwrap() +} + +/// Build WriterProperties from TableParquetOptions, exercising the same +/// code path that DataFusion's parquet sink uses. +fn writer_props( + opts: &mut TableParquetOptions, + schema: &Arc<Schema>, +) -> WriterProperties { + opts.arrow_schema(schema); + parquet::file::properties::WriterPropertiesBuilder::try_from( + opts as &TableParquetOptions, + ) + .unwrap() + .build() +} + +/// Write a batch to a temp parquet file and return the file handle. +fn write_parquet_file(batch: &RecordBatch, props: WriterProperties) -> NamedTempFile { + let tmp = tempfile::Builder::new() + .suffix(".parquet") + .tempfile() + .unwrap(); + let mut writer = + ArrowWriter::try_new(tmp.reopen().unwrap(), batch.schema(), Some(props)).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + tmp +} + +/// Read parquet metadata from a file.
+fn read_metadata(file: &NamedTempFile) -> parquet::file::metadata::ParquetMetaData { + let f = File::open(file.path()).unwrap(); + let reader_meta = ArrowReaderMetadata::load(&f, Default::default()).unwrap(); + reader_meta.metadata().as_ref().clone() +} + +/// Write parquet with CDC enabled, read it back via DataFusion, and verify +/// the data round-trips correctly. +#[tokio::test] +async fn cdc_data_round_trip() { + let batch = make_test_batch(5000); + + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions::default()); + let props = writer_props(&mut opts, &batch.schema()); + + let tmp = write_parquet_file(&batch, props); + + // Read back via DataFusion and verify row count + let ctx = SessionContext::new(); + ctx.register_parquet( + "data", + tmp.path().to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + let result = ctx + .sql("SELECT COUNT(*), MIN(id), MAX(id) FROM data") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let row = &result[0]; + let count = row + .column(0) + .as_any() + .downcast_ref::<Int64Array>() + .unwrap() + .value(0); + let min_id = row + .column(1) + .as_any() + .downcast_ref::<Int32Array>() + .unwrap() + .value(0); + let max_id = row + .column(2) + .as_any() + .downcast_ref::<Int32Array>() + .unwrap() + .value(0); + + assert_eq!(count, 5000); + assert_eq!(min_id, 0); + assert_eq!(max_id, 4999); +} + +/// Verify that CDC options are reflected in the parquet file metadata. +/// With small chunk sizes, CDC should produce different page boundaries +/// compared to default (no CDC) writing.
+#[tokio::test] +async fn cdc_affects_page_boundaries() { + let batch = make_test_batch(5000); + + // Write WITHOUT CDC + let mut no_cdc_opts = TableParquetOptions::default(); + let no_cdc_file = + write_parquet_file(&batch, writer_props(&mut no_cdc_opts, &batch.schema())); + let no_cdc_meta = read_metadata(&no_cdc_file); + + // Write WITH CDC using small chunk sizes to maximize effect + let mut cdc_opts = TableParquetOptions::default(); + cdc_opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 512, + max_chunk_size: 2048, + norm_level: 0, + }); + let cdc_file = + write_parquet_file(&batch, writer_props(&mut cdc_opts, &batch.schema())); + let cdc_meta = read_metadata(&cdc_file); + + // Both files should have the same number of rows + assert_eq!( + no_cdc_meta.file_metadata().num_rows(), + cdc_meta.file_metadata().num_rows(), + ); + + // Compare the uncompressed sizes of columns across all row groups. + // CDC with small chunk sizes should produce different page boundaries. + let no_cdc_sizes: Vec<i64> = no_cdc_meta + .row_groups() + .iter() + .flat_map(|rg| rg.columns().iter().map(|c| c.uncompressed_size())) + .collect(); + + let cdc_sizes: Vec<i64> = cdc_meta + .row_groups() + .iter() + .flat_map(|rg| rg.columns().iter().map(|c| c.uncompressed_size())) + .collect(); + + assert_ne!( + no_cdc_sizes, cdc_sizes, + "CDC with small chunk sizes should produce different page layouts \ + than default writing.
no_cdc={no_cdc_sizes:?}, cdc={cdc_sizes:?}" + ); +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 0535ddd9247d4..e96bd49b9ace9 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -44,6 +44,7 @@ use parquet::file::properties::{EnabledStatistics, WriterProperties}; use std::sync::Arc; use tempfile::NamedTempFile; +mod content_defined_chunking; mod custom_reader; #[cfg(feature = "parquet_encryption")] mod encryption; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index e428475df8ad3..4b7a91f38c201 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1275,7 +1275,6 @@ pub(crate) fn csv_writer_options_from_proto( #[cfg(test)] mod tests { - use super::*; use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions}; fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions { From 37bdbef947561e5fb657d05455e6957fa0cad677 Mon Sep 17 00:00:00 2001 From: Krisztian Szucs Date: Mon, 6 Apr 2026 16:25:55 +0200 Subject: [PATCH 9/9] chore: address review comment --- .../tests/parquet/content_defined_chunking.rs | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/parquet/content_defined_chunking.rs b/datafusion/core/tests/parquet/content_defined_chunking.rs index 2c6e99a3bc639..6a98ded1bd4cf 100644 --- a/datafusion/core/tests/parquet/content_defined_chunking.rs +++ b/datafusion/core/tests/parquet/content_defined_chunking.rs @@ -21,8 +21,8 @@ //! parquet writer by inspecting file metadata (compressed sizes, page //! boundaries) on the written files. 
-use arrow::array::{Int32Array, StringArray}; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::array::{AsArray, Int32Array, StringArray}; +use arrow::datatypes::{DataType, Field, Int32Type, Int64Type, Schema}; use arrow::record_batch::RecordBatch; use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_common::config::{CdcOptions, TableParquetOptions}; @@ -121,24 +121,9 @@ async fn cdc_data_round_trip() { .unwrap(); let row = &result[0]; - let count = row - .column(0) - .as_any() - .downcast_ref::<Int64Array>() - .unwrap() - .value(0); - let min_id = row - .column(1) - .as_any() - .downcast_ref::<Int32Array>() - .unwrap() - .value(0); - let max_id = row - .column(2) - .as_any() - .downcast_ref::<Int32Array>() - .unwrap() - .value(0); + let count = row.column(0).as_primitive::<Int64Type>().value(0); + let min_id = row.column(1).as_primitive::<Int32Type>().value(0); + let max_id = row.column(2).as_primitive::<Int32Type>().value(0); assert_eq!(count, 5000); assert_eq!(min_id, 0);