Labels: bug (Something isn't working)
Description
Describe the bug
I have a plan (shown below) with two consecutive FilterExec nodes, so I use the FilterPushdown rule to merge them. After the merge, executing the plan reports an error; if I do not apply FilterPushdown, the plan produces the correct result.
Physical plan before pushdown filter:
FilterExec: event@0 = Ingestion, projection=[size@1]
FilterExec: time@0 < 1770020321951000, projection=[event@1, size@2]
DataSourceExec: partitions=1, partition_sizes=[11]
Get result before pushdown filter, result size: 8
Physical plan after pushdown filter:
ProjectionExec: expr=[size@1 as size]
FilterExec: event@0 = Ingestion AND time@0 < 1770020321951000
DataSourceExec: partitions=1, partition_sizes=[11]
Error: ArrowError(InvalidArgumentError("Invalid comparison operation: Int64 == Utf8"), Some(""))
The column reference event@0 in the merged predicate is incorrect: after the pushdown the predicate is evaluated directly against the DataSourceExec schema, where event is column 1, so it should be event@1.
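It looks like the merged predicate keeps the column index from the outer filter's input schema (filter1's output, where event is column 0) without remapping it through filter1's projection back to the data source schema (where event is column 1). A minimal, hypothetical sketch of the remapping the optimizer would need (the function name is mine, not the DataFusion API):

```rust
// Hypothetical sketch, not the DataFusion implementation: when a predicate
// written against a FilterExec's output schema is pushed below that filter's
// projection, each column index must be mapped back to the input schema.
fn remap_column_index(output_index: usize, projection: &[usize]) -> usize {
    // projection[i] is the input-schema index of output column i
    projection[output_index]
}

fn main() {
    // filter1 projects input columns [1, 2], so its output is [event, size]
    let projection = vec![1, 2];
    // The outer predicate references event@0 (index in filter1's output);
    // pushed below the projection it must become event@1.
    assert_eq!(remap_column_index(0, &projection), 1);
    // Likewise size@1 in filter1's output maps back to size@2 at the source.
    assert_eq!(remap_column_index(1, &projection), 2);
}
```

Without this remapping, the merged predicate compares the Int64 time column (index 0 at the source) against the Utf8 literal, which matches the `Int64 == Utf8` error above.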
To Reproduce
use std::sync::Arc;
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::Result;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::physical_expr::expressions::{BinaryExpr, col, lit};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::{ExecutionPlan, collect, displayable};
use datafusion::prelude::SessionContext;
use datafusion_expr::Operator;

#[tokio::main]
async fn main() -> Result<()> {
    // 1. Create data source
    let memory_exec = create_data_source();

    // 2. First FilterExec: time < 1770020321951000 with projection=[event@1, size@2]
    let timestamp_col = col("time", &memory_exec.schema())?;
    let upper_bound = lit(1770020321951000i64);
    let lt_expr = Arc::new(BinaryExpr::new(timestamp_col, Operator::Lt, upper_bound));
    let filter1_exec: Arc<dyn ExecutionPlan> = Arc::new(
        // projection=[event@1, size@2]
        FilterExec::try_new(lt_expr, memory_exec)?.with_projection(Some(vec![1, 2]))?,
    );

    // 3. Second FilterExec: event = 'Ingestion' with projection=[size]
    let event_col = col("event", &filter1_exec.schema())?;
    let ingestion_lit = lit("Ingestion");
    let event_filter = Arc::new(BinaryExpr::new(event_col, Operator::Eq, ingestion_lit));
    let filter2_exec: Arc<dyn ExecutionPlan> = Arc::new(
        // projection=[size@1]
        FilterExec::try_new(event_filter, filter1_exec)?.with_projection(Some(vec![1]))?,
    );

    println!("Physical plan before pushdown filter:");
    print!(
        "{}",
        displayable(filter2_exec.as_ref()).indent(true).to_string()
    );

    // Execute the plan
    let ctx = SessionContext::new();
    let result = collect(filter2_exec.clone(), ctx.task_ctx()).await?;
    println!(
        "Get result before pushdown filter, result size: {:?}\n",
        result.iter().fold(0, |acc, x| acc + x.num_rows())
    );

    // Apply the FilterPushdown optimizer rule
    let pushdown_filter = FilterPushdown::new();
    let optimized_plan =
        pushdown_filter.optimize(filter2_exec.clone(), ctx.state().config_options())?;
    println!("Physical plan after pushdown filter:");
    print!(
        "{}",
        displayable(optimized_plan.as_ref()).indent(true).to_string(),
    );

    let result = collect(optimized_plan.clone(), ctx.task_ctx()).await?;
    println!("{:?}", result);
    Ok(())
}

fn create_data_source() -> Arc<dyn ExecutionPlan> {
    // Create schema: time, event, size
    let schema = Arc::new(Schema::new(vec![
        Field::new("time", DataType::Int64, false),
        Field::new("event", DataType::Utf8, false),
        Field::new("size", DataType::Int64, false),
    ]));

    // Sample data: 11 rows
    let timestamps = vec![
        1770020021951000i64,
        1770020021951001,
        1770020021951002,
        1770020021951003,
        1770020021951004,
        1770020021951005,
        1770020021951006,
        1770020021951007,
        1770020021951008,
        1770020021951009,
        1770020021951010,
    ];
    let events = vec![
        "Ingestion", "Ingestion", "Query", "Ingestion", "Ingestion", "Query",
        "Ingestion", "Ingestion", "Ingestion", "Query", "Ingestion",
    ];
    let sizes = vec![100i64, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100];

    // Create 11 separate single-row batches to get partition_sizes=[11]
    let batches: Vec<RecordBatch> = (0..11)
        .map(|i| {
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int64Array::from(vec![timestamps[i]])),
                    Arc::new(StringArray::from(vec![events[i]])),
                    Arc::new(Int64Array::from(vec![sizes[i]])),
                ],
            )
            .unwrap()
        })
        .collect();

    // Create MemorySourceConfig to create the data source
    MemorySourceConfig::try_new_exec(&[batches], schema.clone(), None).unwrap()
}

Expected behavior
The optimized plan should run successfully and return the same result as before the pushdown (8 rows).
Additional context
I tested every version that includes FilterPushdown; the bug reproduces in all of them.