feat: make execution transformation application rule generic #25
askalt wants to merge 6 commits into askalt/bump-candidate
@LLDay please check the patch.
datafusion/core/tests/physical_optimizer/physical_expr_resolver.rs
/// Checks if the given [`ExecutionPlan`] node matches the criteria for this rule.
- fn matches(&mut self, _node: &Arc<dyn ExecutionPlan>) -> Result<bool> {
+ fn matches(&self, _node: &Arc<dyn ExecutionPlan>) -> Result<bool> {
The idea was to allow mutations for possible optimization. For example, ResolvePlaceholdersRule may construct a mapping plan_index -> expr_indices that will skip expressions without placeholders. For some reason, I didn't implement it, but that was the idea originally. Now we have a shared reference, and all the rules are stateless. You can resolve this thread if you think this API is suitable for our purposes.
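To make the `plan_index -> expr_indices` idea concrete, here is a minimal sketch on toy types. `Expr`, `Node`, `placeholder_map`, and `resolve` are hypothetical names used only for illustration, not DataFusion APIs, and placeholders are numbered from zero as a simplifying assumption.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for a physical expression (hypothetical, not DataFusion's type).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// Placeholder with a 0-based parameter position (assumption of this sketch).
    Placeholder(usize),
    Literal(i64),
}

/// Toy plan node that only stores its expressions.
#[derive(Debug, Clone, PartialEq)]
struct Node {
    exprs: Vec<Expr>,
}

/// Build `plan_index -> expr_indices`, keeping only expressions with placeholders.
fn placeholder_map(nodes: &[Node]) -> BTreeMap<usize, Vec<usize>> {
    let mut map = BTreeMap::new();
    for (plan_idx, node) in nodes.iter().enumerate() {
        let expr_idx: Vec<usize> = node
            .exprs
            .iter()
            .enumerate()
            .filter(|(_, e)| matches!(e, Expr::Placeholder(_)))
            .map(|(i, _)| i)
            .collect();
        if !expr_idx.is_empty() {
            map.insert(plan_idx, expr_idx);
        }
    }
    map
}

/// Resolve placeholders by visiting only the recorded positions.
/// Returns `None` if a parameter is missing.
fn resolve(
    nodes: &[Node],
    map: &BTreeMap<usize, Vec<usize>>,
    params: &[i64],
) -> Option<Vec<Node>> {
    let mut out = nodes.to_vec();
    for (&plan_idx, expr_idx) in map {
        for &i in expr_idx {
            let slot = &mut out[plan_idx].exprs[i];
            if let Expr::Placeholder(p) = *slot {
                *slot = Expr::Literal(*params.get(p)?);
            }
        }
    }
    Some(out)
}
```

The map is built once; each later `resolve` call touches only the recorded slots, so nodes without placeholders are never inspected again.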
Added this optimization to the current implementation.
Currently, it depends on the particular execution transformation that resolves placeholders. However, it would be more convenient to be able to specify the rule which is required to be applied to the plan.
Also noticed that when we re-execute a plan, we need to somehow clear
Given that, I think we should not perform plan re-use actions in a separate node (a design mistake on my side; it seems we over-engineered a bit here). A plan should contain nodes that manipulate data and that matter to a user debugging query performance. I would simplify the code by removing the introduced transformations. Let's apply all transformations required to re-execute the plan and substitute placeholders separately (e.g.,
Move it out from the separate physical plan into a function that prepares plan for execution.
…ss costly) (apache#19893)

- Closes apache#19852

Improve performance of query planning and plan state re-set by making node clone cheap.

- Store projection as `Option<Arc<[usize]>>` instead of `Option<Vec<usize>>` in `FilterExec`, `HashJoinExec`, `NestedLoopJoinExec`.
- Store exprs as `Arc<[ProjectionExpr]>` instead of Vec in `ProjectionExprs`.
- Store arced aggregation, filter, group by expressions within `AggregateExec`.
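As a rough illustration of why `Option<Arc<[usize]>>` makes node clones cheaper than `Option<Vec<usize>>`, the sketch below uses two hypothetical structs, `VecNode` and `ArcNode`, which are not the real plan nodes. Cloning the `Vec` variant copies the indices into a new allocation, while cloning the `Arc` variant only bumps a reference count.

```rust
use std::sync::Arc;

/// Hypothetical node that owns its projection as a `Vec`.
#[derive(Clone)]
struct VecNode {
    projection: Option<Vec<usize>>,
}

/// Hypothetical node that shares its projection through an `Arc<[usize]>`.
#[derive(Clone)]
struct ArcNode {
    projection: Option<Arc<[usize]>>,
}

/// True when both nodes point at the same projection allocation
/// (or both have no projection).
fn shares_projection(a: &ArcNode, b: &ArcNode) -> bool {
    match (&a.projection, &b.projection) {
        (Some(x), Some(y)) => Arc::ptr_eq(x, y),
        (None, None) => true,
        _ => false,
    }
}
```

With this layout, `ArcNode::clone` never allocates for the projection, whatever its length.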
This patch aims to implement a fast-path for the ExecutionPlan::with_new_children function for some plans, moving closer to a physical plan re-use implementation and improving planning performance. If the passed children properties are the same as in self, we do not actually recompute self's properties (which could be costly if projection mapping is required). Instead, we just replace the children and re-use self's properties as-is.

To be able to compare two different properties -- ExecutionPlan::properties(...) signature is modified and now returns `&Arc<PlanProperties>`. If `children` properties are the same in `with_new_children` -- we clone our properties arc and then a parent plan will consider our properties as unchanged, doing the same.

- Return `&Arc<PlanProperties>` from `ExecutionPlan::properties(...)` instead of a reference.
- Implement `with_new_children` fast-path if there is no children properties changes for all major plans.

Note: currently, `reset_plan_states` does not allow to re-use plan in general: it is not supported for dynamic filters and recursive queries features, as in this case state reset should update pointers in the children plans.

Closes apache#19796
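The fast path described in this commit message can be sketched on a toy tree. `Props` and `Node` below are hypothetical stand-ins for `PlanProperties` and a plan node, and summing partitions is only a placeholder for the real (potentially costly) property computation. The point is the `Arc::ptr_eq` comparison: when every new child carries the same properties `Arc` as the old child, the parent clones its own `Arc` instead of recomputing.

```rust
use std::sync::Arc;

/// Toy stand-in for `PlanProperties` (hypothetical).
#[derive(Debug)]
struct Props {
    partitions: usize,
}

/// Toy plan node; `props` is shared so it can be compared by pointer.
#[derive(Debug)]
struct Node {
    children: Vec<Arc<Node>>,
    props: Arc<Props>,
}

impl Node {
    fn leaf(partitions: usize) -> Arc<Node> {
        Arc::new(Node {
            children: vec![],
            props: Arc::new(Props { partitions }),
        })
    }

    /// Slow path: derive properties from the children.
    fn compute_props(children: &[Arc<Node>]) -> Arc<Props> {
        let partitions = children.iter().map(|c| c.props.partitions).sum();
        Arc::new(Props { partitions })
    }

    fn new(children: Vec<Arc<Node>>) -> Arc<Node> {
        let props = Self::compute_props(&children);
        Arc::new(Node { children, props })
    }

    /// Fast path: reuse `self.props` when no child changed its properties `Arc`.
    fn with_new_children(&self, children: Vec<Arc<Node>>) -> Arc<Node> {
        let unchanged = children.len() == self.children.len()
            && children
                .iter()
                .zip(&self.children)
                .all(|(new, old)| Arc::ptr_eq(&new.props, &old.props));
        let props = if unchanged {
            Arc::clone(&self.props)
        } else {
            Self::compute_props(&children)
        };
        Arc::new(Node { children, props })
    }
}
```

Because the reused `Arc` is pointer-equal to the old one, a grandparent applying the same check also takes the fast path, which is the propagation effect the message describes.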
Are you suggesting adding a separate method that simply resolves placeholders? And we will call this method every time before executing the plan?
Yes, the final plan contains only meaningful nodes. But our
Currently, I have the following arguments against the plan node: (1)
Yes, but due to (2) I suggest implementing a wrapper that will not an
In DF itself we could leave a basic wrapper that allows re-executing a plan with placeholders, e.g. (will push after running some benchmarks):

/// Wraps an [`ExecutionPlan`] that may contain placeholders, making
/// it re-executable, avoiding re-planning stage.
///
/// # Limitations
///
/// Plan does not support re-execution if it (OR):
///
/// * uses dynamic filters,
/// * represents a recursive query.
///
/// This invariant is not enforced by [`ReusableExecutionPlan`] itself, so it
/// must be checked by user.
///
pub struct ReusableExecutionPlan {
    binder: Binder,
    bound_plan: Option<Arc<dyn ExecutionPlan>>,
}

impl ReusableExecutionPlan {
    /// Make a new [`ReusableExecutionPlan`] bound to the passed `plan`.
    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        let binder = Binder::new(plan);
        Self {
            binder,
            bound_plan: None,
        }
    }

    /// Return a ready to execution instance of [`ReusableExecutionPlan`] where
    /// placeholders are bound to the passed `params`.
    pub fn bind(&self, params: Option<&ParamValues>) -> Result<Self> {
        let bound_plan = self.binder.bind(params).map(Some)?;
        Ok(Self {
            binder: self.binder.clone(),
            bound_plan,
        })
    }

    /// Return an inner plan to execute.
    ///
    /// If this plan is a result of [`Self::bind`] call, then bound plan is returned.
    /// Otherwise, an initial plan is returned.
    pub fn plan(&self) -> Arc<dyn ExecutionPlan> {
        self.bound_plan
            .clone()
            .unwrap_or_else(|| Arc::clone(&self.binder.plan))
    }
}

impl From<Arc<dyn ExecutionPlan>> for ReusableExecutionPlan {
    fn from(plan: Arc<dyn ExecutionPlan>) -> Self {
        Self::new(plan)
    }
}

#[derive(Debug, Clone)]
struct NodeWithPlaceholders {
    /// The index of the node in the tree traversal.
    idx: usize,
    /// Positions of the placeholders among plan physical expressions.
    placeholder_idx: Vec<usize>,
}

impl NodeWithPlaceholders {
    /// Returns [`Some`] if passed `node` contains placeholders and must
    /// be resolved on binding stage.
    fn new(node: &Arc<dyn ExecutionPlan>, idx: usize) -> Option<Self> {
        let placeholder_idx = if let Some(iter) = node.physical_expressions() {
            iter.enumerate()
                .filter_map(|(i, expr)| {
                    if has_placeholders(&expr) {
                        Some(i)
                    } else {
                        None
                    }
                })
                .collect()
        } else {
            vec![]
        };
        if placeholder_idx.is_empty() {
            None
        } else {
            Some(Self {
                idx,
                placeholder_idx,
            })
        }
    }

    fn resolve(
        &self,
        node: &Arc<dyn ExecutionPlan>,
        params: Option<&ParamValues>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let Some(expr) = node.physical_expressions() else {
            return exec_err!("node {} does not support expressions export", node.name());
        };
        let mut exprs: Vec<Arc<dyn PhysicalExpr>> = expr.collect();
        for idx in self.placeholder_idx.iter() {
            exprs[*idx] = resolve_expr_placeholders(Arc::clone(&exprs[*idx]), params)?;
        }
        let Some(resolved_node) =
            node.with_physical_expressions(ReplacePhysicalExpr { exprs })?
        else {
            return exec_err!(
                "node {} does not support expressions replace",
                node.name()
            );
        };
        Ok(resolved_node)
    }
}

/// Helper to bind placeholders and reset plan nodes state.
#[derive(Clone)]
struct Binder {
    /// Created during [`Binder`] construction.
    /// This way we avoid runtime rebuild for expressions without placeholders.
    nodes_to_resolve: Arc<[NodeWithPlaceholders]>,
    plan: Arc<dyn ExecutionPlan>,
}

impl Binder {
    fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
        let mut nodes_to_resolve = vec![];
        let mut cursor = 0;
        plan.apply(|node| {
            let idx = cursor;
            cursor += 1;
            if let Some(node) = NodeWithPlaceholders::new(node, idx) {
                nodes_to_resolve.push(node);
            }
            Ok(TreeNodeRecursion::Continue)
        })
        // The closure above never returns an error, so this cannot fail.
        .unwrap();
        Self {
            plan,
            nodes_to_resolve: nodes_to_resolve.into(),
        }
    }

    fn bind(&self, params: Option<&ParamValues>) -> Result<Arc<dyn ExecutionPlan>> {
        let mut cursor = 0;
        let mut resolve_node_idx = 0;
        Arc::clone(&self.plan)
            .transform_down(|node| {
                let idx = cursor;
                cursor += 1;
                if resolve_node_idx < self.nodes_to_resolve.len()
                    && self.nodes_to_resolve[resolve_node_idx].idx == idx
                {
                    // Note: `resolve` replaces plan expressions, which also resets a plan state.
                    let resolved_node =
                        self.nodes_to_resolve[resolve_node_idx].resolve(&node, params)?;
                    resolve_node_idx += 1;
                    Ok(Transformed::yes(resolved_node))
                } else {
                    // Reset state.
                    Ok(Transformed::yes(node.reset_state()?))
                }
            })
            .map(|tnr| tnr.data)
    }
}
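
The index bookkeeping in `Binder` relies on both walks visiting nodes in the same pre-order. A minimal standalone model of that pattern follows, assuming a toy `Plan` type where `value: None` marks a placeholder and parameters are consumed in traversal order. Both are simplifications for this sketch, not how `ParamValues` works.

```rust
/// Toy plan tree (hypothetical; not DataFusion's `ExecutionPlan`).
#[derive(Debug, Clone, PartialEq)]
struct Plan {
    /// `None` marks a placeholder still waiting for a value.
    value: Option<i64>,
    children: Vec<Plan>,
}

/// Pre-order walk that records the indices of nodes holding a placeholder.
fn collect(plan: &Plan, cursor: &mut usize, out: &mut Vec<usize>) {
    let idx = *cursor;
    *cursor += 1;
    if plan.value.is_none() {
        out.push(idx);
    }
    for child in &plan.children {
        collect(child, cursor, out);
    }
}

struct Binder {
    template: Plan,
    /// Sorted pre-order indices, computed once at construction.
    to_resolve: Vec<usize>,
}

impl Binder {
    fn new(template: Plan) -> Self {
        let mut to_resolve = Vec::new();
        collect(&template, &mut 0, &mut to_resolve);
        Self { template, to_resolve }
    }

    /// Produce a bound copy; the template itself is left untouched.
    /// Returns `None` if too few parameters are supplied.
    fn bind(&self, params: &[i64]) -> Option<Plan> {
        let mut plan = self.template.clone();
        let (mut cursor, mut next) = (0, 0);
        self.fill(&mut plan, params, &mut cursor, &mut next)?;
        Some(plan)
    }

    /// Same pre-order walk as `collect`, so `cursor` values line up.
    fn fill(
        &self,
        node: &mut Plan,
        params: &[i64],
        cursor: &mut usize,
        next: &mut usize,
    ) -> Option<()> {
        let idx = *cursor;
        *cursor += 1;
        if self.to_resolve.get(*next) == Some(&idx) {
            node.value = Some(*params.get(*next)?);
            *next += 1;
        }
        for child in &mut node.children {
            self.fill(child, params, cursor, next)?;
        }
        Some(())
    }
}
```

Since `bind` never mutates the template, the same `Binder` can be bound repeatedly with different parameters, which mirrors the re-execution use case above.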
Benchmark results: What we can clearly observe:
It looks acceptable at the moment.
@LLDay please check the last commit; then I will recreate the bump-candidate branch from the current DF master (this will fix the security audit CI job, as some dependencies were already bumped in DF).
This patch adds an execution plan wrapper that allows binding parameters and reusing plans with placeholders.