feat: add ExtractLeafExpressions optimizer rule for get_field pushdown#20117
adriangb wants to merge 22 commits into apache:main
Conversation
This PR adds a new optimizer rule `ExtractLeafExpressions` that extracts `MoveTowardsLeafNodes` sub-expressions (like `get_field`) from Filter, Sort, Limit, Aggregate, and Projection nodes into intermediate projections. This normalization allows `OptimizeProjections` (which runs next) to merge consecutive projections and push `get_field` expressions down to the scan, enabling Parquet column pruning for struct fields.

Example transformation for projections:

```sql
SELECT id, s['label'] FROM t WHERE s['value'] > 150
```

Before: `get_field(s, 'label')` stayed in ProjectionExec, reading the full struct.
After: Both `get_field` expressions pushed to DataSourceExec.

The rule:
- Extracts `MoveTowardsLeafNodes` expressions into `__leaf_N` aliases
- Creates inner projections with extracted expressions + pass-through columns
- Creates outer projections to restore original schema names
- Handles deduplication of identical expressions
- Skips expressions already aliased with `__leaf_*` to ensure idempotency

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
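The two-phase idea above can be illustrated with a minimal, self-contained sketch. The `Expr` enum and `extract_leaves` helper here are toy stand-ins, not DataFusion's actual types: every `GetField` sub-expression is replaced by a `__leaf_N` column reference and recorded so it can become an inner projection, with identical expressions deduplicated to the same alias.

```rust
use std::collections::HashMap;

// Toy stand-in for DataFusion's `Expr` (illustration only).
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
enum Expr {
    Column(String),
    // A `MoveTowardsLeafNodes`-style expression, e.g. get_field(s, 'value')
    GetField(Box<Expr>, String),
    // Some non-leaf computation, e.g. `expr > literal`
    Gt(Box<Expr>, i64),
}

// Replace every `GetField` sub-expression with a `__leaf_N` column reference,
// recording the extracted expression so it can form the inner projection.
// Identical expressions are deduplicated to the same alias via `seen`.
fn extract_leaves(
    expr: Expr,
    extracted: &mut Vec<(String, Expr)>,
    seen: &mut HashMap<Expr, String>,
) -> Expr {
    match expr {
        e @ Expr::GetField(..) => {
            if let Some(alias) = seen.get(&e) {
                return Expr::Column(alias.clone());
            }
            let alias = format!("__leaf_{}", extracted.len() + 1);
            seen.insert(e.clone(), alias.clone());
            extracted.push((alias.clone(), e));
            Expr::Column(alias)
        }
        Expr::Gt(inner, n) => Expr::Gt(Box::new(extract_leaves(*inner, extracted, seen)), n),
        other => other,
    }
}

fn main() {
    // WHERE s['value'] > 150
    let pred = Expr::Gt(
        Box::new(Expr::GetField(Box::new(Expr::Column("s".into())), "value".into())),
        150,
    );
    let mut extracted = Vec::new();
    let mut seen = HashMap::new();
    let rewritten = extract_leaves(pred, &mut extracted, &mut seen);
    // The predicate now references the alias; the GetField moved to `extracted`,
    // which would become the inner projection.
    assert_eq!(rewritten, Expr::Gt(Box::new(Expr::Column("__leaf_1".into())), 150));
    assert_eq!(extracted.len(), 1);
    println!("rewritten: {rewritten:?}");
}
```

The outer projection (not shown) would then re-alias `__leaf_1` back to the original schema name, which is why the rewrite is invisible to the rest of the plan.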
Pull request overview
This PR introduces the ExtractLeafExpressions optimizer rule to enable better Parquet column pruning by extracting get_field expressions into intermediate projections. The rule normalizes query plans so that field accessor expressions can be pushed down to DataSource nodes, allowing only required struct fields to be read from Parquet files.
Changes:
- New `ExtractLeafExpressions` optimizer rule that extracts `MoveTowardsLeafNodes` expressions (like `get_field`) from Filter, Sort, Limit, Aggregate, and Projection nodes
- Modified `PushDownFilter` to avoid pushing filters through `__leaf_*` extraction projections
- Updated test expectations across multiple SQL logic test files to reflect new query plans with extracted field expressions
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| datafusion/optimizer/src/extract_leaf_expressions.rs | New optimizer rule implementation with bottom-up traversal to extract and push down leaf expressions |
| datafusion/optimizer/src/optimizer.rs | Registers ExtractLeafExpressions to run before OptimizeProjections |
| datafusion/optimizer/src/lib.rs | Exports the new extract_leaf_expressions module |
| datafusion/optimizer/src/push_down_filter.rs | Adds logic to prevent filter pushdown through __leaf_* extraction projections |
| datafusion/optimizer/src/test/mod.rs | Adds test helper functions for tables with struct fields |
| datafusion/sqllogictest/test_files/projection_pushdown.slt | Updates expected query plans showing __leaf_* aliases and extraction projections |
| datafusion/sqllogictest/test_files/struct.slt | Updates expected projection output to include AS clause for field access |
| datafusion/sqllogictest/test_files/projection.slt | Updates expected logical plan to include AS clause for field access |
| datafusion/sqllogictest/test_files/push_down_filter.slt | Updates expected physical plan showing extraction projection before FilterExec |
| datafusion/sqllogictest/test_files/explain.slt | Adds new optimizer stage output line for extract_leaf_expressions |
Implement `extract_from_join` to extract `MoveTowardsLeafNodes` sub-expressions (like `get_field`) from Join nodes:
- Extract from `on` expressions (equijoin keys)
- Extract from `filter` expressions (non-equi conditions)
- Route extractions to the appropriate side (left/right) based on columns
- Add a recovery projection to restore the original schema

Also adds unit tests and sqllogictest integration tests for:
- Join with get_field in equijoin condition
- Join with get_field in filter (WHERE clause)
- Join with extractions from both sides
- Left join with get_field extraction
- Baseline join without extraction

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
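The routing step can be sketched as follows. This is a hedged illustration, not the actual implementation: the `Side` type and `route_extraction` helper are hypothetical, and columns not found in the left schema are assumed to come from the right input.

```rust
use std::collections::HashSet;

// Which join input an extracted expression can be evaluated against
// (hypothetical helper; the real rule works on DataFusion plans).
#[derive(Debug, PartialEq)]
enum Side {
    Left,
    Right,
    Both,
}

// Route based on the columns the expression references. Columns not found
// in the left schema are assumed to come from the right input.
fn route_extraction(referenced: &HashSet<&str>, left_cols: &HashSet<&str>) -> Side {
    let uses_left = referenced.iter().any(|c| left_cols.contains(c));
    let uses_right = referenced.iter().any(|c| !left_cols.contains(c));
    match (uses_left, uses_right) {
        (true, false) => Side::Left,
        (false, true) => Side::Right,
        // References both sides (or nothing): cannot be pushed to one
        // input, so the expression stays above the join.
        _ => Side::Both,
    }
}

fn main() {
    let left: HashSet<&str> = ["user", "id"].into_iter().collect();
    let refs: HashSet<&str> = ["user"].into_iter().collect();
    // `get_field(user, ...)` only touches left columns, so the extraction
    // projection can be inserted on the left input.
    assert_eq!(route_extraction(&refs, &left), Side::Left);
}
```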
```rust
// Everything else passes through unchanged
_ => Ok(Transformed::no(plan)),
```
I'm not sure what else we could handle here. Maybe Extension?
Before we merge this PR we should expand this to explicitly ignore all other nodes, so that if a new node is added one has to decide how this rule should handle it. I'll wait to do that since that's another +30 LOC diff.
If we used the map_expressions API, as suggested above, we would get support for Extension nodes "for free"
```diff
 ----
 logical_plan
-01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) + Int64(1)
+01)Projection: simple_struct.id, get_field(simple_struct.s, Utf8("value")) + Int64(1) AS simple_struct.s[value] + Int64(1)
```
Note that this is not a change in the output schema name: it is already simple_struct.s[value].
```diff
 #####################
-# Section 12: Cleanup
+# Section 12: Join Tests - get_field Extraction from Join Nodes
```
I can break these out into another PR to reduce the diff if that's helpful.
When `find_extraction_target` returns a Projection that renames columns
(e.g. `user AS x`), both `build_extraction_projection` and
`merge_into_extracted_projection` were adding extracted expressions that
reference the target's output columns (e.g. `col("x")`) to a projection
evaluated against the target's input (which only has `user`).
Fix by resolving extracted expressions and columns_needed through the
projection's rename mapping using `replace_cols_by_name` before merging.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
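The resolution step in this fix can be sketched at the string level. This is a simplified stand-in for `replace_cols_by_name`, operating on plain column names rather than real `Expr` trees:

```rust
use std::collections::HashMap;

// Resolve column names through a projection's rename mapping before merging
// extracted expressions below it. E.g. `user AS x` yields the mapping
// {"x" -> "user"}, so an extracted expression referencing `x` must be
// rewritten to reference `user`, which is what the target's input provides.
// (Simplified, string-level stand-in for `replace_cols_by_name`.)
fn resolve_through_renames(cols: &[&str], renames: &HashMap<&str, &str>) -> Vec<String> {
    cols.iter()
        .map(|c| renames.get(c).copied().unwrap_or(*c).to_string())
        .collect()
}

fn main() {
    let mut renames = HashMap::new();
    renames.insert("x", "user"); // the target projection had `user AS x`
    // Columns the extraction needs, expressed against the projection's output:
    let needed = resolve_through_renames(&["x", "id"], &renames);
    // `x` resolves back to `user`; `id` passes through unchanged.
    assert_eq!(needed, vec!["user".to_string(), "id".to_string()]);
}
```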
```text
04)------ProjectionExec: expr=[get_field(__unnest_placeholder(d.column2,depth=1)@1, a) as __datafusion_extracted_1, column1@0 as column1, __unnest_placeholder(d.column2,depth=1)@1 as __unnest_placeholder(d.column2,depth=1)]
05)--------UnnestExec
```
I think it would be quite complex to try to push the `get_field` through the unnest, and ultimately I don't think Parquet would be able to optimize the scan (maybe I'm wrong about this?), so there would be little point.
Since it wasn't pushed through before I think it is fine that it (still) isn't pushed through
```rust
// Don't push filters through extracted expression projections.
// Pushing filters through would rewrite expressions like `__datafusion_extracted_1 > 150`
// back to `get_field(s,'value') > 150`, undoing the extraction.
if is_extracted_expr_projection(&projection) {
```
This is obviously not great, but I don't see another way to avoid this. Otherwise if we have:

```text
Filter: get_field(col, 'foo') > 1
TableScan: projection=[col]
```

and we run our new rule to get:

```text
Projection: col('col')
Filter: __datafusion_extracted_1 > 1
Projection: get_field(col, 'foo') as __datafusion_extracted_1, col
TableScan: projection=[col]
```

then this rule runs and will produce:

```text
Projection: col('col')
Projection: get_field(col, 'foo') as __datafusion_extracted_1, col
Filter: get_field(col, 'foo') > 1
TableScan projection=[col]
```

because it wants to push the filter under the projection.

I'd argue as a general rule there's no point in pushing a filter under a projection that is purely column selections / get_field expressions, especially if we can't then push it further.

Maybe a more robust fix would be to have the filter pushdown optimizer rule traverse the rest of the plan, find the position it plans to push into, and then check if there's any advantage to doing so (i.e. is it pushing the filter under an expensive operator that benefits from less input data, or is it just doing a trivial, pointless pushdown like in the case above). But that would be a lot more involved, so I chose this simpler solution for now.
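One plausible shape for the `is_extracted_expr_projection` check is sketched below, over a toy projection model rather than the actual DataFusion implementation: treat a projection as an extraction projection when it contains at least one `__datafusion_extracted_*` alias and everything else is a plain column pass-through.

```rust
// Toy projection-expression model (illustration only).
enum ProjExpr {
    // Plain column pass-through, e.g. `col`
    Column(String),
    // A computed expression under an alias,
    // e.g. `get_field(col, 'foo') AS __datafusion_extracted_1`
    Aliased { alias: String },
}

// A plausible shape for the check: the projection was created by
// `ExtractLeafExpressions` if it has at least one `__datafusion_extracted_*`
// alias and every other expression is a plain column. `PushDownFilter` can
// then leave the filter above it instead of undoing the extraction.
fn is_extracted_expr_projection(exprs: &[ProjExpr]) -> bool {
    let has_extracted = exprs.iter().any(|e| matches!(e, ProjExpr::Aliased { .. }));
    has_extracted
        && exprs.iter().all(|e| match e {
            ProjExpr::Column(_) => true,
            ProjExpr::Aliased { alias } => alias.starts_with("__datafusion_extracted_"),
        })
}

fn main() {
    // Projection: get_field(col, 'foo') as __datafusion_extracted_1, col
    let extraction = vec![
        ProjExpr::Aliased { alias: "__datafusion_extracted_1".to_string() },
        ProjExpr::Column("col".to_string()),
    ];
    assert!(is_extracted_expr_projection(&extraction));

    // An ordinary computing projection should not match.
    let ordinary = vec![ProjExpr::Aliased { alias: "total".to_string() }];
    assert!(!is_extracted_expr_projection(&ordinary));
}
```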
I'd argue as a general rule there's no point in pushing a filter under a projection that is purely column selections / get_field expressions especially if we can't then push it further.
Yes I agree with this statement.
I don't really have a better suggestion other than to perhaps make the exception more general "don't push filters under projections that doesn't do computation / etc"
I think the comment would be better / easier to understand the need for the special case if you included the great example from your comment:

```text
Projection: col('col')
Filter: __datafusion_extracted_1 > 1
Projection: get_field(col, 'foo') as __datafusion_extracted_1, col
TableScan: projection=[col]
```

Then this rule runs and will produce:

```text
Projection: col('col')
Projection: get_field(col, 'foo') as __datafusion_extracted_1, col
Filter: get_field(col, 'foo') > 1
TableScan projection=[col]
```
Won't projection pushdown push the projection later inside Filter again? What does a final plan look like?
run benchmark sql_planner |
alamb left a comment
Thank you @adriangb -- this is very exciting to see so close.
After this PR, what else is left to close out these issues?
Major points:
- I think we can avoid a lot of boilerplate code and make this code easier to maintain by using the map_expressions API: https://github.com/apache/datafusion/pull/20117/changes#r2760584800
- Can we avoid coupling the passes? (below)
One concern, which you have also touched on, is the coupling of ExtractLeafExpressions and OptimizeProjections, in the sense that those passes now have implicit dependencies on this new pass
Did you consider incorporating this logic directly into the OptimizeProjections? It seems like this transformation is really just a mechanism to enable OptimizeProjections 🤔
Cc @AdamGS as you said you are interested in this for Vortex as well
```rust
///
/// This is used by optimizers to make decisions about expression placement,
/// such as whether to push expressions down through projections.
pub fn placement(&self) -> ExpressionPlacement {
```
I really like the name ExpressionPlacement 👍
```diff
-03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[id, s], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[]
+01)ProjectionExec: expr=[id@1 as id, __datafusion_extracted_1@0 as simple_struct.s[value]]
+02)--FilterExec: id@1 > 2
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, value) as __datafusion_extracted_1, id], file_type=parquet, predicate=id@0 > 2, pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 2, required_guarantees=[]
```
Here is the optimizer pass in action -- the get_field was pushed down -- the plan looks good to me
```rust
/// Extracts `MoveTowardsLeafNodes` sub-expressions from all nodes into projections.
///
/// This normalizes the plan so that all `MoveTowardsLeafNodes` computations (like field
/// accessors) live in Projection nodes, making them eligible for pushdown.
```
What does "live in projection nodes" mean here? Like that all MoveTowardsLeafNodes computations appear as top level Exprs in a ProjectionExec?
```text
initial_logical_plan
01)Projection: simple_explain_test.a, simple_explain_test.b, simple_explain_test.c
02)--TableScan: simple_explain_test
logical_plan after resolve_grouping_function SAME TEXT AS ABOVE
```
😓 that is a lot of rewrites (not related to this PR, I am just thinking about planning speed in general)
```rust
//! - `Limit` - passes all input columns through
//!
//! **Projection Nodes** (merge through):
//! - Replace column refs with underlying expressions from the child projection
```
Is there a reason to split up the comments between the module and the struct?
It might make sense to leave the module level comments relatively minimal and move #Algorithm and everything else down to the doc comment on ExtractLeafExpressions so the algorithm and examples are close together
```rust
/// The `OptimizeProjections` rule can then push this projection down to the scan.
///
/// **Important:** The `PushDownFilter` rule is aware of projections created by this rule
/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs.
```
would be nice to make this a link too so it is checked automatically by rustdoc rather than can get out of sync
```diff
-/// and will not push filters through them. See `is_extracted_expr_projection` in utils.rs.
+/// and will not push filters through them. See [`is_extracted_expr_projection`]
```
```rust
match &plan {
    // Schema-preserving nodes - extract and push down
    LogicalPlan::Filter(_) | LogicalPlan::Sort(_) | LogicalPlan::Limit(_) => {
        extract_from_schema_preserving(plan, alias_generator)
```
I don't understand why there needs to be specialized code for different LogicalPlan types -- this seems like it is exactly the use case LogicalPlan::map_expressions() is designed to handle.
Couldn't you use map_expressions to rewrite any get_field expressions, and then add the relevant projection below it?
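The suggested pattern can be sketched with stand-in types rather than DataFusion's `LogicalPlan`/`Expr`: a single generic pass applies a rewrite closure to every expression a node holds, so no per-node-type code is needed and new or extension node types are covered automatically.

```rust
// Stand-ins for DataFusion's `Expr` and a plan node (illustration only).
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    GetField(Box<Expr>, String),
}

// Any node type -- Filter, Sort, Aggregate, or an Extension -- just exposes
// its expressions; the rewrite never needs to know which node it is visiting.
struct Node {
    exprs: Vec<Expr>,
}

impl Node {
    // Analogous to `LogicalPlan::map_expressions`: apply a rewrite closure
    // to every top-level expression this node holds.
    fn map_expressions(&mut self, mut f: impl FnMut(Expr) -> Expr) {
        let old = std::mem::take(&mut self.exprs);
        self.exprs = old.into_iter().map(|e| f(e)).collect();
    }
}

fn main() {
    // A filter predicate containing get_field, modeled as just its expressions.
    let mut filter = Node {
        exprs: vec![Expr::GetField(Box::new(Expr::Column("s".into())), "value".into())],
    };
    let mut extracted = Vec::new();
    // One generic rewrite works for every node type.
    filter.map_expressions(|e| match e {
        e @ Expr::GetField(..) => {
            let alias = format!("__datafusion_extracted_{}", extracted.len() + 1);
            extracted.push((alias.clone(), e));
            Expr::Column(alias)
        }
        other => other,
    });
    assert_eq!(filter.exprs, vec![Expr::Column("__datafusion_extracted_1".into())]);
    assert_eq!(extracted.len(), 1);
}
```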
```rust
let rebuilt_input = extractor.build_extraction_projection(&target, path)?;

// Create the node with new input
let new_inputs: Vec<LogicalPlan> = std::iter::once(rebuilt_input)
```
the code above seems to assume there is a single input -- so it seems strange to have code here for multiple inputs 🤔
BTW codex found a test that shows a single projection being extracted doesn't get pushed down. I can make this a separate PR if you like. Note how the get_field is not pushed into the datasource:

```text
###
# Test 2.1b: Projection-only get_field (potential optimization target)
###
query TT
EXPLAIN SELECT s['label'] FROM simple_struct;
----
logical_plan
01)Projection: get_field(simple_struct.s, Utf8("label"))
02)--TableScan: simple_struct projection=[s]
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]}, projection=[get_field(s@1, label) as simple_struct.s[label]], file_type=parquet

# Verify correctness
query T
SELECT s['label'] FROM simple_struct ORDER BY s['label'];
----
alpha
beta
delta
epsilon
gamma
```
🤖: Benchmark completed Details
@alamb are there any of these that use structs? It seems like this has no impact on the benchmarks (good!) but maybe we should add some that hit the full rewrite?
Thanks for reporting. I'll make a new PR with this test + join tests + benchmarks.
I was more trying to quantify the effect on planning time of adding a new optimizer pass -- it seems like it is a small but noticeable slowdown (1-3%). I'll see if I can reproduce those results.
run benchmark sql_planner |
It makes sense that there's a small slowdown: the rule has to visit every node in the plan even if it doesn't modify it at all. That said, a lot of the numbers were within the run-to-run variation, i.e. not statistically different.
Yeah, there is a tension here for sure. I do think in general it would be a good idea to consolidate optimizer passes, given each basically deep copies the plan.
🤖: Benchmark completed Details
Done in #20143
Summary

This PR adds a new optimizer rule `ExtractLeafExpressions` that extracts `MoveTowardsLeafNodes` sub-expressions (like `get_field`) from Filter, Sort, Limit, Aggregate, and Projection nodes into intermediate projections.

This normalization allows `OptimizeProjections` (which runs next) to merge consecutive projections and push `get_field` expressions down to the scan, enabling Parquet column pruning for struct fields.

Example

Before: `get_field(s, 'label')` stayed in ProjectionExec, reading full structs.
After: Both `get_field` expressions pushed to DataSourceExec.

How It Works

The rule:
- Extracts `MoveTowardsLeafNodes` expressions into `__datafusion_extracted_N` aliases
- Skips expressions already aliased with `__datafusion_extracted_*` to ensure idempotency

This is partially modeled after:
- `CommonSubexprEliminate`, which also creates expressions with aliases and extracts them into "2 phase" projections
- `PushDownFilter`, which handles pushing expressions past joins, aggregates, etc.
- `OptimizeProjections`, which also manipulates projections

Interaction with other optimizer rules

This rule has some interaction with `PushDownFilter`. I had to teach `PushDownFilter` to not push filters past the pushed down projections, otherwise it would undo the work this optimizer rule did. There is no point in pushing filters past these expressions as they are so cheap to compute that it's better to evaluate them before filters.

Test plan

- Tests in `extract_leaf_expressions.rs`
- `projection_pushdown.slt`
- `cargo test -p datafusion-optimizer`

🤖 Generated with Claude Code
extract_leaf_expressions.rsprojection_pushdown.sltcargo test -p datafusion-optimizer)🤖 Generated with Claude Code