
Support fully-native multi-stage (distinct-combined) collect_list / collect_set #4724

Description

@andygrove

Describe the bug

collect_list / collect_set declare their aggregate buffer as BinaryType in Spark (serialized TypedImperativeAggregate state) but produce a native ArrayType (Arrow List) state in Comet. Comet bridges this only for the simple two-stage shape: CometObjectHashAggregateExec.adjustOutputForNativeState rewrites the buffer column of a pure-Partial aggregate to ArrayType, so Partial -> Final runs natively and correctly.

It does not handle multi-stage aggregates that contain a PartialMerge stage, which Spark introduces for the distinct-aggregate rewrite, e.g.:

SELECT x, count(DISTINCT y), collect_list(z) FROM t GROUP BY x

This plans as Partial(x,y) -> PartialMerge(x,y) -> [PartialMerge, Partial](x) -> Final(x). The buffer columns output by the intermediate PartialMerge stages are still declared as BinaryType, so a fully-native pipeline crashes at runtime:

CometNativeException: Cast error: Cannot cast LIST to non-list data type Binary

There is also a related nullability drift, List(non-null T) vs. List(nullable T), for nested element types. This is a facet of the broader Arrow type drift tracked in #4515.

This affects both collect_list (added in #4720) and the already-shipped collect_set, which has the same latent crash on this shape.
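The following small, self-contained Rust model makes the mismatch concrete. It is not Comet code, and Mode, BufType, declared_buffer, and mismatched_stages are hypothetical names. The model assumes each stage can be described by the aggregate modes it contains. It applies the current rule, under which only a pure-Partial stage has its buffer column rewritten from BinaryType to List. Every other non-Final stage still declares Binary while the native side emits a List, and the cast error arises at those stages.

```rust
// Hypothetical model of the current buffer-type rule (not Comet's actual code).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Mode {
    Partial,
    PartialMerge,
    Final,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BufType {
    Binary, // Spark's declared (serialized) TypedImperativeAggregate state
    List,   // the native Arrow List state produced by Comet
}

/// Buffer type the plan declares for a stage's output under the current rule:
/// only a pure-Partial stage is rewritten from Binary to List.
fn declared_buffer(stage: &[Mode]) -> Option<BufType> {
    if stage.contains(&Mode::Final) {
        None // a Final stage emits the result, not an aggregate buffer
    } else if stage.iter().all(|m| *m == Mode::Partial) {
        Some(BufType::List)
    } else {
        Some(BufType::Binary)
    }
}

/// Indices of stages whose declared buffer type (Binary) disagrees with the
/// List state the native engine actually emits.
fn mismatched_stages(plan: &[Vec<Mode>]) -> Vec<usize> {
    plan.iter()
        .enumerate()
        .filter(|(_, stage)| matches!(declared_buffer(stage), Some(BufType::Binary)))
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    use Mode::*;
    // Partial(x,y) -> PartialMerge(x,y) -> [PartialMerge, Partial](x) -> Final(x)
    let distinct_plan = vec![
        vec![Partial],
        vec![PartialMerge],
        vec![PartialMerge, Partial],
        vec![Final],
    ];
    // Simple two-stage shape: Partial -> Final
    let simple_plan = vec![vec![Partial], vec![Final]];

    println!("distinct plan mismatches: {:?}", mismatched_stages(&distinct_plan));
    println!("simple plan mismatches: {:?}", mismatched_stages(&simple_plan));
}
```

For the distinct-combined plan this prints [1, 2], which are the PartialMerge and mixed-mode stages. For the simple Partial -> Final plan it prints an empty list.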

Current behavior (workaround in #4720)

To avoid the crash, multi-stage collect_list/collect_set aggregates now fall back to Spark consistently:

  • CometExecRule.tagUnsafePartialAggregates tags the feeding pure-Partial aggregate when a PartialMerge stage of a CollectList/CollectSet aggregate is present.
  • CometBaseAggregate.doConvert falls back to Spark for a PartialMerge/Final stage of these functions when no Comet partial produced the buffer (the cross-engine LocalTableScan case).
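The two guards can be modeled in the same way. This rough sketch covers only the decision logic and is not the Scala code in CometExecRule or CometBaseAggregate. The functions tag_feeding_partial and merge_stage_falls_back and the buffer_from_comet_partial flag are hypothetical stand-ins for the plan inspection those rules perform.

```rust
// Hypothetical model of the #4720 fallback guards (not Comet's actual code).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Mode {
    Partial,
    PartialMerge,
    Final,
}

/// Guard 1 (modeled on CometExecRule.tagUnsafePartialAggregates): the feeding
/// pure-Partial stage is tagged when any stage merges the CollectList/CollectSet
/// buffer in PartialMerge mode.
fn tag_feeding_partial(plan: &[Vec<Mode>]) -> bool {
    plan.iter().any(|stage| stage.contains(&Mode::PartialMerge))
}

/// Guard 2 (modeled on CometBaseAggregate.doConvert): a PartialMerge or Final
/// stage falls back when its buffer was not produced by a Comet partial.
fn merge_stage_falls_back(stage: &[Mode], buffer_from_comet_partial: bool) -> bool {
    let consumes_buffer = stage
        .iter()
        .any(|m| matches!(m, Mode::PartialMerge | Mode::Final));
    consumes_buffer && !buffer_from_comet_partial
}

fn main() {
    use Mode::*;
    let distinct_plan = vec![
        vec![Partial],
        vec![PartialMerge],
        vec![PartialMerge, Partial],
        vec![Final],
    ];
    let simple_plan = vec![vec![Partial], vec![Final]];

    // Distinct-combined plan: the feeding Partial is tagged, so it falls back to Spark.
    println!("distinct plan tagged: {}", tag_feeding_partial(&distinct_plan));
    // Simple two-stage plan is not tagged and stays native.
    println!("simple plan tagged: {}", tag_feeding_partial(&simple_plan));
    // Cross-engine case: a Final stage fed by a buffer that Spark produced.
    println!("final over Spark buffer falls back: {}", merge_stage_falls_back(&[Final], false));
}
```

This prints true, false, true. Under this model, removing the guards as proposed below amounts to making both functions return false once every intermediate stage declares the native List type.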

The simple two-stage collect_list/collect_set cases continue to run natively.

Expected behavior

Enable fully-native multi-stage execution by correcting the intermediate buffer schema for the PartialMerge (and multi-mode) stages of CollectList/CollectSet. This means extending adjustOutputForNativeState beyond pure-Partial aggregates and fixing the element-nullability drift, so that the ArrayType buffer round-trips across all stages. Then remove the fallback guards added in #4720.
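The proposed rule can also be sketched as a hypothetical model. In it, every stage that emits an intermediate buffer declares the native List type, and element nullability is normalized before the types are compared. The model assumes that the drift is resolved by widening List(non-null T) to List(nullable T), but the issue does not say which direction the fix should take. ListType, declared_buffer, normalize, and round_trips are illustrative names only.

```rust
// Hypothetical model of the proposed buffer-type rule (not Comet's actual code).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Mode {
    Partial,
    PartialMerge,
    Final,
}

/// Arrow List type of the collect_list/collect_set buffer, reduced to the one
/// property that drifts: whether the list elements are nullable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ListType {
    elem_nullable: bool,
}

/// Proposed rule: every stage that emits an intermediate buffer (pure-Partial,
/// PartialMerge, or multi-mode) declares a List with nullable elements.
fn declared_buffer(stage: &[Mode]) -> Option<ListType> {
    if stage.contains(&Mode::Final) {
        None // a Final stage emits the result, not an aggregate buffer
    } else {
        Some(ListType { elem_nullable: true })
    }
}

/// Assumed normalization: widen List(non-null T) to List(nullable T).
fn normalize(mut t: ListType) -> ListType {
    t.elem_nullable = true;
    t
}

/// True if the native state emitted at every intermediate stage equals the
/// declared buffer type, optionally after nullability normalization.
fn round_trips(plan: &[Vec<Mode>], native: ListType, normalize_nullability: bool) -> bool {
    let emitted = if normalize_nullability { normalize(native) } else { native };
    plan.iter()
        .filter_map(|stage| declared_buffer(stage))
        .all(|declared| declared == emitted)
}

fn main() {
    use Mode::*;
    let distinct_plan = vec![
        vec![Partial],
        vec![PartialMerge],
        vec![PartialMerge, Partial],
        vec![Final],
    ];
    // Assumed native state for a nested element type: non-null elements.
    let native = ListType { elem_nullable: false };
    println!("without normalization: {}", round_trips(&distinct_plan, native, false));
    println!("with normalization: {}", round_trips(&distinct_plan, native, true));
}
```

This prints false and then true. With the schema fix alone, the nullability drift would still cause a type mismatch. Both parts are needed before the buffer round-trips across all four stages.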

Additional context
