Merged
87 changes: 87 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -3720,3 +3720,90 @@ async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
);
}
}

/// Regression test for https://github.com/apache/datafusion/issues/20109
#[tokio::test]
async fn test_filter_with_projection_pushdown() {
Contributor:
Any chance you could add an SLT test that reproduces this? It could go in datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
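A possible shape for such a repro, sketched in Rust against SessionContext so it could be ported to the suggested .slt file; the table name and rows are invented here, and whether plain SQL planning produces the stacked projected FilterExecs that trigger the bug is an assumption worth verifying:

// Hypothetical repro sketch for issue #20109; names and data are illustrative.
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // DDL executes eagerly inside `sql()`.
    ctx.sql(
        "CREATE TABLE events(time BIGINT, event VARCHAR, size BIGINT) AS VALUES \
         (100, 'Ingestion', 10), (200, 'Ingestion', 20), (300, 'Query', 30), \
         (400, 'Ingestion', 40), (500, 'Query', 50)",
    )
    .await?;
    // Before the fix, a plan stacking a filter on top of a projected filter
    // could error at execution time after filter pushdown rewrote it.
    ctx.sql("SELECT size FROM events WHERE time < 350 AND event = 'Ingestion'")
        .await?
        .show()
        .await?;
    Ok(())
}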

    use arrow::array::{Int64Array, RecordBatch, StringArray};
    use datafusion_physical_plan::collect;
    use datafusion_physical_plan::filter::FilterExecBuilder;

    // Create schema: [time, event, size]
    let schema = Arc::new(Schema::new(vec![
        Field::new("time", DataType::Int64, false),
        Field::new("event", DataType::Utf8, false),
        Field::new("size", DataType::Int64, false),
    ]));

    // Create sample data
    let timestamps = vec![100i64, 200, 300, 400, 500];
    let events = vec!["Ingestion", "Ingestion", "Query", "Ingestion", "Query"];
    let sizes = vec![10i64, 20, 30, 40, 50];

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(timestamps)),
            Arc::new(StringArray::from(events)),
            Arc::new(Int64Array::from(sizes)),
        ],
    )
    .unwrap();

    // Create data source
    let memory_exec = datafusion_datasource::memory::MemorySourceConfig::try_new_exec(
        &[vec![batch]],
        schema.clone(),
        None,
    )
    .unwrap();

    // First FilterExec: time < 350 with projection=[event@1, size@2]
    let time_col = col("time", &memory_exec.schema()).unwrap();
    let time_filter = Arc::new(BinaryExpr::new(
        time_col,
        Operator::Lt,
        Arc::new(Literal::new(ScalarValue::Int64(Some(350)))),
    ));
    let filter1 = Arc::new(
        FilterExecBuilder::new(time_filter, memory_exec)
            .apply_projection(Some(vec![1, 2]))
            .unwrap()
            .build()
            .unwrap(),
    );

    // Second FilterExec: event = 'Ingestion' with projection=[size@1]
    let event_col = col("event", &filter1.schema()).unwrap();
    let event_filter = Arc::new(BinaryExpr::new(
        event_col,
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Utf8(Some(
            "Ingestion".to_string(),
        )))),
    ));
    let filter2 = Arc::new(
        FilterExecBuilder::new(event_filter, filter1)
            .apply_projection(Some(vec![1]))
            .unwrap()
            .build()
            .unwrap(),
    );

    // Apply filter pushdown optimization
    let config = ConfigOptions::default();
    let optimized_plan = FilterPushdown::new()
        .optimize(Arc::clone(&filter2) as Arc<dyn ExecutionPlan>, &config)
        .unwrap();

    // Execute the optimized plan - this should not error
    let ctx = SessionContext::new();
    let result = collect(optimized_plan, ctx.task_ctx()).await.unwrap();

    // Verify results: should return rows where time < 350 AND event = 'Ingestion'
    // That's rows with time=100,200 (both have event='Ingestion'), so sizes 10,20
    let expected = [
        "+------+", "| size |", "+------+", "| 10   |", "| 20   |", "+------+",
    ];
    assert_batches_eq!(expected, &result);
}
30 changes: 23 additions & 7 deletions datafusion/physical-plan/src/filter.rs
@@ -57,7 +57,7 @@ use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, lit};
use datafusion_physical_expr::intervals::utils::check_support;
-use datafusion_physical_expr::utils::collect_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_expr_columns};
use datafusion_physical_expr::{
AcrossPartitions, AnalysisContext, ConstExpr, ExprBoundaries, PhysicalExpr, analyze,
conjunction, split_conjunction,
@@ -623,10 +623,26 @@ impl ExecutionPlan for FilterExec {
            return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
        }
        // We absorb any parent filters that were not handled by our children
-        let unsupported_parent_filters =
-            child_pushdown_result.parent_filters.iter().filter_map(|f| {
-                matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
-            });
+        let mut unsupported_parent_filters: Vec<Arc<dyn PhysicalExpr>> =
+            child_pushdown_result
+                .parent_filters
+                .iter()
+                .filter_map(|f| {
+                    matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
+                })
+                .collect();
Comment on lines -626 to +633
Contributor: Note: this is the same code, just added the mut which caused a reformat.
+        // If this FilterExec has a projection, the unsupported parent filters
+        // are in the output schema (after projection) coordinates. We need to
+        // remap them to the input schema coordinates before combining with self filters.
+        if self.projection.is_some() {
Contributor: 👍🏻 note that this is because filters get evaluated before the projection
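To make the remap concrete: with input schema [time, event, size] and projection=[1, 2], this node's output schema is [event, size], so a parent predicate such as size@1 > 15 is written in output coordinates; evaluating it against the input requires rebinding the column by name, where size has index 2. A self-contained sketch of that rebinding, mirroring the patch's use of reassign_expr_columns (its exact signature is assumed from the usage in the diff):

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::utils::reassign_expr_columns;

fn main() -> Result<()> {
    // The FilterExec's *input* schema: [time, event, size].
    let input_schema = Schema::new(vec![
        Field::new("time", DataType::Int64, false),
        Field::new("event", DataType::Utf8, false),
        Field::new("size", DataType::Int64, false),
    ]);

    // A parent filter written against the *output* schema [event, size],
    // where `size` sits at index 1.
    let parent_pred: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("size", 1)),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int64(Some(15)))),
    ));

    // Rebind columns by name against the input schema: size@1 becomes size@2.
    let rebound = reassign_expr_columns(parent_pred, &input_schema)?;
    assert_eq!(rebound.to_string(), "size@2 > 15");
    Ok(())
}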

+            let input_schema = self.input().schema();
+            unsupported_parent_filters = unsupported_parent_filters
+                .into_iter()
+                .map(|expr| reassign_expr_columns(expr, &input_schema))
+                .collect::<Result<Vec<_>>>()?;
+        }

        let unsupported_self_filters = child_pushdown_result
            .self_filters
            .first()
Expand Down Expand Up @@ -674,7 +690,7 @@ impl ExecutionPlan for FilterExec {
            // The new predicate is the same as our current predicate
            None
        } else {
-            // Create a new FilterExec with the new predicate
+            // Create a new FilterExec with the new predicate, preserving the projection
            let new = FilterExec {
                predicate: Arc::clone(&new_predicate),
                input: Arc::clone(&filter_input),
@@ -686,7 +702,7 @@
                    self.default_selectivity,
                    self.projection.as_ref(),
                )?,
-                projection: None,
+                projection: self.projection.clone(),
                batch_size: self.batch_size,
                fetch: self.fetch,
            };
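Why the projection: self.projection.clone() change above matters: a FilterExec's output schema is derived from its projection, so the rebuilt node must carry the projection forward or its schema silently widens back to the full input schema. In the new regression test, filter1 projects [event, size]; had the optimizer rebuilt it with projection: None, its parent filter2 (planned against the two-column schema) would resolve event@0 and size@1 against [time, event, size] and read the wrong fields. A short check of the invariant, reusing time_filter and memory_exec from the test (a fragment, not a full test):

    // The projection, not the input, determines the output schema.
    let projected = FilterExecBuilder::new(time_filter, memory_exec)
        .apply_projection(Some(vec![1, 2]))
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(projected.schema().field(0).name(), "event");
    assert_eq!(projected.schema().field(1).name(), "size");
    // Rebuilding with `projection: None` would restore [time, event, size] and
    // invalidate every parent column index planned against [event, size].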