FilterPushdown does not generate correct column indices when merging FilterExec #20109

@haohuaijin

Description

Describe the bug

I have a plan (shown below) with two consecutive FilterExec nodes, so I use FilterPushdown to merge them, but after the merge the plan fails at execution time. If I do not run FilterPushdown, the plan produces correct output.

Physical plan before pushdown filter:
FilterExec: event@0 = Ingestion, projection=[size@1]
  FilterExec: time@0 < 1770020321951000, projection=[event@1, size@2]
    DataSourceExec: partitions=1, partition_sizes=[11]
Get result before pushdown filter, result size: 8

Physical plan after pushdown filter:
ProjectionExec: expr=[size@1 as size]
  FilterExec: event@0 = Ingestion AND time@0 < 1770020321951000
    DataSourceExec: partitions=1, partition_sizes=[11]
Error: ArrowError(InvalidArgumentError("Invalid comparison operation: Int64 == Utf8"), Some(""))

The column index event@0 in the merged predicate is incorrect; it should be event@1.
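To illustrate the missing step: the inner FilterExec has projection=[1, 2], so column i in the outer predicate refers to column projection[i] of the source schema. When the two filters are merged, each column index in the outer predicate must be remapped through the inner projection. A minimal standalone sketch of that remapping (the function name `remap_index` is hypothetical, not part of DataFusion's API):

```rust
/// Hypothetical sketch: map a column index from the parent filter's input
/// schema back to the child's input schema via the child's projection.
fn remap_index(parent_index: usize, projection: &[usize]) -> usize {
    // Parent column i sees the child's projected column projection[i].
    projection[parent_index]
}

fn main() {
    // The inner FilterExec projects [event@1, size@2] out of [time, event, size].
    let projection = vec![1, 2];
    // The outer predicate references event@0 (column 0 of its input);
    // after merging into the child it must become event@1.
    assert_eq!(remap_index(0, &projection), 1);
    println!("{}", remap_index(0, &projection));
}
```

In the reported plan, FilterPushdown keeps event@0 unchanged, so the merged predicate compares the Int64 time column against the Utf8 literal "Ingestion", which is exactly the ArrowError shown above.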

To Reproduce

use std::sync::Arc;

use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::Result;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::physical_expr::expressions::{BinaryExpr, col, lit};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::{ExecutionPlan, collect, displayable};
use datafusion::prelude::SessionContext;
use datafusion_expr::Operator;

#[tokio::main]
async fn main() -> Result<()> {
    // 1. Create data source
    let memory_exec = create_data_source();

    // 2. First FilterExec: time < 1770020321951000 with projection=[event@1, size@2]
    let timestamp_col = col("time", &memory_exec.schema())?;
    let upper_bound = lit(1770020321951000i64);
    let lt_expr = Arc::new(BinaryExpr::new(timestamp_col, Operator::Lt, upper_bound));
    let filter1_exec: Arc<dyn ExecutionPlan> = Arc::new(
        FilterExec::try_new(lt_expr, memory_exec)?.with_projection(Some(vec![1, 2]))?, // projection=[event@1, size@2]
    );

    // 3. Second FilterExec: event = 'Ingestion' with projection=[size]
    let event_col = col("event", &filter1_exec.schema())?;
    let ingestion_lit = lit("Ingestion");
    let event_filter = Arc::new(BinaryExpr::new(event_col, Operator::Eq, ingestion_lit));
    let filter2_exec: Arc<dyn ExecutionPlan> = Arc::new(
        FilterExec::try_new(event_filter, filter1_exec)?
            .with_projection(Some(vec![1]))?, // projection=[size@1]
    );

    println!("Physical plan before pushdown filter:");
    print!(
        "{}",
        displayable(filter2_exec.as_ref()).indent(true).to_string()
    );

    // Execute the plan
    let ctx = SessionContext::new();
    let result = collect(filter2_exec.clone(), ctx.task_ctx()).await?;

    println!(
        "Get result before pushdown filter, result size: {:?}\n",
        result.iter().fold(0, |acc, x| acc + x.num_rows())
    );

    // After pushdown filter
    let pushdown_filter = FilterPushdown::new();
    let optimized_plan =
        pushdown_filter.optimize(filter2_exec.clone(), ctx.state().config_options())?;

    println!("Physical plan after pushdown filter:");
    print!(
        "{}",
        displayable(optimized_plan.as_ref())
            .indent(true)
            .to_string(),
    );

    let result = collect(optimized_plan.clone(), ctx.task_ctx()).await?;
    println!("{:?}", result);

    Ok(())
}

fn create_data_source() -> Arc<dyn ExecutionPlan> {
    // Create schema: time, event, size
    let schema = Arc::new(Schema::new(vec![
        Field::new("time", DataType::Int64, false),
        Field::new("event", DataType::Utf8, false),
        Field::new("size", DataType::Int64, false),
    ]));

    // Create 11 batches of sample data to get partition_sizes=[11]
    let timestamps = vec![
        1770020021951000i64,
        1770020021951001,
        1770020021951002,
        1770020021951003,
        1770020021951004,
        1770020021951005,
        1770020021951006,
        1770020021951007,
        1770020021951008,
        1770020021951009,
        1770020021951010,
    ];
    let events = vec![
        "Ingestion",
        "Ingestion",
        "Query",
        "Ingestion",
        "Ingestion",
        "Query",
        "Ingestion",
        "Ingestion",
        "Ingestion",
        "Query",
        "Ingestion",
    ];
    let sizes = vec![100i64, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1100];

    // Create 11 separate batches (one row each) to get partition_sizes=[11]
    let batches: Vec<RecordBatch> = (0..11)
        .map(|i| {
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int64Array::from(vec![timestamps[i]])),
                    Arc::new(StringArray::from(vec![events[i]])),
                    Arc::new(Int64Array::from(vec![sizes[i]])),
                ],
            )
            .unwrap()
        })
        .collect();

    // Create MemorySourceConfig to create the data source
    MemorySourceConfig::try_new_exec(&[batches], schema.clone(), None).unwrap()
}

Expected behavior

The optimized plan should run successfully and return the same result as the unoptimized plan (8 rows).

Additional context

I tested every released version that includes FilterPushdown; none of them work.

Labels: bug (Something isn't working)