diff --git a/datafusion/physical-expr/src/equivalence/properties/dependency.rs b/datafusion/physical-expr/src/equivalence/properties/dependency.rs index 4304ab7fca613..2ebc71559fcf4 100644 --- a/datafusion/physical-expr/src/equivalence/properties/dependency.rs +++ b/datafusion/physical-expr/src/equivalence/properties/dependency.rs @@ -389,7 +389,7 @@ mod tests { convert_to_sort_reqs, create_test_params, create_test_schema, parse_sort_expr, }; use crate::equivalence::{ProjectionMapping, convert_to_sort_exprs}; - use crate::expressions::{BinaryExpr, CastColumnExpr, CastExpr, Column, col}; + use crate::expressions::{BinaryExpr, CastExpr, Column, col}; use crate::projection::tests::output_schema; use crate::{ConstExpr, EquivalenceProperties, ScalarFunctionExpr}; @@ -931,33 +931,35 @@ mod tests { struct TestCase { name: &'static str, constants: Vec>, - equal_conditions: Vec<[Arc; 2]>, - sort_columns: &'static [&'static str], + equal_condition: [Arc; 2], should_satisfy_ordering: bool, } let col_a = col("a", schema.as_ref())?; let col_b = col("b", schema.as_ref())?; let col_c = col("c", schema.as_ref())?; - let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None)) as _; + let cast_c = Arc::new(CastExpr::new_with_target_field( + col_c, + Arc::new(Field::new("c", DataType::Date32, true)), + None, + )) as _; + let required_sort = vec![PhysicalSortExpr::new_default(col("c", &schema)?)]; let cases = vec![ TestCase { - name: "(a, b, c) -> (c)", + name: "cast_c = a", // b is constant, so it should be removed from the sort order constants: vec![Arc::clone(&col_b)], - equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]], - sort_columns: &["c"], + equal_condition: [Arc::clone(&cast_c), Arc::clone(&col_a)], should_satisfy_ordering: true, }, // Same test with above test, where equality order is swapped. // Algorithm shouldn't depend on this order. TestCase { - name: "(a, b, c) -> (c)", + name: "a = cast_c", // b is constant, so it should be removed from the sort order constants: vec![col_b], - equal_conditions: vec![[Arc::clone(&col_a), Arc::clone(&cast_c)]], - sort_columns: &["c"], + equal_condition: [Arc::clone(&col_a), Arc::clone(&cast_c)], should_satisfy_ordering: true, }, TestCase { @@ -965,8 +967,7 @@ mod tests { // b is not constant anymore constants: vec![], // a and c are still compatible, but this is irrelevant since the original ordering is (a, b, c) - equal_conditions: vec![[Arc::clone(&cast_c), Arc::clone(&col_a)]], - sort_columns: &["c"], + equal_condition: [Arc::clone(&cast_c), Arc::clone(&col_a)], should_satisfy_ordering: false, }, ]; @@ -979,9 +980,8 @@ mod tests { // Equal conditions before constants { let mut properties = base_properties.clone(); - for [left, right] in case.equal_conditions.clone() { - properties.add_equal_conditions(left, right)? - } + let [left, right] = case.equal_condition.clone(); + properties.add_equal_conditions(left, right)?; properties.add_constants( case.constants.iter().cloned().map(ConstExpr::from), )?; @@ -993,20 +993,13 @@ mod tests { properties.add_constants( case.constants.iter().cloned().map(ConstExpr::from), )?; - for [left, right] in case.equal_conditions { - properties.add_equal_conditions(left, right)? - } + let [left, right] = case.equal_condition; + properties.add_equal_conditions(left, right)?; properties }, ] { - let sort = case - .sort_columns - .iter() - .map(|&name| col(name, &schema).map(PhysicalSortExpr::new_default)) - .collect::>>()?; - assert_eq!( - properties.ordering_satisfy(sort)?, + properties.ordering_satisfy(required_sort.clone())?, case.should_satisfy_ordering, "failed test '{}'", case.name @@ -1017,44 +1010,6 @@ mod tests { Ok(()) } - #[test] - fn test_eliminate_redundant_monotonic_sorts_cast_column_expr() -> Result<()> { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Date32, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), true), - ])); - let mut properties = EquivalenceProperties::new(Arc::clone(&schema)); - properties.reorder( - ["a", "b", "c"] - .into_iter() - .map(|c| PhysicalSortExpr::new_default(col(c, schema.as_ref()).unwrap())), - )?; - - let col_a = col("a", schema.as_ref())?; - let col_b = col("b", schema.as_ref())?; - let col_c = col("c", schema.as_ref())?; - - let cast_c = Arc::new(CastColumnExpr::new( - Arc::clone(&col_c), - Arc::new(Field::new( - "c", - DataType::Timestamp(TimeUnit::Nanosecond, None), - true, - )), - Arc::new(Field::new("c", DataType::Date32, true)), - None, - )) as Arc; - - properties.add_equal_conditions(cast_c, Arc::clone(&col_a))?; - properties.add_constants(std::iter::once(ConstExpr::from(col_b)))?; - - let required = vec![PhysicalSortExpr::new_default(col("c", &schema)?)]; - assert!(properties.ordering_satisfy(required)?); - - Ok(()) - } - #[test] fn test_ordering_equivalence_with_lex_monotonic_concat() -> Result<()> { let schema = Arc::new(Schema::new(vec![ diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 1ca4ead0335de..214e3ca6b25c2 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -33,7 +33,7 @@ use self::dependency::{ use crate::equivalence::{ AcrossPartitions, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, }; -use crate::expressions::{CastColumnExpr, CastExpr, Column, Literal, with_new_schema}; +use crate::expressions::{CastExpr, Column, Literal, with_new_schema}; use crate::{ ConstExpr, LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, @@ -196,35 +196,23 @@ impl OrderingEquivalenceCache { impl EquivalenceProperties { /// Helper used by the ordering equivalence rule when considering whether a - /// cast-bearing expression can replace an existing sort key without invalidating - /// the ordering. + /// cast-bearing expression can replace an existing sort key without + /// invalidating the ordering. /// - /// This function handles *both* `CastExpr` (generic cast) and - /// `CastColumnExpr` (field-aware cast) because the planner may introduce either - /// form during rewrite steps; the core logic is the same in both cases. The - /// substitution is only allowed when the cast wraps **the very same child - /// expression** that the original sort used (an exact-child-match invariant), - /// and the casted type must be a widening/order-preserving conversion - /// `CastExpr::check_bigger_cast(...)` ensures. Without those restrictions the - /// existing sort order could be violated (e.g. a narrowing cast could collapse - /// distinct values together). - fn substitute_cast_like_ordering( + /// The substitution is only allowed when the cast wraps the very same child + /// expression that the original sort used and the casted type is a + /// widening/order-preserving conversion. Without those restrictions, a + /// narrowing cast could collapse distinct values and violate the existing + /// sort order. + fn substitute_cast_ordering( r_expr: Arc, sort_expr: &PhysicalSortExpr, expr_type: &DataType, ) -> Option { - let (child_expr, cast_type) = if let Some(cast_expr) = - r_expr.as_any().downcast_ref::() - { - (cast_expr.expr(), cast_expr.cast_type()) - } else if let Some(cast_expr) = r_expr.as_any().downcast_ref::() { - (cast_expr.expr(), cast_expr.target_field().data_type()) - } else { - return None; - }; + let cast_expr = r_expr.as_any().downcast_ref::()?; - (child_expr.eq(&sort_expr.expr) - && CastExpr::check_bigger_cast(cast_type, expr_type)) + (cast_expr.expr().eq(&sort_expr.expr) + && CastExpr::check_bigger_cast(cast_expr.cast_type(), expr_type)) .then(|| PhysicalSortExpr::new(r_expr, sort_expr.options)) } @@ -866,25 +854,25 @@ impl EquivalenceProperties { order .into_iter() .map(|sort_expr| { - let referring_exprs = mapping - .iter() - .map(|(source, _target)| source) - .filter(|source| expr_refers(source, &sort_expr.expr)) - .cloned(); - let mut result = vec![]; // The sort expression comes from this schema, so the // following call to `unwrap` is safe. let expr_type = sort_expr.expr.data_type(schema).unwrap(); + let original_sort_expr = sort_expr.clone(); // TODO: Add one-to-one analysis for ScalarFunctions. - for r_expr in referring_exprs { - if let Some(substituted) = Self::substitute_cast_like_ordering( - r_expr, &sort_expr, &expr_type, - ) { - result.push(substituted); - } - } - result.push(sort_expr); - result + mapping + .iter() + .map(|(source, _target)| source) + .filter(|source| expr_refers(source, &original_sort_expr.expr)) + .cloned() + .filter_map(|r_expr| { + Self::substitute_cast_ordering( + r_expr, + &original_sort_expr, + &expr_type, + ) + }) + .chain(std::iter::once(sort_expr)) + .collect::>() }) // Generate all valid orderings given substituted expressions: .multi_cartesian_product() diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 873e82a97303e..8ca8264fe3edb 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -42,7 +42,6 @@ use datafusion_common::{ tree_node::{Transformed, TreeNode}, }; use datafusion_expr_common::operator::Operator; -use datafusion_physical_expr::expressions::CastColumnExpr; use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee}; use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr}; use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt; @@ -1124,40 +1123,34 @@ fn rewrite_expr_to_prunable( Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr))) } else if let Some(cast) = column_expr_any.downcast_ref::() { // `cast(col) op lit()` - let arrow_schema = schema.as_arrow(); - let from_type = cast.expr().data_type(arrow_schema)?; - verify_support_type_for_prune(&from_type, cast.cast_type())?; - let (left, op, right) = - rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?; - let left = Arc::new(phys_expr::CastExpr::new( + let (left, op, right) = rewrite_cast_child_to_prunable( + cast.expr(), + cast.cast_type(), + op, + scalar_expr, + schema, + )?; + let left = Arc::new(phys_expr::CastExpr::new_with_target_field( left, - cast.cast_type().clone(), + Arc::clone(cast.target_field()), None, )); - Ok((left, op, right)) - } else if let Some(cast_col) = column_expr_any.downcast_ref::() { - // `cast_column(col) op lit()` - same as CastExpr but uses CastColumnExpr - let arrow_schema = schema.as_arrow(); - let from_type = cast_col.expr().data_type(arrow_schema)?; - let to_type = cast_col.target_field().data_type(); - verify_support_type_for_prune(&from_type, to_type)?; - let (left, op, right) = - rewrite_expr_to_prunable(cast_col.expr(), op, scalar_expr, schema)?; - // Predicate pruning / statistics generally don't support struct columns yet. - // In the future we may want to support pruning on nested fields, in which case we probably need to - // do something more sophisticated here. - // But for now since we don't support pruning on nested fields, we can just cast to the target type directly. - let left = Arc::new(phys_expr::CastExpr::new(left, to_type.clone(), None)); + // PruningPredicate does not support pruning on nested fields yet. + // End-to-end nested-field pruning also requires Parquet statistics + // extraction to agree with PruningPredicate on a stats representation + // for nested field expressions. Ok((left, op, right)) } else if let Some(try_cast) = column_expr_any.downcast_ref::() { // `try_cast(col) op lit()` - let arrow_schema = schema.as_arrow(); - let from_type = try_cast.expr().data_type(arrow_schema)?; - verify_support_type_for_prune(&from_type, try_cast.cast_type())?; - let (left, op, right) = - rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?; + let (left, op, right) = rewrite_cast_child_to_prunable( + try_cast.expr(), + try_cast.cast_type(), + op, + scalar_expr, + schema, + )?; let left = Arc::new(phys_expr::TryCastExpr::new( left, try_cast.cast_type().clone(), @@ -1191,6 +1184,20 @@ fn rewrite_expr_to_prunable( } } +fn rewrite_cast_child_to_prunable( + cast_child_expr: &PhysicalExprRef, + cast_type: &DataType, + op: Operator, + scalar_expr: &PhysicalExprRef, + schema: DFSchema, +) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> { + verify_support_type_for_prune( + &cast_child_expr.data_type(schema.as_arrow())?, + cast_type, + )?; + rewrite_expr_to_prunable(cast_child_expr, op, scalar_expr, schema) +} + fn is_compare_op(op: Operator) -> bool { matches!( op, @@ -4192,7 +4199,7 @@ mod tests { } #[test] - fn prune_cast_column_scalar() { + fn prune_cast_scalar() { // The data type of column i is INT32 let (schema, statistics) = int32_setup(); let expected_ret = &[true, true, false, true, true];