Describe the bug
collect_list / collect_set declare their aggregate buffer as BinaryType in Spark (serialized TypedImperativeAggregate state) but produce a native ArrayType (Arrow List) state in Comet. Comet bridges this only for the simple two-stage shape: CometObjectHashAggregateExec.adjustOutputForNativeState rewrites the buffer column of a pure-Partial aggregate to ArrayType, so Partial -> Final runs natively and correctly.
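To make the two-stage bridge concrete, here is a minimal Python model of the rewrite. It is a sketch under stated assumptions. The type classes, the `buf` column name, the `int` element type for `z`, and `adjust_output_for_native_state` are hypothetical stand-ins, not Comet's Scala code. They only mirror the behavior described above: the buffer of a pure-Partial collect_list/collect_set aggregate is retyped from BinaryType to ArrayType, and every other mode is left alone.

```python
from dataclasses import dataclass

# Hypothetical stand-ins for Spark SQL types; NOT Spark's or Comet's classes.
BINARY = "binary"


@dataclass(frozen=True)
class ArrayType:
    element: object            # a primitive type name or another ArrayType
    contains_null: bool = True


@dataclass(frozen=True)
class Field:
    name: str
    dtype: object


COLLECT_FUNCS = {"CollectList", "CollectSet"}


def adjust_output_for_native_state(mode, func, fields, buffer_name, elem_type):
    """Model of the current bridge: only a pure-Partial collect_* aggregate
    has its buffer column retyped from BINARY to the ArrayType state that
    the native side actually emits. All other modes are returned unchanged."""
    if mode != "Partial" or func not in COLLECT_FUNCS:
        return list(fields)
    return [
        Field(f.name, ArrayType(elem_type)) if f.name == buffer_name else f
        for f in fields
    ]


# Spark declares the partial output of collect_list(z) with a BINARY buffer.
spark_partial = [Field("x", "int"), Field("buf", BINARY)]
# The native side emits an Arrow List for the same column.
native_partial = [Field("x", "int"), Field("buf", ArrayType("int"))]

adjusted = adjust_output_for_native_state("Partial", "CollectList", spark_partial, "buf", "int")
print(adjusted == native_partial)   # True: Partial -> Final agree on the buffer type

unchanged = adjust_output_for_native_state("PartialMerge", "CollectList", spark_partial, "buf", "int")
print(unchanged == native_partial)  # False: a PartialMerge output keeps BINARY
```

The point of the model is that the rewrite is keyed on the mode alone, which is why only the Partial -> Final shape is covered.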
It does not handle multi-stage aggregates that contain a PartialMerge stage, which Spark introduces for the distinct-aggregate rewrite, e.g.:
SELECT x, count(DISTINCT y), collect_list(z) FROM t GROUP BY x
This is planned as Partial(x,y) -> PartialMerge(x,y) -> [PartialMerge, Partial](x) -> Final(x). The outputs of the intermediate PartialMerge stages are still declared as BinaryType, so a fully native pipeline crashes at runtime:
CometNativeException: Cast error: Cannot cast LIST to non-list data type Binary
There is also a related nullability drift, List(non-null T) vs. List(nullable T), for nested element types. This bug is a facet of the broader Arrow-type-drift tracked in #4515.
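The failure can be modeled as a type check at each stage boundary of the distinct-rewrite plan. In this hypothetical Python sketch, `declared_buffer_type` follows today's rule (only pure Partial is retyped) and `native_buffer_type` assumes the native side carries the state as a List at every stage. The per-stage mode of collect_list(z) (Partial, PartialMerge, PartialMerge, Final) is an assumption based on how Spark places non-distinct aggregates in the distinct rewrite. Each producing stage reported as mismatched is one whose consumer would hit the cast error above.

```python
from dataclasses import dataclass

BINARY = "binary"


@dataclass(frozen=True)
class ArrayType:
    element: object            # a primitive type name or another ArrayType
    contains_null: bool = True


# Stages of: SELECT x, count(DISTINCT y), collect_list(z) FROM t GROUP BY x,
# paired with the mode collect_list(z) is assumed to run in at that stage.
PIPELINE = [
    ("Partial(x,y)", "Partial"),
    ("PartialMerge(x,y)", "PartialMerge"),
    ("[PartialMerge, Partial](x)", "PartialMerge"),
    ("Final(x)", "Final"),
]


def declared_buffer_type(mode, elem):
    # Today's bridge: only a pure-Partial buffer is retyped to ArrayType.
    return ArrayType(elem) if mode == "Partial" else BINARY


def native_buffer_type(mode, elem):
    # Assumption: the native side always carries the state as an Arrow List.
    return ArrayType(elem)


def mismatched_boundaries(pipeline, elem):
    """Return the producing stages whose declared buffer type differs from
    what the native side emits; the consumer of each one hits a cast error."""
    bad = []
    for label, mode in pipeline[:-1]:  # the Final stage emits the result, not a buffer
        if declared_buffer_type(mode, elem) != native_buffer_type(mode, elem):
            bad.append(label)
    return bad


print(mismatched_boundaries(PIPELINE, "int"))
# ['PartialMerge(x,y)', '[PartialMerge, Partial](x)']

two_stage = [("Partial(x)", "Partial"), ("Final(x)", "Final")]
print(mismatched_boundaries(two_stage, "int"))  # []

# The nullability drift is a second, independent mismatch: with a nested
# element type T, List(non-null T) and List(nullable T) compare unequal.
inner = ArrayType("int")
print(ArrayType(inner, contains_null=False) == ArrayType(inner))  # False
```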
This affects both collect_list (added in #4720) and the already-shipped collect_set, which has the same latent crash on this plan shape.
Current behavior (workaround in #4720)
To avoid the crash, multi-stage collect_list/collect_set aggregates now fall back to Spark consistently:
CometExecRule.tagUnsafePartialAggregates tags the feeding pure-Partial aggregate as unsafe when a PartialMerge stage of a CollectList/CollectSet aggregate is present.
CometBaseAggregate.doConvert falls back to Spark for a PartialMerge/Final stage of these functions when no Comet partial aggregate produced the buffer (the cross-engine LocalTableScan case).
The simple two-stage collect_list/collect_set cases continue to run natively.
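The combined effect of the two guards can be sketched as a bottom-up pass over the aggregate stages. This is a simplified Python model, not the Scala in CometExecRule or CometBaseAggregate: `Stage`, `apply_guards`, the `native` flag, and the `Count` function label are hypothetical, and the second guard is approximated as "the stage that produced the buffer is not running in Comet".

```python
from dataclasses import dataclass

COLLECT_FUNCS = {"CollectList", "CollectSet"}


@dataclass
class Stage:
    modes: frozenset     # aggregate modes present in this stage
    funcs: frozenset     # aggregate functions evaluated in this stage
    native: bool = True  # whether the stage is planned to run in Comet


def uses_collect(stage):
    return bool(stage.funcs & COLLECT_FUNCS)


def apply_guards(stages):
    """Model of the two guards on a bottom-up list of aggregate stages."""
    # Guard 1 (cf. tagUnsafePartialAggregates): if a PartialMerge stage merges
    # a collect_* buffer, the pure-Partial stage feeding the pipeline falls back.
    if any("PartialMerge" in s.modes and uses_collect(s) for s in stages):
        for s in stages:
            if s.modes == {"Partial"} and uses_collect(s):
                s.native = False
    # Guard 2 (cf. doConvert): a PartialMerge/Final stage of collect_* falls
    # back when the stage that produced its buffer is not a Comet stage.
    for prev, s in zip(stages, stages[1:]):
        if uses_collect(s) and s.modes & {"PartialMerge", "Final"} and not prev.native:
            s.native = False
    return [s.native for s in stages]


distinct_plan = [
    Stage(frozenset({"Partial"}), frozenset({"CollectList"})),
    Stage(frozenset({"PartialMerge"}), frozenset({"CollectList"})),
    Stage(frozenset({"PartialMerge", "Partial"}), frozenset({"CollectList", "Count"})),
    Stage(frozenset({"Final"}), frozenset({"CollectList", "Count"})),
]
print(apply_guards(distinct_plan))  # [False, False, False, False]

two_stage = [
    Stage(frozenset({"Partial"}), frozenset({"CollectList"})),
    Stage(frozenset({"Final"}), frozenset({"CollectList"})),
]
print(apply_guards(two_stage))      # [True, True]
```

In this model the first guard removes the native producer and the second guard then propagates the fallback upward, which is what keeps the whole multi-stage plan on one engine.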
Expected behavior
Enable fully-native multi-stage execution by correcting the intermediate buffer schema for PartialMerge (and multi-mode) stages of CollectList/CollectSet (extend adjustOutputForNativeState beyond pure-Partial, and fix the element-nullability drift), so the ArrayType buffer round-trips across all stages. Then remove the fallback guards added in #4720.
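A minimal Python sketch of the intended end state, under explicit assumptions: every non-Final stage of a collect_* aggregate declares an ArrayType buffer (not only pure Partial), and element nullability is normalized to nullable at every nesting level before comparing. `declared_buffer_type_fixed`, `normalize_nullability`, and `boundaries_agree` are hypothetical names, and "nullable everywhere" is only one possible normalization, not necessarily the one Comet will adopt.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ArrayType:
    element: object            # a primitive type name or a nested ArrayType
    contains_null: bool = True


def normalize_nullability(t):
    """Recursively mark every list element as nullable, so that
    List(non-null T) and List(nullable T) compare equal."""
    if isinstance(t, ArrayType):
        return ArrayType(normalize_nullability(t.element), contains_null=True)
    return t


def declared_buffer_type_fixed(mode, elem):
    # Proposed rule: any stage that emits an intermediate buffer (Partial,
    # PartialMerge, or a multi-mode stage) declares ArrayType. Final emits
    # the aggregate result rather than a buffer, so it has no buffer type.
    if mode == "Final":
        return None
    return normalize_nullability(ArrayType(elem))


def boundaries_agree(modes, elem, native_contains_null):
    """Check every buffer-producing stage: the declared type must match the
    native List state once element nullability is normalized."""
    for mode in modes[:-1]:
        declared = declared_buffer_type_fixed(mode, elem)
        native = normalize_nullability(ArrayType(elem, native_contains_null))
        if declared != native:
            return False
    return True


# collect_list(z) where z is itself an array with non-null elements.
nested = ArrayType("int", contains_null=False)
modes = ["Partial", "PartialMerge", "PartialMerge", "Final"]
print(boundaries_agree(modes, nested, native_contains_null=False))  # True

# Without normalization the same two types would still differ.
print(ArrayType(nested, contains_null=False) == ArrayType(nested))  # False
```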
Additional context
collect_list/array_agg)