diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0551cbbb15ae1..38b18c06fe930 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -687,6 +687,137 @@ config_namespace! { } } +/// Options for content-defined chunking (CDC) when writing parquet files. +/// See [`ParquetOptions::use_content_defined_chunking`]. +/// +/// Can be enabled with default options by setting +/// `use_content_defined_chunking` to `true`, or configured with sub-fields +/// like `use_content_defined_chunking.min_chunk_size`. +#[derive(Debug, Clone, PartialEq)] +pub struct CdcOptions { + /// Minimum chunk size in bytes. The rolling hash will not trigger a split + /// until this many bytes have been accumulated. Default is 256 KiB. + pub min_chunk_size: usize, + + /// Maximum chunk size in bytes. A split is forced when the accumulated + /// size exceeds this value. Default is 1 MiB. + pub max_chunk_size: usize, + + /// Normalization level. Increasing this improves deduplication ratio + /// but increases fragmentation. Recommended range is [-3, 3], default is 0. + pub norm_level: i32, +} + +// Note: `CdcOptions` intentionally does NOT implement `Default` so that the +// blanket `impl ConfigField for Option` does not +// apply. This allows the specific `impl ConfigField for Option` +// below to handle "true"/"false" for enabling/disabling CDC. +// Use `CdcOptions::default()` (the inherent method) instead of `Default::default()`. +impl CdcOptions { + /// Returns a new `CdcOptions` with default values. 
+ #[expect(clippy::should_implement_trait)] + pub fn default() -> Self { + Self { + min_chunk_size: 256 * 1024, + max_chunk_size: 1024 * 1024, + norm_level: 0, + } + } +} + +impl ConfigField for CdcOptions { + fn set(&mut self, key: &str, value: &str) -> Result<()> { + let (key, rem) = key.split_once('.').unwrap_or((key, "")); + match key { + "min_chunk_size" => self.min_chunk_size.set(rem, value), + "max_chunk_size" => self.max_chunk_size.set(rem, value), + "norm_level" => self.norm_level.set(rem, value), + _ => _config_err!("Config value \"{}\" not found on CdcOptions", key), + } + } + + fn visit(&self, v: &mut V, key_prefix: &str, _description: &'static str) { + let key = format!("{key_prefix}.min_chunk_size"); + self.min_chunk_size.visit(v, &key, "Minimum chunk size in bytes. The rolling hash will not trigger a split until this many bytes have been accumulated. Default is 256 KiB."); + let key = format!("{key_prefix}.max_chunk_size"); + self.max_chunk_size.visit(v, &key, "Maximum chunk size in bytes. A split is forced when the accumulated size exceeds this value. Default is 1 MiB."); + let key = format!("{key_prefix}.norm_level"); + self.norm_level.visit(v, &key, "Normalization level. Increasing this improves deduplication ratio but increases fragmentation. 
Recommended range is [-3, 3], default is 0."); + } + + fn reset(&mut self, key: &str) -> Result<()> { + let (key, rem) = key.split_once('.').unwrap_or((key, "")); + match key { + "min_chunk_size" => { + if rem.is_empty() { + self.min_chunk_size = CdcOptions::default().min_chunk_size; + Ok(()) + } else { + self.min_chunk_size.reset(rem) + } + } + "max_chunk_size" => { + if rem.is_empty() { + self.max_chunk_size = CdcOptions::default().max_chunk_size; + Ok(()) + } else { + self.max_chunk_size.reset(rem) + } + } + "norm_level" => { + if rem.is_empty() { + self.norm_level = CdcOptions::default().norm_level; + Ok(()) + } else { + self.norm_level.reset(rem) + } + } + _ => _config_err!("Config value \"{}\" not found on CdcOptions", key), + } + } +} + +/// `ConfigField` for `Option` — allows setting the option to +/// `"true"` (enable with defaults) or `"false"` (disable), in addition to +/// setting individual sub-fields like `min_chunk_size`. +impl ConfigField for Option { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + match self { + Some(s) => s.visit(v, key, description), + None => v.none(key, description), + } + } + + fn set(&mut self, key: &str, value: &str) -> Result<()> { + if key.is_empty() { + match value.to_ascii_lowercase().as_str() { + "true" => { + *self = Some(CdcOptions::default()); + Ok(()) + } + "false" => { + *self = None; + Ok(()) + } + _ => _config_err!( + "Expected 'true' or 'false' for use_content_defined_chunking, got '{value}'" + ), + } + } else { + self.get_or_insert_with(CdcOptions::default).set(key, value) + } + } + + fn reset(&mut self, key: &str) -> Result<()> { + if key.is_empty() { + *self = None; + Ok(()) + } else { + self.get_or_insert_with(CdcOptions::default).reset(key) + } + } +} + config_namespace! { /// Options for reading and writing parquet files /// @@ -872,6 +1003,12 @@ config_namespace! { /// writing out already in-memory data, such as from a cached /// data frame. 
pub maximum_buffered_record_batches_per_stream: usize, default = 2 + + /// (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing + /// parquet files. When `Some`, CDC is enabled with the given options; when `None` + /// (the default), CDC is disabled. When CDC is enabled, parallel writing is + /// automatically disabled since the chunker state must persist across row groups. + pub use_content_defined_chunking: Option, default = None } } @@ -1826,6 +1963,7 @@ config_field!(usize); config_field!(f64); config_field!(u64); config_field!(u32); +config_field!(i32); impl ConfigField for u8 { fn visit(&self, v: &mut V, key: &str, description: &'static str) { @@ -3579,4 +3717,77 @@ mod tests { "Invalid or Unsupported Configuration: Invalid parquet writer version: 3.0. Expected one of: 1.0, 2.0" ); } + + #[cfg(feature = "parquet")] + #[test] + fn set_cdc_option_with_boolean_true() { + use crate::config::ConfigOptions; + + let mut config = ConfigOptions::default(); + assert!( + config + .execution + .parquet + .use_content_defined_chunking + .is_none() + ); + + // Setting to "true" should enable CDC with default options + config + .set( + "datafusion.execution.parquet.use_content_defined_chunking", + "true", + ) + .unwrap(); + let cdc = config + .execution + .parquet + .use_content_defined_chunking + .as_ref() + .expect("CDC should be enabled"); + assert_eq!(cdc.min_chunk_size, 256 * 1024); + assert_eq!(cdc.max_chunk_size, 1024 * 1024); + assert_eq!(cdc.norm_level, 0); + + // Setting to "false" should disable CDC + config + .set( + "datafusion.execution.parquet.use_content_defined_chunking", + "false", + ) + .unwrap(); + assert!( + config + .execution + .parquet + .use_content_defined_chunking + .is_none() + ); + } + + #[cfg(feature = "parquet")] + #[test] + fn set_cdc_option_with_subfields() { + use crate::config::ConfigOptions; + + let mut config = ConfigOptions::default(); + + // Setting sub-fields should also enable CDC + config + .set( + 
"datafusion.execution.parquet.use_content_defined_chunking.min_chunk_size", + "1024", + ) + .unwrap(); + let cdc = config + .execution + .parquet + .use_content_defined_chunking + .as_ref() + .expect("CDC should be enabled"); + assert_eq!(cdc.min_chunk_size, 1024); + // Other fields should be defaults + assert_eq!(cdc.max_chunk_size, 1024 * 1024); + assert_eq!(cdc.norm_level, 0); + } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a7a1fc6d0bb66..eaf5a1642e8e2 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -95,7 +95,7 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { global, column_specific_options, key_value_metadata, - crypto: _, + .. } = table_parquet_options; let mut builder = global.into_writer_properties_builder()?; @@ -191,6 +191,7 @@ impl ParquetOptions { bloom_filter_on_write, bloom_filter_fpp, bloom_filter_ndv, + use_content_defined_chunking, // not in WriterProperties enable_page_index: _, @@ -247,6 +248,26 @@ impl ParquetOptions { if let Some(encoding) = encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); } + if let Some(cdc) = use_content_defined_chunking { + if cdc.min_chunk_size == 0 { + return Err(DataFusionError::Configuration( + "CDC min_chunk_size must be greater than 0".to_string(), + )); + } + if cdc.max_chunk_size <= cdc.min_chunk_size { + return Err(DataFusionError::Configuration(format!( + "CDC max_chunk_size ({}) must be greater than min_chunk_size ({})", + cdc.max_chunk_size, cdc.min_chunk_size + ))); + } + builder = builder.set_content_defined_chunking(Some( + parquet::file::properties::CdcOptions { + min_chunk_size: cdc.min_chunk_size, + max_chunk_size: cdc.max_chunk_size, + norm_level: cdc.norm_level, + }, + )); + } Ok(builder) } @@ -388,7 +409,9 @@ mod tests { use super::*; #[cfg(feature = "parquet_encryption")] use 
crate::config::ConfigFileEncryptionProperties; - use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions}; + use crate::config::{ + CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions, + }; use crate::parquet_config::DFParquetWriterVersion; use parquet::basic::Compression; use parquet::file::properties::{ @@ -460,6 +483,7 @@ mod tests { skip_arrow_metadata: defaults.skip_arrow_metadata, coerce_int96: None, max_predicate_cache_size: defaults.max_predicate_cache_size, + use_content_defined_chunking: defaults.use_content_defined_chunking.clone(), } } @@ -576,6 +600,13 @@ mod tests { binary_as_string: global_options_defaults.binary_as_string, skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, coerce_int96: None, + use_content_defined_chunking: props.content_defined_chunking().map(|c| { + CdcOptions { + min_chunk_size: c.min_chunk_size, + max_chunk_size: c.max_chunk_size, + norm_level: c.norm_level, + } + }), }, column_specific_options, key_value_metadata, @@ -786,6 +817,74 @@ mod tests { ); } + #[test] + fn test_cdc_enabled_with_custom_options() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let cdc = props.content_defined_chunking().expect("CDC should be set"); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); + } + + #[test] + fn test_cdc_disabled_by_default() { + let mut opts = TableParquetOptions::default(); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + assert!(props.content_defined_chunking().is_none()); + } + + #[test] + fn test_cdc_round_trip_through_writer_props() { + let 
mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + let recovered = session_config_from_writer_props(&props); + + let cdc = recovered.global.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); + } + + #[test] + fn test_cdc_validation_zero_min_chunk_size() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 0, + ..CdcOptions::default() + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + assert!(WriterPropertiesBuilder::try_from(&opts).is_err()); + } + + #[test] + fn test_cdc_validation_max_not_greater_than_min() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 512 * 1024, + max_chunk_size: 256 * 1024, + ..CdcOptions::default() + }); + opts.arrow_schema(&Arc::new(Schema::empty())); + assert!(WriterPropertiesBuilder::try_from(&opts).is_err()); + } + #[test] fn test_bloom_filter_set_ndv_only() { // the TableParquetOptions::default, with only ndv set diff --git a/datafusion/core/tests/parquet/content_defined_chunking.rs b/datafusion/core/tests/parquet/content_defined_chunking.rs new file mode 100644 index 0000000000000..6a98ded1bd4cf --- /dev/null +++ b/datafusion/core/tests/parquet/content_defined_chunking.rs @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for parquet content-defined chunking (CDC). +//! +//! These tests verify that CDC options are correctly wired through to the +//! parquet writer by inspecting file metadata (compressed sizes, page +//! boundaries) on the written files. + +use arrow::array::{AsArray, Int32Array, StringArray}; +use arrow::datatypes::{DataType, Field, Int32Type, Int64Type, Schema}; +use arrow::record_batch::RecordBatch; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use datafusion_common::config::{CdcOptions, TableParquetOptions}; +use parquet::arrow::ArrowWriter; +use parquet::arrow::arrow_reader::ArrowReaderMetadata; +use parquet::file::properties::WriterProperties; +use std::fs::File; +use std::sync::Arc; +use tempfile::NamedTempFile; + +/// Create a RecordBatch with enough data to exercise CDC chunking. 
+fn make_test_batch(num_rows: usize) -> RecordBatch { + let ids: Vec = (0..num_rows as i32).collect(); + // ~100 bytes per row to generate enough data for CDC page splits + let payloads: Vec = (0..num_rows) + .map(|i| format!("row-{i:06}-payload-{}", "x".repeat(80))) + .collect(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("payload", DataType::Utf8, false), + ])); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(ids)), + Arc::new(StringArray::from(payloads)), + ], + ) + .unwrap() +} + +/// Build WriterProperties from TableParquetOptions, exercising the same +/// code path that DataFusion's parquet sink uses. +fn writer_props( + opts: &mut TableParquetOptions, + schema: &Arc, +) -> WriterProperties { + opts.arrow_schema(schema); + parquet::file::properties::WriterPropertiesBuilder::try_from( + opts as &TableParquetOptions, + ) + .unwrap() + .build() +} + +/// Write a batch to a temp parquet file and return the file handle. +fn write_parquet_file(batch: &RecordBatch, props: WriterProperties) -> NamedTempFile { + let tmp = tempfile::Builder::new() + .suffix(".parquet") + .tempfile() + .unwrap(); + let mut writer = + ArrowWriter::try_new(tmp.reopen().unwrap(), batch.schema(), Some(props)).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + tmp +} + +/// Read parquet metadata from a file. +fn read_metadata(file: &NamedTempFile) -> parquet::file::metadata::ParquetMetaData { + let f = File::open(file.path()).unwrap(); + let reader_meta = ArrowReaderMetadata::load(&f, Default::default()).unwrap(); + reader_meta.metadata().as_ref().clone() +} + +/// Write parquet with CDC enabled, read it back via DataFusion, and verify +/// the data round-trips correctly. 
+#[tokio::test] +async fn cdc_data_round_trip() { + let batch = make_test_batch(5000); + + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions::default()); + let props = writer_props(&mut opts, &batch.schema()); + + let tmp = write_parquet_file(&batch, props); + + // Read back via DataFusion and verify row count + let ctx = SessionContext::new(); + ctx.register_parquet( + "data", + tmp.path().to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + let result = ctx + .sql("SELECT COUNT(*), MIN(id), MAX(id) FROM data") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let row = &result[0]; + let count = row.column(0).as_primitive::().value(0); + let min_id = row.column(1).as_primitive::().value(0); + let max_id = row.column(2).as_primitive::().value(0); + + assert_eq!(count, 5000); + assert_eq!(min_id, 0); + assert_eq!(max_id, 4999); +} + +/// Verify that CDC options are reflected in the parquet file metadata. +/// With small chunk sizes, CDC should produce different page boundaries +/// compared to default (no CDC) writing. 
+#[tokio::test] +async fn cdc_affects_page_boundaries() { + let batch = make_test_batch(5000); + + // Write WITHOUT CDC + let mut no_cdc_opts = TableParquetOptions::default(); + let no_cdc_file = + write_parquet_file(&batch, writer_props(&mut no_cdc_opts, &batch.schema())); + let no_cdc_meta = read_metadata(&no_cdc_file); + + // Write WITH CDC using small chunk sizes to maximize effect + let mut cdc_opts = TableParquetOptions::default(); + cdc_opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 512, + max_chunk_size: 2048, + norm_level: 0, + }); + let cdc_file = + write_parquet_file(&batch, writer_props(&mut cdc_opts, &batch.schema())); + let cdc_meta = read_metadata(&cdc_file); + + // Both files should have the same number of rows + assert_eq!( + no_cdc_meta.file_metadata().num_rows(), + cdc_meta.file_metadata().num_rows(), + ); + + // Compare the uncompressed sizes of columns across all row groups. + // CDC with small chunk sizes should produce different page boundaries. + let no_cdc_sizes: Vec = no_cdc_meta + .row_groups() + .iter() + .flat_map(|rg| rg.columns().iter().map(|c| c.uncompressed_size())) + .collect(); + + let cdc_sizes: Vec = cdc_meta + .row_groups() + .iter() + .flat_map(|rg| rg.columns().iter().map(|c| c.uncompressed_size())) + .collect(); + + assert_ne!( + no_cdc_sizes, cdc_sizes, + "CDC with small chunk sizes should produce different page layouts \ + than default writing. 
no_cdc={no_cdc_sizes:?}, cdc={cdc_sizes:?}" + ); +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 0535ddd9247d4..e96bd49b9ace9 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -44,6 +44,7 @@ use parquet::file::properties::{EnabledStatistics, WriterProperties}; use std::sync::Arc; use tempfile::NamedTempFile; +mod content_defined_chunking; mod custom_reader; #[cfg(feature = "parquet_encryption")] mod encryption; diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index da35a1a34d441..c4faedf571f6d 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -1370,7 +1370,11 @@ impl FileSink for ParquetSink { while let Some((path, mut rx)) = file_stream_rx.recv().await { let parquet_props = self.create_writer_props(&runtime, &path).await?; - if !parquet_opts.global.allow_single_file_parallelism { + // CDC requires the sequential writer: the chunker state lives in ArrowWriter + // and persists across row groups. The parallel path bypasses ArrowWriter entirely. 
+ if !parquet_opts.global.allow_single_file_parallelism + || parquet_opts.global.use_content_defined_chunking.is_some() + { let mut writer = self .create_async_arrow_writer( &path, diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..31ece63577b4f 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -603,6 +603,14 @@ message ParquetOptions { oneof max_predicate_cache_size_opt { uint64 max_predicate_cache_size = 33; } + + CdcOptions content_defined_chunking = 35; +} + +message CdcOptions { + uint64 min_chunk_size = 1; + uint64 max_chunk_size = 2; + int32 norm_level = 3; } enum JoinSide { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index c616eadf295f2..4b7a91f38c201 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -39,7 +39,7 @@ use datafusion_common::{ DataFusionError, JoinSide, ScalarValue, Statistics, TableReference, arrow_datafusion_err, config::{ - CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, + CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }, file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, @@ -1089,6 +1089,17 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + use_content_defined_chunking: value.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + // proto3 uses 0 as the wire default for uint64; a zero chunk size is + // invalid, so treat it as "field not set" and fall back to the default. 
+ min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + // norm_level = 0 is a valid value (and the default), so pass it through directly. + norm_level: cdc.norm_level, + } + }), }) } } @@ -1151,7 +1162,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { column_specific_options.insert(column_name.clone(), options.try_into()?); } } - Ok(TableParquetOptions { + let opts = TableParquetOptions { global: value .global .as_ref() @@ -1159,9 +1170,9 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { .unwrap() .unwrap(), column_specific_options, - key_value_metadata: Default::default(), - crypto: Default::default(), - }) + ..Default::default() + }; + Ok(opts) } } @@ -1261,3 +1272,87 @@ pub(crate) fn csv_writer_options_from_proto( .with_null(writer_options.null_value.clone()) .with_double_quote(writer_options.double_quote)) } + +#[cfg(test)] +mod tests { + use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions}; + + fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions { + let proto: crate::protobuf_common::ParquetOptions = + (&opts).try_into().expect("to_proto"); + ParquetOptions::try_from(&proto).expect("from_proto") + } + + fn table_parquet_options_proto_round_trip( + opts: TableParquetOptions, + ) -> TableParquetOptions { + let proto: crate::protobuf_common::TableParquetOptions = + (&opts).try_into().expect("to_proto"); + TableParquetOptions::try_from(&proto).expect("from_proto") + } + + #[test] + fn test_parquet_options_cdc_disabled_round_trip() { + let opts = ParquetOptions::default(); + assert!(opts.use_content_defined_chunking.is_none()); + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!(opts, recovered); + } + + #[test] + fn test_parquet_options_cdc_enabled_round_trip() { + let 
opts = ParquetOptions { + use_content_defined_chunking: Some(CdcOptions { + min_chunk_size: 128 * 1024, + max_chunk_size: 512 * 1024, + norm_level: 2, + }), + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts.clone()); + let cdc = recovered.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 128 * 1024); + assert_eq!(cdc.max_chunk_size, 512 * 1024); + assert_eq!(cdc.norm_level, 2); + } + + #[test] + fn test_parquet_options_cdc_negative_norm_level_round_trip() { + let opts = ParquetOptions { + use_content_defined_chunking: Some(CdcOptions { + norm_level: -3, + ..CdcOptions::default() + }), + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts); + assert_eq!( + recovered.use_content_defined_chunking.unwrap().norm_level, + -3 + ); + } + + #[test] + fn test_table_parquet_options_cdc_round_trip() { + let mut opts = TableParquetOptions::default(); + opts.global.use_content_defined_chunking = Some(CdcOptions { + min_chunk_size: 64 * 1024, + max_chunk_size: 2 * 1024 * 1024, + norm_level: -1, + }); + + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + let cdc = recovered.global.use_content_defined_chunking.unwrap(); + assert_eq!(cdc.min_chunk_size, 64 * 1024); + assert_eq!(cdc.max_chunk_size, 2 * 1024 * 1024); + assert_eq!(cdc.norm_level, -1); + } + + #[test] + fn test_table_parquet_options_cdc_disabled_round_trip() { + let opts = TableParquetOptions::default(); + assert!(opts.global.use_content_defined_chunking.is_none()); + let recovered = table_parquet_options_proto_round_trip(opts.clone()); + assert!(recovered.global.use_content_defined_chunking.is_none()); + } +} diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..77a3b71488ece 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -883,6 +883,144 @@ impl<'de> 
serde::Deserialize<'de> for AvroOptions { deserializer.deserialize_struct("datafusion_common.AvroOptions", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for CdcOptions { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.min_chunk_size != 0 { + len += 1; + } + if self.max_chunk_size != 0 { + len += 1; + } + if self.norm_level != 0 { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion_common.CdcOptions", len)?; + if self.min_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("minChunkSize", ToString::to_string(&self.min_chunk_size).as_str())?; + } + if self.max_chunk_size != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("maxChunkSize", ToString::to_string(&self.max_chunk_size).as_str())?; + } + if self.norm_level != 0 { + struct_ser.serialize_field("normLevel", &self.norm_level)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for CdcOptions { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "min_chunk_size", + "minChunkSize", + "max_chunk_size", + "maxChunkSize", + "norm_level", + "normLevel", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + MinChunkSize, + MaxChunkSize, + NormLevel, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + 
#[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "minChunkSize" | "min_chunk_size" => Ok(GeneratedField::MinChunkSize), + "maxChunkSize" | "max_chunk_size" => Ok(GeneratedField::MaxChunkSize), + "normLevel" | "norm_level" => Ok(GeneratedField::NormLevel), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = CdcOptions; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion_common.CdcOptions") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut min_chunk_size__ = None; + let mut max_chunk_size__ = None; + let mut norm_level__ = None; + while let Some(k) = map_.next_key()? { + match k { + GeneratedField::MinChunkSize => { + if min_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("minChunkSize")); + } + min_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::MaxChunkSize => { + if max_chunk_size__.is_some() { + return Err(serde::de::Error::duplicate_field("maxChunkSize")); + } + max_chunk_size__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::NormLevel => { + if norm_level__.is_some() { + return Err(serde::de::Error::duplicate_field("normLevel")); + } + norm_level__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + } + } + Ok(CdcOptions { + min_chunk_size: min_chunk_size__.unwrap_or_default(), + max_chunk_size: max_chunk_size__.unwrap_or_default(), + norm_level: norm_level__.unwrap_or_default(), + }) + } + } + deserializer.deserialize_struct("datafusion_common.CdcOptions", FIELDS, GeneratedVisitor) 
+ } +} impl serde::Serialize for Column { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -5695,6 +5833,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { len += 1; } + if self.content_defined_chunking.is_some() { + len += 1; + } if self.metadata_size_hint_opt.is_some() { len += 1; } @@ -5806,6 +5947,9 @@ impl serde::Serialize for ParquetOptions { if !self.created_by.is_empty() { struct_ser.serialize_field("createdBy", &self.created_by)?; } + if let Some(v) = self.content_defined_chunking.as_ref() { + struct_ser.serialize_field("contentDefinedChunking", v)?; + } if let Some(v) = self.metadata_size_hint_opt.as_ref() { match v { parquet_options::MetadataSizeHintOpt::MetadataSizeHint(v) => { @@ -5944,6 +6088,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "maxRowGroupSize", "created_by", "createdBy", + "content_defined_chunking", + "contentDefinedChunking", "metadata_size_hint", "metadataSizeHint", "compression", @@ -5989,6 +6135,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { DataPageRowCountLimit, MaxRowGroupSize, CreatedBy, + ContentDefinedChunking, MetadataSizeHint, Compression, DictionaryEnabled, @@ -6042,6 +6189,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), "createdBy" | "created_by" => Ok(GeneratedField::CreatedBy), + "contentDefinedChunking" | "content_defined_chunking" => Ok(GeneratedField::ContentDefinedChunking), "metadataSizeHint" | "metadata_size_hint" => Ok(GeneratedField::MetadataSizeHint), "compression" => Ok(GeneratedField::Compression), "dictionaryEnabled" | "dictionary_enabled" => Ok(GeneratedField::DictionaryEnabled), @@ -6093,6 +6241,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = 
None; let mut created_by__ = None; + let mut content_defined_chunking__ = None; let mut metadata_size_hint_opt__ = None; let mut compression_opt__ = None; let mut dictionary_enabled_opt__ = None; @@ -6246,6 +6395,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } created_by__ = Some(map_.next_value()?); } + GeneratedField::ContentDefinedChunking => { + if content_defined_chunking__.is_some() { + return Err(serde::de::Error::duplicate_field("contentDefinedChunking")); + } + content_defined_chunking__ = map_.next_value()?; + } GeneratedField::MetadataSizeHint => { if metadata_size_hint_opt__.is_some() { return Err(serde::de::Error::duplicate_field("metadataSizeHint")); @@ -6336,6 +6491,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), created_by: created_by__.unwrap_or_default(), + content_defined_chunking: content_defined_chunking__, metadata_size_hint_opt: metadata_size_hint_opt__, compression_opt: compression_opt__, dictionary_enabled_opt: dictionary_enabled_opt__, diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..1251a51ab0983 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -838,6 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: ::core::option::Option, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -931,6 +933,15 @@ pub mod parquet_options { MaxPredicateCacheSize(u64), } } +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = 
"1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { #[prost(enumeration = "PrecisionInfo", tag = "1")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..65089f029b866 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -904,6 +904,13 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { skip_arrow_metadata: value.skip_arrow_metadata, coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + content_defined_chunking: value.use_content_defined_chunking.as_ref().map(|cdc| + protobuf::CdcOptions { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level, + } + ), }) } } @@ -963,8 +970,11 @@ impl TryFrom<&TableParquetOptions> for protobuf::TableParquetOptions { .iter() .filter_map(|(k, v)| v.as_ref().map(|v| (k.clone(), v.clone()))) .collect::<Vec<_>>(); + + let global: protobuf::ParquetOptions = (&value.global).try_into()?; + Ok(protobuf::TableParquetOptions { - global: Some((&value.global).try_into()?), + global: Some(global), column_specific_options, key_value_metadata, }) diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..1251a51ab0983 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -838,6 +838,8 @@ pub struct ParquetOptions { pub max_row_group_size: u64, #[prost(string, tag = "16")] pub created_by: 
::prost::alloc::string::String, + #[prost(message, optional, tag = "35")] + pub content_defined_chunking: ::core::option::Option<CdcOptions>, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] pub metadata_size_hint_opt: ::core::option::Option< parquet_options::MetadataSizeHintOpt, @@ -931,6 +933,15 @@ pub mod parquet_options { MaxPredicateCacheSize(u64), } } +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct CdcOptions { + #[prost(uint64, tag = "1")] + pub min_chunk_size: u64, + #[prost(uint64, tag = "2")] + pub max_chunk_size: u64, + #[prost(int32, tag = "3")] + pub norm_level: i32, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct Precision { #[prost(enumeration = "PrecisionInfo", tag = "1")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..8df0b3f1d9705 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -351,13 +351,13 @@ mod parquet { use super::*; use crate::protobuf::{ - ParquetColumnOptions as ParquetColumnOptionsProto, ParquetColumnSpecificOptions, - ParquetOptions as ParquetOptionsProto, + CdcOptions as CdcOptionsProto, ParquetColumnOptions as ParquetColumnOptionsProto, + ParquetColumnSpecificOptions, ParquetOptions as ParquetOptionsProto, TableParquetOptions as TableParquetOptionsProto, parquet_column_options, parquet_options, }; use datafusion_common::config::{ - ParquetColumnOptions, ParquetOptions, TableParquetOptions, + CdcOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, }; use datafusion_datasource_parquet::file_format::ParquetFormatFactory; @@ -426,6 +426,13 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + content_defined_chunking: 
global_options.global.use_content_defined_chunking.as_ref().map(|cdc| { + CdcOptionsProto { + min_chunk_size: cdc.min_chunk_size as u64, + max_chunk_size: cdc.max_chunk_size as u64, + norm_level: cdc.norm_level, + } + }), }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -525,6 +532,17 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| { + let defaults = CdcOptions::default(); + CdcOptions { + // proto3 uses 0 as the wire default for uint64; a zero chunk size is + // invalid, so treat it as "field not set" and fall back to the default. + min_chunk_size: if cdc.min_chunk_size != 0 { cdc.min_chunk_size as usize } else { defaults.min_chunk_size }, + max_chunk_size: if cdc.max_chunk_size != 0 { cdc.max_chunk_size as usize } else { defaults.max_chunk_size }, + // norm_level = 0 is a valid value (and the default), so pass it through directly. 
+ norm_level: cdc.norm_level, + } + }), } } } @@ -585,7 +603,7 @@ mod parquet { .iter() .map(|(k, v)| (k.clone(), Some(v.clone()))) .collect(), - crypto: Default::default(), + ..Default::default() } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index a334bd25b0ce3..77ae1d335fb8d 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -259,6 +259,7 @@ datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.statistics_truncate_length 64 +datafusion.execution.parquet.use_content_defined_chunking NULL datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 datafusion.execution.perfect_hash_join_min_key_density 0.15 @@ -401,6 +402,7 @@ datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding t datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting +datafusion.execution.parquet.use_content_defined_chunking NULL (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. 
When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in rows datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" datafusion.execution.perfect_hash_join_min_key_density 0.15 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. diff --git a/datafusion/sqllogictest/test_files/parquet_cdc.slt b/datafusion/sqllogictest/test_files/parquet_cdc.slt new file mode 100644 index 0000000000000..f87f05af74a0c --- /dev/null +++ b/datafusion/sqllogictest/test_files/parquet_cdc.slt @@ -0,0 +1,231 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Test parquet content-defined chunking (CDC) end-to-end: +# write parquet files with CDC enabled, then read them back and verify correctness. + +# Create source data +statement ok +CREATE TABLE cdc_source AS VALUES + (1, 'alice', 100.50), + (2, 'bob', 200.75), + (3, 'charlie', 300.25), + (4, 'diana', 400.00), + (5, 'eve', 500.99) + +# +# Test 1: Enable CDC with 'true' (uses default options) +# + +query I +COPY cdc_source TO 'test_files/scratch/parquet_cdc/enabled_true/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) +---- +5 + +statement ok +CREATE EXTERNAL TABLE cdc_enabled_true_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/enabled_true/' + +query ITR rowsort +SELECT * FROM cdc_enabled_true_read +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# Verify filtering works on CDC-written files +query ITR +SELECT * FROM cdc_enabled_true_read WHERE column1 > 3 ORDER BY column1 +---- +4 diana 400 +5 eve 500.99 + +# Verify aggregation works on CDC-written files +query R +SELECT SUM(column3) FROM cdc_enabled_true_read +---- +1502.49 + +# +# Test 2: Disable CDC with 'false' (same as default behavior) +# + +query I +COPY cdc_source TO 'test_files/scratch/parquet_cdc/disabled_false/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'false' +) +---- +5 + +statement ok +CREATE EXTERNAL TABLE cdc_disabled_false_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/disabled_false/' + +query ITR rowsort +SELECT * FROM cdc_disabled_false_read +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# +# Test 3: Enable CDC with custom sub-field options +# + +query I +COPY cdc_source TO 'test_files/scratch/parquet_cdc/custom_chunks/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking.min_chunk_size' '1024', + 'format.use_content_defined_chunking.max_chunk_size' '4096', + 'format.use_content_defined_chunking.norm_level' 
'1' +) +---- +5 + +statement ok +CREATE EXTERNAL TABLE cdc_custom_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/custom_chunks/' + +query ITR rowsort +SELECT * FROM cdc_custom_read +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# +# Test 4: Write via external table with CDC enabled +# + +statement ok +CREATE EXTERNAL TABLE cdc_external_write ( + id INT, + name VARCHAR, + value DOUBLE +) +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/external_table/' OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) + +query I +INSERT INTO cdc_external_write SELECT * FROM cdc_source +---- +5 + +query ITR rowsort +SELECT * FROM cdc_external_write +---- +1 alice 100.5 +2 bob 200.75 +3 charlie 300.25 +4 diana 400 +5 eve 500.99 + +# +# Test 5: Write larger dataset to exercise CDC chunking logic +# + +statement ok +CREATE TABLE cdc_large_source AS + SELECT + value as id, + CONCAT('name_', CAST(value AS VARCHAR)) as name, + CAST(value AS DOUBLE) * 1.5 as amount, + CASE WHEN value % 2 = 0 THEN true ELSE false END as flag + FROM generate_series(1, 1000) t + +query I +COPY cdc_large_source TO 'test_files/scratch/parquet_cdc/large/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) +---- +1000 + +statement ok +CREATE EXTERNAL TABLE cdc_large_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/large/' + +query I +SELECT COUNT(*) FROM cdc_large_read +---- +1000 + +query IR +SELECT MIN(id), MIN(amount) FROM cdc_large_read +---- +1 1.5 + +query IR +SELECT MAX(id), MAX(amount) FROM cdc_large_read +---- +1000 1500 + +query I +SELECT COUNT(*) FROM cdc_large_read WHERE flag = true +---- +500 + +# +# Test 6: CDC with different data types including NULLs +# + +statement ok +CREATE TABLE cdc_types_source AS VALUES + (1::INT, 'text'::VARCHAR, 3.14::DOUBLE, true::BOOLEAN, DATE '2024-01-15', TIMESTAMP '2024-01-15 10:30:00'), + (2::INT, 'more'::VARCHAR, 2.72::DOUBLE, false::BOOLEAN, 
DATE '2024-06-20', TIMESTAMP '2024-06-20 14:45:00'), + (3::INT, NULL::VARCHAR, NULL::DOUBLE, NULL::BOOLEAN, NULL::DATE, NULL::TIMESTAMP) + +query I +COPY cdc_types_source TO 'test_files/scratch/parquet_cdc/types/' +STORED AS PARQUET +OPTIONS ( + 'format.use_content_defined_chunking' 'true' +) +---- +3 + +statement ok +CREATE EXTERNAL TABLE cdc_types_read +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_cdc/types/' + +query ITRBDP rowsort +SELECT * FROM cdc_types_read +---- +1 text 3.14 true 2024-01-15 2024-01-15T10:30:00 +2 more 2.72 false 2024-06-20 2024-06-20T14:45:00 +3 NULL NULL NULL NULL NULL diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 69627e3cb9148..be42f4a0becb8 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -112,6 +112,7 @@ The following configuration settings are available: | datafusion.execution.parquet.allow_single_file_parallelism | true | (writing) Controls whether DataFusion will attempt to speed up writing parquet files by serializing them in parallel. Each column in each row group in each output file are serialized in parallel leveraging a maximum possible core count of n_files*n_row_groups*n_columns. | | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. 
You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | +| datafusion.execution.parquet.use_content_defined_chunking | NULL | (writing) EXPERIMENTAL: Enable content-defined chunking (CDC) when writing parquet files. When `Some`, CDC is enabled with the given options; when `None` (the default), CDC is disabled. When CDC is enabled, parallel writing is automatically disabled since the chunker state must persist across row groups. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | | datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.spill_compression | uncompressed | Sets the compression codec used when spilling data to disk. Since datafusion writes spill files using the Arrow IPC Stream format, only codecs supported by the Arrow IPC Stream Writer are allowed. Valid values are: uncompressed, lz4_frame, zstd. Note: lz4_frame offers faster (de)compression, but typically results in larger spill files. In contrast, zstd achieves higher compression ratios at the cost of slower (de)compression speed. |