feat: add GroupsAccumulator for variance, stddev, covariance, correlation #4254
andygrove wants to merge 16 commits into apache:main
Conversation
Should we try to wire the DataFusion aggregation instead of a Comet implementation? Or are there any Spark inconsistencies?
They are not compatible the last time I checked - signed vs unsigned types, IIRC
Would be good to upstream these to datafusion-spark and then see if they could share common code with the core impl.
Thanks for picking this up, @andygrove! The speedups are really nice to see. Review threads:

- Correlation `update_batch` mask
- Correlation `evaluate` doing three finalizes
- `finalize` duplication between variance and covariance
- Stddev `evaluate` allocation
- Covariance argument count
- Redundant `null_on_divide_by_zero` on Correlation
- DataFusion accumulate helpers
- Stddev test coverage
- `CometBenchmarkBase` shuffle manager

Overall very nice work; the unit tests do a good job of pinning down the Spark-specific edges, and the state-ordering comments on correlation are helpful for future readers.
mbutrovich
left a comment
I guess my comments above count as changes requested. Thanks @andygrove!
Per review on apache#4254: the aggregate benchmarks in this PR run with local[1] and don't cross a shuffle boundary, so wiring CometShuffleManager into CometBenchmarkBase does not affect their numbers and applies to every benchmark that extends the trait. Reverting keeps the benchmark harness identical to the baseline that produced the 'before' numbers. This reverts commit 7c0fd61.
VarianceGroupsAccumulator::finalize and CovarianceGroupsAccumulator::finalize were structurally identical apart from the m2/algo_const numerator. Move the count==0 / Sample-with-count==1 / divisor body into welford::finalize_moments so both grouped accumulators (and any future moment-style accumulator) share one Spark sample-divide-by-zero rule.
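The shared rule can be sketched as follows. This is a hedged, self-contained sketch: `finalize_moments` and `StatsType` follow the names in the commit message, but the actual signature in `welford.rs` may differ.

```rust
// Sketch of the shared finalize rule; names follow the commit message,
// actual welford.rs API may differ.
#[derive(Clone, Copy, PartialEq)]
enum StatsType {
    Sample,
    Population,
}

/// Returns None where Spark yields NULL, Some(f64::NAN) for the legacy
/// divide-by-zero behavior, and Some(value) otherwise.
fn finalize_moments(
    count: f64,
    numerator: f64, // m2 for variance, co-moment for covariance
    stats_type: StatsType,
    null_on_divide_by_zero: bool,
) -> Option<f64> {
    if count == 0.0 {
        return None; // empty group yields NULL
    }
    if stats_type == StatsType::Sample && count == 1.0 {
        // Spark's sample divide-by-zero rule: NULL or NaN depending on config
        return if null_on_divide_by_zero {
            None
        } else {
            Some(f64::NAN)
        };
    }
    let divisor = match stats_type {
        StatsType::Sample => count - 1.0,
        StatsType::Population => count,
    };
    Some(numerator / divisor)
}
```

Centralizing the count checks and divisor choice here means a future moment-style accumulator only supplies its own numerator.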
- update_batch: when both input columns are fully non-null, skip building the is_not_null + and combined mask and forward the caller's filter unchanged to the three child accumulators. Mirrors the per-row CorrelationAccumulator short-circuit so dense inputs don't pay an allocation per batch.
- evaluate: drain the children's raw state once and compute correlation inline rather than calling each child's evaluate() (which would allocate three Float64Arrays we'd unpack and discard), and snapshot covar.counts up front. Drops the pub(crate) counts() accessor on CovarianceGroupsAccumulator.
- Drop the children's null_on_divide_by_zero argument: children run as StatsType::Population, which never hits the count==1 sample-divide-by-zero branch, and the top-level evaluate() applies the count<=1 rule itself.
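The inline evaluate math reduces to dividing the co-moment by the geometric mean of the two second moments; the population 1/count factors cancel, which is why the children never need their own divide-by-zero handling. A hypothetical sketch (field names `m2_x`, `m2_y`, `c_xy` are illustrative, not the PR's actual state layout):

```rust
// Illustrative computation of correlation from raw Welford state.
fn correlation_from_moments(
    count: f64,
    m2_x: f64, // centered second moment of x: sum((x - mean_x)^2)
    m2_y: f64, // centered second moment of y: sum((y - mean_y)^2)
    c_xy: f64, // co-moment: sum((x - mean_x) * (y - mean_y))
    null_on_divide_by_zero: bool,
) -> Option<f64> {
    if count == 0.0 {
        return None; // empty group yields NULL
    }
    // Population covariance and both variances carry the same 1/count
    // factor, so it cancels: corr = c_xy / sqrt(m2_x * m2_y).
    let denom = (m2_x * m2_y).sqrt();
    if denom == 0.0 {
        // also covers count == 1, where both m2 terms are zero
        return if null_on_divide_by_zero {
            None
        } else {
            Some(f64::NAN)
        };
    }
    Some(c_xy / denom)
}
```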
- StddevGroupsAccumulator::evaluate now uses PrimitiveArray::unary to run sqrt directly on the buffer produced by the variance accumulator, avoiding the intermediate Vec<f64> allocation per emit.
- Mirror the variance grouped suite's sample_single_row_nan_legacy and sample_single_row_null_when_flag_set tests in the stddev suite. Stddev wraps variance, but pin the Spark sample-divide-by-zero contract here too so it can't silently regress.
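A dependency-free stand-in for that final step, using a plain `Vec` where the real code uses arrow's `PrimitiveArray::unary` over the Float64 buffer:

```rust
// Apply sqrt element-wise to the variance output in place, without
// allocating a second intermediate vector. Illustrative sketch only.
fn stddev_from_variance(mut variances: Vec<Option<f64>>) -> Vec<Option<f64>> {
    for v in variances.iter_mut() {
        if let Some(x) = v {
            *x = x.sqrt(); // NaN variance stays NaN, matching the legacy path
        }
    }
    variances
}
```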
Which issue does this PR close?
Closes #1627
Closes #4249
Rationale for this change
Detailed benchmark results are posted in a comment on this PR.
DataFusion's grouped aggregate operator has a fast path that uses a single `GroupsAccumulator` instead of one `Accumulator` per group. Comet's existing variance, stddev, covariance, and correlation aggregates only implement the per-row `Accumulator` trait, so grouped queries fall back to a slower row-at-a-time path. Issue #1275 measured a roughly 4x speedup from applying the same change to `avg`. After this PR the four remaining Comet-owned aggregates are on parity with `avg`/`sum_int`/`sum_decimal`/`avg_decimal`, which already implement the fast path.

What changes are included in this PR?
- A new `agg_funcs/welford.rs` module with the Welford update/merge math used by both the per-row and grouped paths. The existing `VarianceAccumulator` and `CovarianceAccumulator` per-row methods now route through these helpers (no behavior change).
- New `VarianceGroupsAccumulator`, `StddevGroupsAccumulator`, `CovarianceGroupsAccumulator`, and `CorrelationGroupsAccumulator`. Each `AggregateUDFImpl` now returns `true` from `groups_accumulator_supported` and constructs the matching grouped accumulator. Stddev wraps variance; correlation wraps one covariance plus two variance group accumulators, mirroring the existing `Accumulator` composition.
- Grouped state is kept in per-group buffers (`f64` counts) and preserves Spark semantics (
`null_on_divide_by_zero` for the `count == 1` branches).

How are these changes tested?
Unit tests cover correctness, null handling on each input column (correlation needs both columns non-null per row), `opt_filter` masking, multi-batch merge equals single-shot, and the empty-group-yields-null and single-row-sample edge cases.
The grouped path sits behind each existing Comet aggregate, so existing JVM coverage in `CometAggregateSuite` exercises the new code end-to-end. CI runs the full sweep across Spark 3.4, 3.5, and 4.0.
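The multi-batch-merge-equals-single-shot property can be sketched with a self-contained Welford implementation; `Moments`, `update`, `merge`, and `accumulate` are illustrative names, not the PR's actual welford.rs API.

```rust
// Self-contained Welford recurrences plus the merge-equals-single-shot
// property that the grouped-accumulator tests pin down.
#[derive(Clone, Copy, Default)]
struct Moments {
    count: f64,
    mean: f64,
    m2: f64, // sum of squared deviations from the running mean
}

fn update(mut s: Moments, x: f64) -> Moments {
    s.count += 1.0;
    let delta = x - s.mean;
    s.mean += delta / s.count;
    s.m2 += delta * (x - s.mean);
    s
}

/// Chan et al.'s parallel combination, used when merging partial states.
fn merge(a: Moments, b: Moments) -> Moments {
    if a.count == 0.0 {
        return b;
    }
    if b.count == 0.0 {
        return a;
    }
    let count = a.count + b.count;
    let delta = b.mean - a.mean;
    Moments {
        count,
        mean: a.mean + delta * (b.count / count),
        m2: a.m2 + b.m2 + delta * delta * (a.count * b.count / count),
    }
}

fn accumulate(xs: &[f64]) -> Moments {
    xs.iter().copied().fold(Moments::default(), update)
}

/// Property: feeding all rows in one batch must match accumulating two
/// partial batches and merging their states.
fn merge_equals_single_shot(xs: &[f64], split: usize) -> bool {
    let single = accumulate(xs);
    let merged = merge(accumulate(&xs[..split]), accumulate(&xs[split..]));
    single.count == merged.count
        && (single.mean - merged.mean).abs() < 1e-9
        && (single.m2 - merged.m2).abs() < 1e-9
}
```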
This PR was scaffolded with the project's `superpowers:brainstorming` and `superpowers:writing-plans` skills.