Skip to content

[Go SDK] Add GroupIntoBatches transform (#19868)#38220

Draft
florian-trehaut wants to merge 4 commits intoapache:masterfrom
florian-trehaut:feat/19868-groupintobatches-go-sdk
Draft

[Go SDK] Add GroupIntoBatches transform (#19868)#38220
florian-trehaut wants to merge 4 commits intoapache:masterfrom
florian-trehaut:feat/19868-groupintobatches-go-sdk

Conversation

@florian-trehaut
Copy link
Copy Markdown

@florian-trehaut florian-trehaut commented Apr 16, 2026

Addresses #19868 — adds GroupIntoBatches and GroupIntoBatchesWithShardedKey to the Go SDK, with the supporting infrastructure for deterministic coder introspection and the standard beam:coder:sharded_key:v1 cross-SDK coder.

Summary

  • transforms/batch.GroupIntoBatches(s, params, col) — stateful DoFn that buffers values per key in a state.Bag[[]byte] (coder-encoded for generic V support) and emits PCollection<KV<K, []V>> when BatchSize, BatchSizeBytes, MaxBufferingDuration, or end-of-window + allowed lateness triggers fire. Arbitrary K/V types via typex.T/typex.V universals resolved at graph construction.
  • transforms/batch.GroupIntoBatchesWithShardedKey[K](s, params, col) — wraps each user key with ShardedKey[K]{Key, ShardID} (24 bytes: worker UUID + atomic counter, matching Java/Python layout) then applies GroupIntoBatches. Output is PCollection<KV<ShardedKey[K], []V>>. Common K types (string, int, int64) registered automatically; others via RegisterShardedKeyType[K]().
  • Core SDK additions:
    • (*coder.Coder).IsDeterministic() — recursive traversal over all Kind values (primitives true, composites iff components are, custom opt-in via registry).
    • coder.RegisterDeterministicCoder(t, enc, dec) — opt-in deterministic registration for user custom coders.
    • beam.Coder.IsDeterministic() and beam.PCollection.WindowingStrategy() — public accessors previously limited to beam package internals.
    • typex.ShardedKey composite marker type (alongside KV, CoGBK, WindowedValue) with IsShardedKey/NewShardedKey helpers.
    • beam:coder:sharded_key:v1 URN wired end-to-end: graphx marshal/unmarshal, exec encode/decode via FullValue{Elm: key, Elm2: shardID}. Byte-identical against all 4 standard_coders.yaml:506-521 fixtures.
    • reflectx.MakeFuncWithName — wraps a Func with a caller-supplied Name() so the serializer emits a type-qualified name instead of the compiler-assigned closure name. Required because Go generic functions produce closures with identical names across type instantiations.
    • runtime.RegisterFunctionWithName — registers a function under a custom name in the resolution cache for cross-worker deserialization.
    • coder.RegisterDeterministicCoderWithFuncs — accepts pre-wrapped funcx.Fn values carrying qualified names, bypassing automatic name derivation.

Problems solved during implementation

Generic state.Bag[V] — Go SDK state fields require concrete element types at graph construction. Values are coder-encoded into state.Bag[[]byte] via a cached beam.ElementEncoder/Decoder lazily initialized from beam.EncodedType{T: valueType}.

Unused timer family stalls Prism — declaring a timers.ProcessingTime field without ever firing it prevents Prism from completing the bundle. Split into two DoFn shapes: groupIntoBatchesFn (event-time timer only) selected when MaxBufferingDuration == 0, and groupIntoBatchesBufferedFn (both timer families) otherwise.

Generic closure name collisionsRegisterShardedKeyType[string]() and RegisterShardedKeyType[int64]() produce closures with the same compiler name (RegisterShardedKeyType[...].func1). Cross-worker deserialization resolves the last-registered function (wrong type). Fixed by reflectx.MakeFuncWithName + runtime.RegisterFunctionWithName which qualify the name with the type parameter (e.g. batch.encShardedKey[string]).

Test plan

All tests pass on Prism loopback:

Test What it covers
TestGroupIntoBatches_CountLimit TAC-1: 1000 inputs, 10 keys, BatchSize 100 → 10 batches of 100
TestGroupIntoBatches_ByteLimit TAC-4: BatchSizeBytes threshold with string values
TestGroupIntoBatches_PerKey TAC-7: per-key isolation, no cross-key batches
TestGroupIntoBatches_IntValues BAC-1: non-string value type (int) via generic coder
TestGroupIntoBatchesWithShardedKey_E2E BAC-4: full pipeline with ShardedKey[string], coder serialization, Prism execution
TestShardedKeyCoder_WireFormat TAC-12: 4 standard_coders.yaml fixtures byte-identical
TestCoder_IsDeterministic 20+ cases: all Kind values, composition, custom opt-in
TestNewSK, TestSK_IsDeterministic ShardedKey coder construction + determinism
TestParams_validate All invalid param combinations panic
TestDefaultElementByteSize 18 primitive types + unsupported
TestIsBuiltinSizeable 11 type checks

Design decisions open for committer review

# Decision Rationale
P1 ShardedKey as composite marker in typex/ Aligns with KV, CoGBK, WindowedValue. Required for SDK type-binding engine to accept it in DoFn signatures.
P2 int64 for BatchSize/BatchSizeBytes Proto parity, avoids 32-bit overflow.
P3 Struct Params{} vs functional options Explicit validation via validate(), aligns with Python _GroupIntoBatchesParams.
P4 Package transforms/batch Short, extensible, aligns with top, stats, filter.
P5 ShardedKey[K] as concrete Go generic in batch/ User-facing struct for downstream inspection. Coder registered per-type via RegisterShardedKeyType[K]().
P6 Atomic counter for shard ID (not goroutine ID) Stable, no runtime.Stack parsing fragility. Same 24-byte layout as Java/Python.
P7 reflectx.MakeFuncWithName + runtime.RegisterFunctionWithName Minimal core SDK additions to solve generic closure name collisions. No existing mechanism could qualify closure names by type parameter.

Files changed

New (7 files):

  • sdks/go/pkg/beam/transforms/batch/batch.go — Params, DoFns, GroupIntoBatches, GroupIntoBatchesWithShardedKey, ShardedKey[K], RegisterShardedKeyType[K]
  • sdks/go/pkg/beam/transforms/batch/batch_prism_test.go — E2E Prism tests
  • sdks/go/pkg/beam/transforms/batch/batch_test.go — Params validation
  • sdks/go/pkg/beam/transforms/batch/doc.go — Package godoc + example
  • sdks/go/pkg/beam/transforms/batch/size.go — Primitive byte sizer
  • sdks/go/pkg/beam/transforms/batch/size_test.go — Sizer tests
  • sdks/go/pkg/beam/core/graph/coder/sharded_key_test.go — NewSK + IsDeterministic tests

Modified (10 files):

  • CHANGES.md — entry under [2.74.0] - Unreleased
  • sdks/go/pkg/beam/coder.goIsDeterministic(), inferCoder ShardedKey case
  • sdks/go/pkg/beam/pcollection.goWindowingStrategy() public accessor
  • sdks/go/pkg/beam/core/graph/coder/coder.goIsDeterministic(), ShardedKey Kind, NewSK, NewCustomCoderWithFuncs
  • sdks/go/pkg/beam/core/graph/coder/registry.goRegisterDeterministicCoder, RegisterDeterministicCoderWithFuncs
  • sdks/go/pkg/beam/core/runtime/exec/coder.go — ShardedKey encode/decode via FullValue{Elm, Elm2}
  • sdks/go/pkg/beam/core/runtime/exec/coder_test.go — Wire format fixtures
  • sdks/go/pkg/beam/core/runtime/graphx/coder.go — URN beam:coder:sharded_key:v1 marshal/unmarshal
  • sdks/go/pkg/beam/core/typex/special.go, class.go, fulltype.go — ShardedKey composite type
  • sdks/go/pkg/beam/core/util/reflectx/call.goMakeFuncWithName, namedFunc
  • sdks/go/pkg/beam/core/runtime/symbols.goRegisterFunctionWithName

This introduces the supporting infrastructure required by the upcoming
GroupIntoBatches transform (apache#19868):

- (*coder.Coder).IsDeterministic() reports whether a coder produces
  byte-stable output. Primitives (bytes, bool, varint, double, string)
  are deterministic; composite coders (KV, CoGBK, Nullable, Iterable,
  LP, ShardedKey) are deterministic iff every component is. Custom
  user-registered coders are non-deterministic by default and opt in
  via the new RegisterDeterministicCoder registration helper.
- beam.Coder.IsDeterministic() forwards to the inner coder's method so
  transform authors can gate on determinism without reaching into
  internals.
- beam.PCollection.WindowingStrategy() exposes the input's windowing
  strategy publicly so transforms honoring allowed lateness (e.g.
  GroupIntoBatches) can read it without package-private access.
- typex.ShardedKey[K] is a concrete Go generic struct representing a
  sharded user key. The accompanying Kind (coder.ShardedKey) and
  beam:coder:sharded_key:v1 URN wiring (graphx marshal/unmarshal, exec
  encode/decode) produce the exact wire format documented in
  standard_coders.yaml:501-521 — verified byte-identical against the
  four published fixtures.

Cross-SDK byte compatibility is required for Dataflow/Flink
interoperability; a single divergent byte would silently corrupt
pipelines. Roundtrip tests cover all four yaml fixtures.
…che#19868)

Builds on top of the Coder.IsDeterministic / DeterministicCoder
foundation and introduces the full user-facing surface for batching
PCollection<KV<K,V>> elements by key.

* typex.ShardedKey is added as a new Composite marker type (alongside
  KV, CoGBK, WindowedValue, Timers). Its runtime representation is a
  two-part FullValue (Elm=key, Elm2=[]byte shardID).
* coder.NewSK builds the associated coder; graphx/coder and exec/coder
  wire the beam:coder:sharded_key:v1 URN in both directions. The wire
  format is byte-identical to the Java util.ShardedKey.Coder and the
  Python sharded_key coder — verified against all four
  standard_coders.yaml fixtures (lines 501-521).
* beam.PCollection.WindowingStrategy and beam.Coder.IsDeterministic are
  exposed publicly, matching the access pattern already used inside the
  beam package (pardo.go, gbk.go).
* transforms/batch introduces GroupIntoBatches, a stateful DoFn that
  buffers per-key values in a state.Bag and flushes when BatchSize /
  BatchSizeBytes / MaxBufferingDuration / end-of-window + allowed
  lateness triggers fire. The transform honors the input's allowed
  lateness (Java parity; Python currently ignores it) and panics at
  pipeline-build time on invalid params, non-KV inputs, or
  non-deterministic key coders.
* CHANGES.md is updated under [2.74.0] - Unreleased.

Scope note: this release ships GroupIntoBatches with string keys and
string values. The underlying ShardedKey infrastructure is fully in
place (type, coder, URN, tests); GroupIntoBatchesWithShardedKey and
arbitrary K/V generics are follow-up work once the Go SDK binds
universal types through state.Bag element coders.

End-to-end Prism integration testing of the stateful DoFn path remains
a follow-up — the pipeline hangs on job completion in the bounded
case, pending investigation of Prism's watermark signalling for
event-time timers set on the GlobalWindow maxTimestamp. All unit tests
(coder roundtrip, Params validation, primitive sizer) pass.
…pache#19868)

Extends GroupIntoBatches to arbitrary key/value types and adds
GroupIntoBatchesWithShardedKey, completing the Apache Beam
GroupIntoBatches feature parity with Java/Python (apache#19868).

Generic K, V support:
- Replaces the string-only DoFn with a typex.T / typex.V universal
  pair, resolved by beam.ParDo's type-binding engine at graph
  construction. Values flow through a state.Bag[[]byte] encoded via
  a cached beam.ElementEncoder/Decoder lazily initialised from
  beam.EncodedType{T: valueType} — a single reflect.Type captured at
  graph time and serialised across the SDK-worker boundary.
- Separates into two concrete DoFn shapes: the plain
  groupIntoBatchesFn (event-time timer only) and
  groupIntoBatchesBufferedFn (event-time + processing-time). A single
  DoFn with an unused processing-time timer family stalls Prism
  waiting for the family's completion signal — splitting the shape
  by params.MaxBufferingDuration avoids the stall.
- ProcessingTime timer is only wired when the user requests
  buffering, eliminating the Prism stall we hit on the initial
  implementation.

WithShardedKey:
- Adds GroupIntoBatchesWithShardedKey(s, params, col) that
  round-trips KV<K, V> → KV<[]byte-shardKey, V> → batched →
  KV<K, []V>. ShardIDs are 24-byte worker-UUID + atomic-counter
  tuples matching Java/Python layouts; downstream workers see
  independent state per shard, so a single hot logical key's
  processing spreads across workers on distributed runners.
- Output shape: PCollection<KV<K, []V>>, identical to
  GroupIntoBatches. The Go SDK's type-binding engine does not
  accept custom generic structs as DoFn output types, so we do not
  surface ShardedKey<K> to the user. Cross-SDK bytes-compat
  ShardedKey coder infrastructure is still wired at the core/typex
  + core/graph/coder level for future bidirectional pipelines.

Testing:
- End-to-end Prism tests for GroupIntoBatches across count, byte and
  per-key-isolation triggers, including a non-string value type
  (int).
- GroupIntoBatchesWithShardedKey pipeline construction test (Prism
  panics on the 3-stage round-trip pipeline with "assignment to nil
  map" in aggregateStageKind.buildEventTimeBundle — a runner-side
  regression we verify does NOT reproduce on non-Prism runners).

Follow-up items documented in the package godoc.
@github-actions github-actions bot added the go label Apr 16, 2026
@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces the GroupIntoBatches transform to the Go SDK, enabling efficient batching of PCollection elements based on count, byte size, or buffering duration. It also includes the sharded variant for handling hot keys by distributing processing across workers. To support these features, the PR adds necessary core SDK infrastructure for deterministic coder registration and cross-SDK compatible ShardedKey wire formats.

Highlights

  • New Transforms: Added GroupIntoBatches and GroupIntoBatchesWithShardedKey transforms to the Go SDK, providing feature parity with Java and Python.
  • Core SDK Enhancements: Introduced deterministic coder registration (RegisterDeterministicCoder) and public accessors for PCollection windowing strategies and coder determinism.
  • ShardedKey Support: Implemented the ShardedKey composite type and the beam:coder:sharded_key:v1 URN to enable cross-SDK interoperability for hot key processing.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @lostluck for label go.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@florian-trehaut florian-trehaut marked this pull request as draft April 17, 2026 06:57
…che#19868)

Go generic functions produce closures with identical compiler-assigned
symbol names across type instantiations — all RegisterShardedKeyType[K]
instantiations generated closures named
"RegisterShardedKeyType[...].func1", causing cross-worker
deserialization to resolve the wrong enc/dec function (last-registered
wins).

Root cause: reflectx.FunctionName calls runtime.FuncForPC which returns
the compiler name; Go does not qualify closure names by type parameter.

Fix: three surgical additions to core SDK infrastructure:

1. reflectx.MakeFuncWithName wraps a Func with a caller-supplied Name()
   so the serializer (encodeUserFn → u.Fn.Name()) emits a
   type-qualified name like "batch.encShardedKey[string]".

2. runtime.RegisterFunctionWithName registers a function under a custom
   name in the resolution cache so the deserializer (decodeUserFn →
   ResolveFunction) finds it.

3. coder.RegisterDeterministicCoderWithFuncs accepts pre-wrapped
   funcx.Fn values carrying the qualified names, bypassing the
   automatic name derivation in NewCustomCoder.

RegisterShardedKeyType[K] now uses these three mechanisms to produce
stable, collision-free names per type parameter.

Additionally completes GroupIntoBatchesWithShardedKey as a fully
generic function that wraps each key with ShardedKey{Key, ShardID}
and routes through GroupIntoBatches. End-to-end Prism test passes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant