Skip to content

feat: make execution transfomation application rule generic#25

Open
askalt wants to merge 6 commits intoaskalt/bump-candidatefrom
askalt/generalize-execution-rules-applier
Open

feat: make execution transfomation application rule generic#25
askalt wants to merge 6 commits intoaskalt/bump-candidatefrom
askalt/generalize-execution-rules-applier

Conversation

@askalt
Copy link

@askalt askalt commented Feb 11, 2026

Currently, it depends on the particular execution transformation that resolves placeholders. However, it would be more convenient to be able to specify the rule which is required to be applied to the plan.

@askalt askalt force-pushed the askalt/generalize-execution-rules-applier branch from 5c2f852 to e807be9 Compare February 11, 2026 08:28
@askalt askalt force-pushed the askalt/generalize-execution-rules-applier branch from e807be9 to 316fd62 Compare February 11, 2026 08:57
@askalt askalt force-pushed the askalt/generalize-execution-rules-applier branch 2 times, most recently from c87dd45 to d213820 Compare February 11, 2026 10:15
@askalt
Copy link
Author

askalt commented Feb 11, 2026

@LLDay please, check the patch.


/// Checks if the given [`ExecutionPlan`] node matches the criteria for this rule.
fn matches(&mut self, _node: &Arc<dyn ExecutionPlan>) -> Result<bool> {
fn matches(&self, _node: &Arc<dyn ExecutionPlan>) -> Result<bool> {
Copy link

@LLDay LLDay Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to allow mutations for possible optimization. For example, ResolvePlaceholdersRule may construct a mapping plan_index -> expr_indices that will skip expressions without placeholders. For some reason, I didn't implement it, but that was the idea originally. Now we have a shared reference, and all the rules are stateless. You can resolve this thread if you think this API is suitable for our purposes.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this optimization to the current implementation.

Currently, it depends on the particular execution transformation
that resolves placeholders. However, it would be more convenient
to be able to specify the rule which is required to be applied to
the plan.
@askalt
Copy link
Author

askalt commented Feb 11, 2026

Also noticed that when we re-execute a plan, we need to someway clear TransformPlanExec metrics itself.

@askalt
Copy link
Author

askalt commented Feb 11, 2026

Also noticed that when we re-execute a plan, we need to someway clear TransformPlanExec metrics itself.

Considering it, I think, we should not perform re-use plan actions in the separate node (miss-design from my side, it seems we a bit overengineered here). Plan should contain nodes which manipulate with data and are important for user to debug query performance.

I would simplify the code, removing introduced transformations. Let's apply all transformations required to re-execute plan and substitute placeholders separately (e.g., reset_plan_states from DF).

askalt and others added 4 commits February 12, 2026 10:02
Move it out from the separate physical plan into a function
that prepares plan for execution.
…ss costly) (apache#19893)

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes apache#123` indicates that this PR will close issue apache#123.
-->

- Closes apache#19852

Improve performance of query planning and plan state re-set by making
node clone cheap.

- Store projection as `Option<Arc<[usize]>>` instead of
`Option<Vec<usize>>` in `FilterExec`, `HashJoinExec`,
`NestedLoopJoinExec`.
- Store exprs as `Arc<[ProjectionExpr]>` instead of Vec in
`ProjectionExprs`.
- Store arced aggregation, filter, group by expressions within
`AggregateExec`.
This patch aims to implement a fast-path for the ExecutionPlan::with_new_children function
for some plans, moving closer to a physical plan re-use implementation and improving planning
performance. If the passed children properties are the same as in self, we do not actually
recompute self's properties  (which could be costly if projection mapping is required).
Instead, we just replace the children and re-use self's properties as-is.

To be able to compare two different properties -- ExecutionPlan::properties(...) signature
is modified and now returns `&Arc<PlanProperties>`. If `children` properties are the same
in `with_new_children` -- we clone our properties arc and then a parent plan will consider
our properties as unchanged, doing the same.

- Return `&Arc<PlanProperties>` from `ExecutionPlan::properties(...)` instead of a reference.
- Implement `with_new_children` fast-path if there is no children properties changes for all
  major plans.

Note: currently, `reset_plan_states` does not allow to re-use plan in general: it is not
supported for dynamic filters and recursive queries features, as in this case state reset
should update pointers in the children plans.

Closes apache#19796
@LLDay
Copy link

LLDay commented Feb 12, 2026

Let's apply all transformations required to re-execute plan and substitute placeholders separately

Are you suggesting adding a separate method that simply resolves placeholders? And we will call this method every time before executing the plan?

Plan should contain nodes which manipulate with data and are important for user to debug query performance.

Yes, final plan contains only meaningful nodes. But our TransformPlanExec can be an ancillary node as OutputRequirementExec. We can prohibit calling execute on TransformPlanExec. I'm just offering alternative ideas, I'm not against a separate method that resolves placeholders and simplifies the code.

@askalt
Copy link
Author

askalt commented Feb 12, 2026

Let's apply all transformations required to re-execute plan and substitute placeholders separately

Are you suggesting adding a separate method that simply resolves placeholders? And we will call this method every time before executing the plan?

Plan should contain nodes which manipulate with data and are important for user to debug query performance.

Yes, final plan contains only meaningful nodes. But our TransformPlanExec can be an ancillary node as OutputRequirementExec. We can prohibit calling execute on TransformPlanExec. I'm just offering alternative ideas, I'm not against a separate method that resolves placeholders and simplifies the code.

Currently, I have the following arguments against the plan node:

(1) with_new_children semantic: this node is created for the fixed input plan and it is a fragile invariant that tree form must be preserved to not break indexes. I see that it is created on post-phase when tree-form typically is not modified, but I would to avoid such requirements, if we can.
(2) This node is always kept as a plan-tree root. So it is a miss-use to have it as a child of some node (it probably would work, but it is not the pattern how we want to use it). This causes a question why we even allow to keep as a child, implementing ExecutionPlan for it.
(3) It pollutes the plan.
(4) We anyway should reset its metrics when we re-execute the plan.

Yes, final plan contains only meaningful nodes. But our TransformPlanExec can be an ancillary node as OutputRequirementExec. We can prohibit calling execute on TransformPlanExec. I'm just offering alternative ideas, I'm not against a separate method that resolves placeholders and simplifies the code.

Yes, but due to (2) I suggest to implement a wrapper that will not an ExecutionPlan itself. Every user could implement custom wrapper based on the physical_expressions(...) API and actions required to do before the execution. So IMO generalization here would be over-kill (I reconsidered my opinion).

In DF itself we could leave basic wrapper that will allow to re-execute plan with placeholders, e.g. (will push after run some benchmarks):

/// Wraps an [`ExecutionPlan`] that may contain placeholders, making
/// it re-executable, avoiding re-planning stage.
///
/// # Limitations
///
/// Plan does not support re-execution if it (OR):
///
/// * uses dynamic filters,
/// * represents a recursive query.
///
/// This invariant is not enforced by [`ReusableExecutionPlan`] itself, so it
/// must be checked by user.
///
pub struct ReusableExecutionPlan {
    binder: Binder,
    bound_plan: Option<Arc<dyn ExecutionPlan>>,
}

impl ReusableExecutionPlan {
    /// Make a new [`ReusableExecutionPlan`] bound to the passed `plan`.
    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        let binder = Binder::new(plan);
        Self {
            binder,
            bound_plan: None,
        }
    }

    /// Return a ready to execution instance of [`ReusableExecutionPlan`] where
    /// placeholders are bound to the passed `params`.
    pub fn bind(&self, params: Option<&ParamValues>) -> Result<Self> {
        let bound_plan = self.binder.bind(params).map(Some)?;
        Ok(Self {
            binder: self.binder.clone(),
            bound_plan,
        })
    }

    /// Return an inner plan to execute.
    ///
    /// If this plan is a result of [`Self::bind`] call, then bound plan is returned.
    /// Otherwise, an initial plan is returned.
    pub fn plan(&self) -> Arc<dyn ExecutionPlan> {
        self.bound_plan
            .clone()
            .unwrap_or_else(|| Arc::clone(&self.binder.plan))
    }
}

impl From<Arc<dyn ExecutionPlan>> for ReusableExecutionPlan {
    fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
        Self::new(plan)
    }
}

#[derive(Debug, Clone)]
struct NodeWithPlaceholders {
    /// The index of the node in the tree traversal.
    idx: usize,
    /// Positions of the placeholders among plan physical expressions.
    placeholder_idx: Vec<usize>,
}

impl NodeWithPlaceholders {
    /// Returns [`Some`] if passed `node` contains placeholders and must
    /// be resolved on binding stage.
    fn new(node: &Arc<dyn ExecutionPlan>, idx: usize) -> Option<Self> {
        let placeholder_idx = if let Some(iter) = node.physical_expressions() {
            iter.enumerate()
                .filter_map(|(i, expr)| {
                    if has_placeholders(&expr) {
                        Some(i)
                    } else {
                        None
                    }
                })
                .collect()
        } else {
            vec![]
        };

        if placeholder_idx.is_empty() {
            None
        } else {
            Some(Self {
                idx,
                placeholder_idx,
            })
        }
    }

    fn resolve(
        &self,
        node: &Arc<dyn ExecutionPlan>,
        params: Option<&ParamValues>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let Some(expr) = node.physical_expressions() else {
            return exec_err!("node {} does not support expressions export", node.name());
        };
        let mut exprs: Vec<Arc<dyn PhysicalExpr>> = expr.collect();
        for idx in self.placeholder_idx.iter() {
            exprs[*idx] = resolve_expr_placeholders(Arc::clone(&exprs[*idx]), params)?;
        }
        let Some(resolved_node) =
            node.with_physical_expressions(ReplacePhysicalExpr { exprs })?
        else {
            return exec_err!(
                "node {} does not support expressions replace",
                node.name()
            );
        };
        Ok(resolved_node)
    }
}

/// Helper to bound placeholders and reset plan nodes state.
#[derive(Clone)]
struct Binder {
    /// Created during [`Binder`] construction.
    /// This way we avoid runtime rebuild for expressions without placeholders.
    nodes_to_resolve: Arc<[NodeWithPlaceholders]>,
    plan: Arc<dyn ExecutionPlan>,
}

impl Binder {
    fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        let mut nodes_to_resolve = vec![];
        let mut cursor = 0;

        plan.apply(|node| {
            let idx = cursor;
            cursor += 1;
            if let Some(node) = NodeWithPlaceholders::new(node, idx) {
                nodes_to_resolve.push(node);
            }
            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();

        Self {
            plan,
            nodes_to_resolve: nodes_to_resolve.into(),
        }
    }

    fn bind(&self, params: Option<&ParamValues>) -> Result<Arc<dyn ExecutionPlan>> {
        let mut cursor = 0;
        let mut resolve_node_idx = 0;
        Arc::clone(&self.plan)
            .transform_down(|node| {
                let idx = cursor;
                cursor += 1;
                if resolve_node_idx < self.nodes_to_resolve.len()
                    && self.nodes_to_resolve[resolve_node_idx].idx == idx
                {
                    // Note: `resolve` replases plan expressions, which also resets a plan state.
                    let resolved_node =
                        self.nodes_to_resolve[resolve_node_idx].resolve(&node, params)?;
                    resolve_node_idx += 1;
                    Ok(Transformed::yes(resolved_node))
                } else {
                    // Reset state.
                    Ok(Transformed::yes(node.reset_state()?))
                }
            })
            .map(|tnr| tnr.data)
    }
}

@askalt askalt force-pushed the askalt/generalize-execution-rules-applier branch from d213820 to d5e97b8 Compare March 11, 2026 09:18
@github-actions github-actions bot added documentation Improvements or additions to documentation physical-expr common execution labels Mar 11, 2026
@askalt
Copy link
Author

askalt commented Mar 11, 2026

Benchmark results:

reset_query0/reset      time:   [1.3652 µs 1.3698 µs 1.3750 µs]
bind_query0/0_placeholders
                        time:   [1.4154 µs 1.4178 µs 1.4201 µs]
bind_query0/12_placeholders
                        time:   [596.58 µs 597.53 µs 598.71 µs]
bind_query0/120_placeholders
                        time:   [732.47 µs 733.47 µs 734.66 µs]
bind_query0/600_placeholders
                        time:   [1.4174 ms 1.4196 ms 1.4221 ms]
bind_query0/1200_placeholders
                        time:   [2.2405 ms 2.2429 ms 2.2454 ms]

reset_query1/reset      time:   [1.6086 µs 1.6099 µs 1.6113 µs]
bind_query1/0_placeholders
                        time:   [1.6316 µs 1.6336 µs 1.6357 µs]
bind_query1/6_placeholders
                        time:   [606.34 µs 607.70 µs 609.10 µs]
bind_query1/60_placeholders
                        time:   [623.67 µs 624.37 µs 625.03 µs]
bind_query1/300_placeholders
                        time:   [744.95 µs 745.98 µs 747.02 µs]
bind_query1/600_placeholders
                        time:   [901.97 µs 903.38 µs 904.99 µs]

reset_query2/reset      time:   [468.98 ns 469.51 ns 470.09 ns]
bind_query2/0_placeholders
                        time:   [483.03 ns 484.23 ns 485.33 ns]
bind_query2/6_placeholders
                        time:   [627.45 µs 628.46 µs 629.42 µs]
bind_query2/60_placeholders
                        time:   [643.85 µs 644.64 µs 645.48 µs]
bind_query2/300_placeholders
                        time:   [758.36 µs 759.09 µs 759.87 µs]
bind_query2/600_placeholders
                        time:   [885.94 µs 886.74 µs 887.54 µs]

What we clearly can observe:

  1. Resolving does not affect when there are no placeholders.
  2. When there is some vector in the plan that contains many expressions (e.g., aggregate expressions), then even the single placeholder will lead to the whole vector clone what causes a performance drop for all queries when the number of placeholders becomes greater than 0.

It looks acceptable at the moment.

@askalt
Copy link
Author

askalt commented Mar 11, 2026

@LLDay Please check the last commit and then I will recreate bump candidate branch from the current DF master (it will fix security audit CI job as some dependencies were already bumped in DF).

This patch adds an execution plan wrapper that allows to bind
parameters and reuse plans with placeholders.
@askalt askalt force-pushed the askalt/generalize-execution-rules-applier branch from d5e97b8 to 3aa6724 Compare March 11, 2026 09:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants