Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/data_io/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ impl PruningStatistics for ParquetMetadataIndex {
}

/// return the row counts for each file
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
Some(self.row_counts_ref().clone())
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/query_planning/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl PruningStatistics for MyCatalog {
None
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
// In this example, we know nothing about the number of rows in each file
None
}
Expand Down
71 changes: 34 additions & 37 deletions datafusion/common/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,17 @@ pub trait PruningStatistics {
/// [`UInt64Array`]: arrow::array::UInt64Array
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;

/// Return the number of rows for the named column in each container
/// as an [`UInt64Array`].
/// Return the number of rows in each container as an [`UInt64Array`].
///
/// Row counts are container-level (not column-specific) — the value
/// is the same regardless of which column is being considered.
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
fn row_counts(&self) -> Option<ArrayRef>;

/// Returns [`BooleanArray`] where each row represents information known
/// about specific literal `values` in a column.
Expand Down Expand Up @@ -265,7 +267,7 @@ impl PruningStatistics for PartitionPruningStatistics {
None
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
None
}

Expand Down Expand Up @@ -398,11 +400,7 @@ impl PruningStatistics for PrunableStatistics {
}
}

fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
// If the column does not exist in the schema, return None
if self.schema.index_of(column.name()).is_err() {
return None;
}
fn row_counts(&self) -> Option<ArrayRef> {
if self
.statistics
.iter()
Expand Down Expand Up @@ -502,9 +500,9 @@ impl PruningStatistics for CompositePruningStatistics {
None
}

fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.row_counts(column) {
if let Some(array) = stats.row_counts() {
return Some(array);
}
}
Expand Down Expand Up @@ -566,9 +564,9 @@ mod tests {

// Partition values don't know anything about nulls or row counts
assert!(partition_stats.null_counts(&column_a).is_none());
assert!(partition_stats.row_counts(&column_a).is_none());
assert!(partition_stats.row_counts().is_none());
assert!(partition_stats.null_counts(&column_b).is_none());
assert!(partition_stats.row_counts(&column_b).is_none());
assert!(partition_stats.row_counts().is_none());

// Min/max values are the same as the partition values
let min_values_a =
Expand Down Expand Up @@ -709,9 +707,9 @@ mod tests {

// Partition values don't know anything about nulls or row counts
assert!(partition_stats.null_counts(&column_a).is_none());
assert!(partition_stats.row_counts(&column_a).is_none());
assert!(partition_stats.row_counts().is_none());
assert!(partition_stats.null_counts(&column_b).is_none());
assert!(partition_stats.row_counts(&column_b).is_none());
assert!(partition_stats.row_counts().is_none());

// Min/max values are all missing
assert!(partition_stats.min_values(&column_a).is_none());
Expand Down Expand Up @@ -814,13 +812,13 @@ mod tests {
assert_eq!(null_counts_b, expected_null_counts_b);

// Row counts are the same as the statistics
let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap())
let row_counts_a = as_uint64_array(&pruning_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_a = vec![Some(100), Some(200)];
assert_eq!(row_counts_a, expected_row_counts_a);
let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap())
let row_counts_b = as_uint64_array(&pruning_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
Expand All @@ -845,20 +843,21 @@ mod tests {
// This is debatable, personally I think `row_count` should not take a `Column` as an argument
// at all since all columns should have the same number of rows.
// But for now we just document the current behavior in this test.
let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap())
let row_counts_c = as_uint64_array(&pruning_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_c = vec![Some(100), Some(200)];
assert_eq!(row_counts_c, expected_row_counts_c);
assert!(pruning_stats.contained(&column_c, &values).is_none());

// Test with a column that doesn't exist
// Test with a column that doesn't exist — column-specific stats
// return None, but row_counts is container-level and still available
let column_d = Column::new_unqualified("d");
assert!(pruning_stats.min_values(&column_d).is_none());
assert!(pruning_stats.max_values(&column_d).is_none());
assert!(pruning_stats.null_counts(&column_d).is_none());
assert!(pruning_stats.row_counts(&column_d).is_none());
assert!(pruning_stats.row_counts().is_some());
assert!(pruning_stats.contained(&column_d, &values).is_none());
}

Expand Down Expand Up @@ -886,8 +885,8 @@ mod tests {
assert!(pruning_stats.null_counts(&column_b).is_none());

// Row counts are all missing
assert!(pruning_stats.row_counts(&column_a).is_none());
assert!(pruning_stats.row_counts(&column_b).is_none());
assert!(pruning_stats.row_counts().is_none());
assert!(pruning_stats.row_counts().is_none());

// Contained values are all empty
let values = HashSet::from([ScalarValue::from(1i32)]);
Expand Down Expand Up @@ -1027,13 +1026,11 @@ mod tests {
let expected_null_counts_col_x = vec![Some(0), Some(10)];
assert_eq!(null_counts_col_x, expected_null_counts_col_x);

// Test row counts - only available from file statistics
assert!(composite_stats.row_counts(&part_a).is_none());
let row_counts_col_x =
as_uint64_array(&composite_stats.row_counts(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
// Test row counts — container-level, available from file statistics
let row_counts_col_x = as_uint64_array(&composite_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(100), Some(200)];
assert_eq!(row_counts_col_x, expected_row_counts);

Expand All @@ -1046,12 +1043,13 @@ mod tests {
// File statistics don't implement contained
assert!(composite_stats.contained(&col_x, &values).is_none());

// Non-existent column should return None for everything
// Non-existent column should return None for column-specific stats,
// but row_counts is container-level and still available
let non_existent = Column::new_unqualified("non_existent");
assert!(composite_stats.min_values(&non_existent).is_none());
assert!(composite_stats.max_values(&non_existent).is_none());
assert!(composite_stats.null_counts(&non_existent).is_none());
assert!(composite_stats.row_counts(&non_existent).is_none());
assert!(composite_stats.row_counts().is_some());
assert!(composite_stats.contained(&non_existent, &values).is_none());

// Verify num_containers matches
Expand Down Expand Up @@ -1155,7 +1153,7 @@ mod tests {
let expected_null_counts = vec![Some(0), Some(5)];
assert_eq!(null_counts, expected_null_counts);

let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap())
let row_counts = as_uint64_array(&composite_stats.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
Expand Down Expand Up @@ -1195,11 +1193,10 @@ mod tests {
let expected_null_counts = vec![Some(10), Some(20)];
assert_eq!(null_counts, expected_null_counts);

let row_counts =
as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let row_counts = as_uint64_array(&composite_stats_reversed.row_counts().unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(1000), Some(2000)];
assert_eq!(row_counts, expected_row_counts);
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource-parquet/src/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ impl PruningStatistics for PagesPruningStatistics<'_> {
}
}

fn row_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
match self.converter.data_page_row_counts(
self.offset_index,
self.row_group_metadatas,
Expand Down
18 changes: 9 additions & 9 deletions datafusion/datasource-parquet/src/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use super::{ParquetAccessPlan, ParquetFileMetrics};
use arrow::array::{ArrayRef, BooleanArray};
use arrow::array::{ArrayRef, BooleanArray, UInt64Array};
use arrow::datatypes::Schema;
use datafusion_common::pruning::PruningStatistics;
use datafusion_common::{Column, Result, ScalarValue};
Expand Down Expand Up @@ -536,7 +536,7 @@ impl PruningStatistics for BloomFilterStatistics {
None
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
None
}

Expand Down Expand Up @@ -626,13 +626,13 @@ impl PruningStatistics for RowGroupPruningStatistics<'_> {
.map(|counts| Arc::new(counts) as ArrayRef)
}

fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
// row counts are the same for all columns in a row group
self.statistics_converter(column)
.and_then(|c| Ok(c.row_group_row_counts(self.metadata_iter())?))
.ok()
.flatten()
.map(|counts| Arc::new(counts) as ArrayRef)
fn row_counts(&self) -> Option<ArrayRef> {
// Row counts are container-level — read directly from row group metadata.
let counts: UInt64Array = self
.metadata_iter()
.map(|rg| Some(rg.num_rows() as u64))
.collect();
Some(Arc::new(counts) as ArrayRef)
}

fn contained(
Expand Down
11 changes: 5 additions & 6 deletions datafusion/pruning/src/pruning_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
StatisticsType::Min => statistics.min_values(&column),
StatisticsType::Max => statistics.max_values(&column),
StatisticsType::NullCount => statistics.null_counts(&column),
StatisticsType::RowCount => statistics.row_counts(&column),
StatisticsType::RowCount => statistics.row_counts(),
};
let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));

Expand Down Expand Up @@ -2300,11 +2300,10 @@ mod tests {
.unwrap_or(None)
}

fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
self.stats
.get(column)
.map(|container_stats| container_stats.row_counts())
.unwrap_or(None)
.values()
.find_map(|container_stats| container_stats.row_counts())
}

fn contained(
Expand Down Expand Up @@ -2342,7 +2341,7 @@ mod tests {
None
}

fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
fn row_counts(&self) -> Option<ArrayRef> {
None
}

Expand Down
35 changes: 35 additions & 0 deletions docs/source/library-user-guide/upgrading/54.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,41 @@ auto-derefs through the `Arc`).
> always return `None`. Use the `downcast_ref` method above instead, or
> dereference through the `Arc` first with `plan.as_ref() as &dyn Any`.

### `PruningStatistics::row_counts` no longer takes a `column` parameter
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb added upgrade guide here


The `row_counts` method on the `PruningStatistics` trait no longer takes a
`&Column` argument, since row counts are a container-level property (the same
for every column).

**Before:**

```rust,ignore
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
// ...
}
```

**After:**

```rust,ignore
fn row_counts(&self) -> Option<ArrayRef> {
// ...
}
```

**Who is affected:**

- Users who implement the `PruningStatistics` trait

**Migration guide:**

Remove the `column: &Column` parameter from your `row_counts` implementation
and any corresponding call sites. If your implementation was using the column
argument, note that row counts are identical for all columns in a container, so
the parameter was unnecessary.

See [PR #21369](https://github.com/apache/datafusion/pull/21369) for details.

### Avro API and timestamp decoding changes

DataFusion has switched to use `arrow-avro` (see [#17861]) when reading avro files
Expand Down
Loading