Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,137 @@ config_namespace! {
}
}

/// Options for content-defined chunking (CDC) when writing parquet files.
/// See [`ParquetOptions::use_content_defined_chunking`].
///
/// Can be enabled with default options by setting
/// `use_content_defined_chunking` to `true`, or configured with sub-fields
/// like `use_content_defined_chunking.min_chunk_size`.
/// Options for content-defined chunking (CDC) when writing parquet files.
/// See [`ParquetOptions::use_content_defined_chunking`].
///
/// Can be enabled with default options by setting
/// `use_content_defined_chunking` to `true`, or configured with sub-fields
/// like `use_content_defined_chunking.min_chunk_size`.
///
/// Note: deliberately does not derive `Default`; defaults come from the
/// inherent [`CdcOptions::default`] method instead.
#[derive(Debug, Clone, PartialEq)]
pub struct CdcOptions {
    /// Minimum chunk size in bytes. The rolling hash will not trigger a split
    /// until this many bytes have been accumulated. Default is 256 KiB.
    pub min_chunk_size: usize,

    /// Maximum chunk size in bytes. A split is forced when the accumulated
    /// size exceeds this value. Default is 1 MiB.
    pub max_chunk_size: usize,

    /// Normalization level. Increasing this improves deduplication ratio
    /// but increases fragmentation. Recommended range is [-3, 3], default is 0.
    pub norm_level: i32,
}

// Deliberately NOT a `Default` trait impl: implementing `Default` would make
// the blanket `impl<F: ConfigField + Default> ConfigField for Option<F>`
// apply, shadowing the hand-written `impl ConfigField for Option<CdcOptions>`
// below that understands "true"/"false" for enabling/disabling CDC.
// Call `CdcOptions::default()` (the inherent method), not `Default::default()`.
impl CdcOptions {
    /// Returns a new `CdcOptions` with default values.
    #[expect(clippy::should_implement_trait)]
    pub fn default() -> Self {
        // 256 KiB minimum, 1 MiB maximum, neutral normalization level.
        Self {
            min_chunk_size: 256 << 10,
            max_chunk_size: 1 << 20,
            norm_level: 0,
        }
    }
}

impl ConfigField for CdcOptions {
    /// Routes `key` (e.g. `"min_chunk_size"`) to the matching field's `set`.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Split "field.rest" into the field name and any nested remainder.
        let (field, rem) = key.split_once('.').unwrap_or((key, ""));
        match field {
            "min_chunk_size" => self.min_chunk_size.set(rem, value),
            "max_chunk_size" => self.max_chunk_size.set(rem, value),
            "norm_level" => self.norm_level.set(rem, value),
            _ => _config_err!("Config value \"{}\" not found on CdcOptions", field),
        }
    }

    /// Reports each sub-field (with its documentation) to the visitor.
    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
        self.min_chunk_size.visit(
            v,
            &format!("{key_prefix}.min_chunk_size"),
            "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB.",
        );
        self.max_chunk_size.visit(
            v,
            &format!("{key_prefix}.max_chunk_size"),
            "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB.",
        );
        self.norm_level.visit(
            v,
            &format!("{key_prefix}.norm_level"),
            "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. Recommended range is [-3, 3], default is 0.",
        );
    }

    /// Restores the named sub-field to its default value, or delegates to the
    /// field's own `reset` when a nested remainder is present.
    fn reset(&mut self, key: &str) -> Result<()> {
        let (field, rem) = key.split_once('.').unwrap_or((key, ""));
        if rem.is_empty() {
            // Bare field name: copy the default value over the current one.
            let defaults = CdcOptions::default();
            match field {
                "min_chunk_size" => self.min_chunk_size = defaults.min_chunk_size,
                "max_chunk_size" => self.max_chunk_size = defaults.max_chunk_size,
                "norm_level" => self.norm_level = defaults.norm_level,
                _ => {
                    return _config_err!(
                        "Config value \"{}\" not found on CdcOptions",
                        field
                    )
                }
            }
            Ok(())
        } else {
            // Nested key: let the field handle the remainder.
            match field {
                "min_chunk_size" => self.min_chunk_size.reset(rem),
                "max_chunk_size" => self.max_chunk_size.reset(rem),
                "norm_level" => self.norm_level.reset(rem),
                _ => _config_err!("Config value \"{}\" not found on CdcOptions", field),
            }
        }
    }
}

/// `ConfigField` for `Option<CdcOptions>` — allows setting the option to
/// `"true"` (enable with defaults) or `"false"` (disable), in addition to
/// setting individual sub-fields like `min_chunk_size`.
impl ConfigField for Option<CdcOptions> {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        match self {
            Some(s) => s.visit(v, key, description),
            None => v.none(key, description),
        }
    }

    /// Bare key: `"true"` enables CDC with defaults, `"false"` disables it.
    /// Sub-field key: enables CDC with defaults (if disabled) then sets the field.
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        if key.is_empty() {
            match value.to_ascii_lowercase().as_str() {
                "true" => {
                    *self = Some(CdcOptions::default());
                    Ok(())
                }
                "false" => {
                    *self = None;
                    Ok(())
                }
                _ => _config_err!(
                    "Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'"
                ),
            }
        } else {
            self.get_or_insert_with(CdcOptions::default).set(key, value)
        }
    }

    /// Bare key: restores the option to its default (`None`, i.e. disabled).
    /// Sub-field key: resets that field if CDC is enabled. Unlike `set`,
    /// resetting a sub-field must NOT enable CDC as a side effect — the
    /// overall default is `None`, so a disabled option is already fully reset.
    fn reset(&mut self, key: &str) -> Result<()> {
        if key.is_empty() {
            *self = None;
            Ok(())
        } else {
            match self.as_mut() {
                Some(opts) => opts.reset(key),
                // Disabled: every sub-field is already at its default. Still
                // run `reset` on a scratch default so unknown keys (typos)
                // are reported, without mutating `self`.
                None => CdcOptions::default().reset(key),
            }
        }
    }
}

config_namespace! {
/// Options for reading and writing parquet files
///
Expand Down Expand Up @@ -872,6 +1003,12 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

/// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing
/// parquet files. When `Some`, CDC is enabled with the given options; when `None`
/// (the default), CDC is disabled. When CDC is enabled, parallel writing is
/// automatically disabled since the chunker state must persist across row groups.
pub use_content_defined_chunking: Option<CdcOptions>, default = None
}
}

Expand Down Expand Up @@ -1826,6 +1963,7 @@ config_field!(usize);
config_field!(f64);
config_field!(u64);
config_field!(u32);
// `i32` impl needed for `CdcOptions::norm_level`, which may be negative.
config_field!(i32);

impl ConfigField for u8 {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
Expand Down Expand Up @@ -3579,4 +3717,77 @@ mod tests {
"Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0"
);
}

#[cfg(feature = "parquet")]
#[test]
fn set_cdc_option_with_boolean_true() {
    use crate::config::ConfigOptions;

    const KEY: &str = "datafusion.execution.parquet.use_content_defined_chunking";

    let mut config = ConfigOptions::default();
    // CDC is disabled out of the box.
    assert!(config
        .execution
        .parquet
        .use_content_defined_chunking
        .is_none());

    // "true" enables CDC with its default options.
    config.set(KEY, "true").unwrap();
    let cdc = config
        .execution
        .parquet
        .use_content_defined_chunking
        .as_ref()
        .expect("CDC should be enabled");
    assert_eq!(cdc.min_chunk_size, 256 * 1024);
    assert_eq!(cdc.max_chunk_size, 1024 * 1024);
    assert_eq!(cdc.norm_level, 0);

    // "false" disables CDC again.
    config.set(KEY, "false").unwrap();
    assert!(config
        .execution
        .parquet
        .use_content_defined_chunking
        .is_none());
}

#[cfg(feature = "parquet")]
#[test]
fn set_cdc_option_with_subfields() {
    use crate::config::ConfigOptions;

    let mut config = ConfigOptions::default();

    // Assigning a sub-field implicitly enables CDC (with defaults), then
    // overrides just that one field.
    config
        .set(
            "datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size",
            "1024",
        )
        .unwrap();

    let cdc = config
        .execution
        .parquet
        .use_content_defined_chunking
        .as_ref()
        .expect("CDC should be enabled");
    assert_eq!(cdc.min_chunk_size, 1024);
    // Fields that were not assigned keep their default values.
    assert_eq!(cdc.max_chunk_size, 1024 * 1024);
    assert_eq!(cdc.norm_level, 0);
}
}
103 changes: 101 additions & 2 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
global,
column_specific_options,
key_value_metadata,
crypto: _,
..
} = table_parquet_options;

let mut builder = global.into_writer_properties_builder()?;
Expand Down Expand Up @@ -191,6 +191,7 @@ impl ParquetOptions {
bloom_filter_on_write,
bloom_filter_fpp,
bloom_filter_ndv,
use_content_defined_chunking,

// not in WriterProperties
enable_page_index: _,
Expand Down Expand Up @@ -247,6 +248,26 @@ impl ParquetOptions {
if let Some(encoding) = encoding {
builder = builder.set_encoding(parse_encoding_string(encoding)?);
}
if let Some(cdc) = use_content_defined_chunking {
if cdc.min_chunk_size == 0 {
return Err(DataFusionError::Configuration(
"CDC min_chunk_size must be greater than 0".to_string(),
));
}
if cdc.max_chunk_size <= cdc.min_chunk_size {
return Err(DataFusionError::Configuration(format!(
"CDC max_chunk_size ({}) must be greater than min_chunk_size ({})",
cdc.max_chunk_size, cdc.min_chunk_size
)));
}
builder = builder.set_content_defined_chunking(Some(
parquet::file::properties::CdcOptions {
min_chunk_size: cdc.min_chunk_size,
max_chunk_size: cdc.max_chunk_size,
norm_level: cdc.norm_level,
},
));
}

Ok(builder)
}
Expand Down Expand Up @@ -388,7 +409,9 @@ mod tests {
use super::*;
#[cfg(feature = "parquet_encryption")]
use crate::config::ConfigFileEncryptionProperties;
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions};
use crate::config::{
CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
};
use crate::parquet_config::DFParquetWriterVersion;
use parquet::basic::Compression;
use parquet::file::properties::{
Expand Down Expand Up @@ -460,6 +483,7 @@ mod tests {
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
max_predicate_cache_size: defaults.max_predicate_cache_size,
use_content_defined_chunking: defaults.use_content_defined_chunking.clone(),
}
}

Expand Down Expand Up @@ -576,6 +600,13 @@ mod tests {
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
use_content_defined_chunking: props.content_defined_chunking().map(|c| {
CdcOptions {
min_chunk_size: c.min_chunk_size,
max_chunk_size: c.max_chunk_size,
norm_level: c.norm_level,
}
}),
},
column_specific_options,
key_value_metadata,
Expand Down Expand Up @@ -786,6 +817,74 @@ mod tests {
);
}

#[test]
fn test_cdc_enabled_with_custom_options() {
    // Non-default values for every CDC knob, to prove they all flow through.
    let requested = CdcOptions {
        min_chunk_size: 128 * 1024,
        max_chunk_size: 512 * 1024,
        norm_level: 2,
    };

    let mut opts = TableParquetOptions::default();
    opts.global.use_content_defined_chunking = Some(requested.clone());
    opts.arrow_schema(&Arc::new(Schema::empty()));

    let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
    let applied = props.content_defined_chunking().expect("CDC should be set");
    assert_eq!(applied.min_chunk_size, requested.min_chunk_size);
    assert_eq!(applied.max_chunk_size, requested.max_chunk_size);
    assert_eq!(applied.norm_level, requested.norm_level);
}

#[test]
fn test_cdc_disabled_by_default() {
    let mut opts = TableParquetOptions::default();
    opts.arrow_schema(&Arc::new(Schema::empty()));

    // With no CDC options set, the builder must leave CDC disabled.
    let props = WriterPropertiesBuilder::try_from(&opts)
        .expect("default options should convert")
        .build();
    assert!(props.content_defined_chunking().is_none());
}

#[test]
fn test_cdc_round_trip_through_writer_props() {
    // Includes a negative norm_level to exercise the signed field.
    let original = CdcOptions {
        min_chunk_size: 64 * 1024,
        max_chunk_size: 2 * 1024 * 1024,
        norm_level: -1,
    };

    let mut opts = TableParquetOptions::default();
    opts.global.use_content_defined_chunking = Some(original.clone());
    opts.arrow_schema(&Arc::new(Schema::empty()));

    // Options -> WriterProperties -> options must preserve the CDC settings.
    let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build();
    let recovered = session_config_from_writer_props(&props)
        .global
        .use_content_defined_chunking
        .unwrap();

    assert_eq!(recovered.min_chunk_size, original.min_chunk_size);
    assert_eq!(recovered.max_chunk_size, original.max_chunk_size);
    assert_eq!(recovered.norm_level, original.norm_level);
}

#[test]
fn test_cdc_validation_zero_min_chunk_size() {
    // A zero min_chunk_size must be rejected during conversion.
    let mut opts = TableParquetOptions::default();
    opts.global.use_content_defined_chunking = Some(CdcOptions {
        min_chunk_size: 0,
        ..CdcOptions::default()
    });
    opts.arrow_schema(&Arc::new(Schema::empty()));

    // Assert on the message, not just is_err(), so an unrelated failure
    // (e.g. a schema error) cannot make this test pass vacuously.
    let err = WriterPropertiesBuilder::try_from(&opts).unwrap_err();
    assert!(
        err.to_string()
            .contains("CDC min_chunk_size must be greater than 0"),
        "unexpected error: {err}"
    );
}

#[test]
fn test_cdc_validation_max_not_greater_than_min() {
    // max_chunk_size must be strictly greater than min_chunk_size.
    let mut opts = TableParquetOptions::default();
    opts.global.use_content_defined_chunking = Some(CdcOptions {
        min_chunk_size: 512 * 1024,
        max_chunk_size: 256 * 1024,
        ..CdcOptions::default()
    });
    opts.arrow_schema(&Arc::new(Schema::empty()));

    // Assert on the message, not just is_err(), so an unrelated failure
    // cannot make this test pass vacuously.
    let err = WriterPropertiesBuilder::try_from(&opts).unwrap_err();
    assert!(
        err.to_string()
            .contains("must be greater than min_chunk_size"),
        "unexpected error: {err}"
    );
}

#[test]
fn test_bloom_filter_set_ndv_only() {
// the TableParquetOptions::default, with only ndv set
Expand Down
Loading
Loading