[Go SDK] Add GroupIntoBatches transform (#19868)#38220
[Go SDK] Add GroupIntoBatches transform (#19868)#38220florian-trehaut wants to merge 4 commits intoapache:masterfrom
Conversation
This introduces the supporting infrastructure required by the upcoming GroupIntoBatches transform (apache#19868): - (*coder.Coder).IsDeterministic() reports whether a coder produces byte-stable output. Primitives (bytes, bool, varint, double, string) are deterministic; composite coders (KV, CoGBK, Nullable, Iterable, LP, ShardedKey) are deterministic iff every component is. Custom user-registered coders are non-deterministic by default and opt in via the new RegisterDeterministicCoder registration helper. - beam.Coder.IsDeterministic() forwards to the inner coder's method so transform authors can gate on determinism without reaching into internals. - beam.PCollection.WindowingStrategy() exposes the input's windowing strategy publicly so transforms honoring allowed lateness (e.g. GroupIntoBatches) can read it without package-private access. - typex.ShardedKey[K] is a concrete Go generic struct representing a sharded user key. The accompanying Kind (coder.ShardedKey) and beam:coder:sharded_key:v1 URN wiring (graphx marshal/unmarshal, exec encode/decode) produce the exact wire format documented in standard_coders.yaml:501-521 — verified byte-identical against the four published fixtures. Cross-SDK byte compatibility is required for Dataflow/Flink interoperability; a single divergent byte would silently corrupt pipelines. Roundtrip tests cover all four yaml fixtures.
…che#19868) Builds on top of the Coder.IsDeterministic / DeterministicCoder foundation and introduces the full user-facing surface for batching PCollection<KV<K,V>> elements by key. * typex.ShardedKey is added as a new Composite marker type (alongside KV, CoGBK, WindowedValue, Timers). Its runtime representation is a two-part FullValue (Elm=key, Elm2=[]byte shardID). * coder.NewSK builds the associated coder; graphx/coder and exec/coder wire the beam:coder:sharded_key:v1 URN in both directions. The wire format is byte-identical to the Java util.ShardedKey.Coder and the Python sharded_key coder — verified against all four standard_coders.yaml fixtures (lines 501-521). * beam.PCollection.WindowingStrategy and beam.Coder.IsDeterministic are exposed publicly, matching the access pattern already used inside the beam package (pardo.go, gbk.go). * transforms/batch introduces GroupIntoBatches, a stateful DoFn that buffers per-key values in a state.Bag and flushes when BatchSize / BatchSizeBytes / MaxBufferingDuration / end-of-window + allowed lateness triggers fire. The transform honors the input's allowed lateness (Java parity; Python currently ignores it) and panics at pipeline-build time on invalid params, non-KV inputs, or non-deterministic key coders. * CHANGES.md is updated under [2.74.0] - Unreleased. Scope note: this release ships GroupIntoBatches with string keys and string values. The underlying ShardedKey infrastructure is fully in place (type, coder, URN, tests); GroupIntoBatchesWithShardedKey and arbitrary K/V generics are follow-up work once the Go SDK binds universal types through state.Bag element coders. End-to-end Prism integration testing of the stateful DoFn path remains a follow-up — the pipeline hangs on job completion in the bounded case, pending investigation of Prism's watermark signalling for event-time timers set on the GlobalWindow maxTimestamp. All unit tests (coder roundtrip, Params validation, primitive sizer) pass.
…pache#19868) Extends GroupIntoBatches to arbitrary key/value types and adds GroupIntoBatchesWithShardedKey, completing the Apache Beam GroupIntoBatches feature parity with Java/Python (apache#19868). Generic K, V support: - Replaces the string-only DoFn with a typex.T / typex.V universal pair, resolved by beam.ParDo's type-binding engine at graph construction. Values flow through a state.Bag[[]byte] encoded via a cached beam.ElementEncoder/Decoder lazily initialised from beam.EncodedType{T: valueType} — a single reflect.Type captured at graph time and serialised across the SDK-worker boundary. - Separates into two concrete DoFn shapes: the plain groupIntoBatchesFn (event-time timer only) and groupIntoBatchesBufferedFn (event-time + processing-time). A single DoFn with an unused processing-time timer family stalls Prism waiting for the family's completion signal — splitting the shape by params.MaxBufferingDuration avoids the stall. - ProcessingTime timer is only wired when the user requests buffering, eliminating the Prism stall we hit on the initial implementation. WithShardedKey: - Adds GroupIntoBatchesWithShardedKey(s, params, col) that round-trips KV<K, V> → KV<[]byte-shardKey, V> → batched → KV<K, []V>. ShardIDs are 24-byte worker-UUID + atomic-counter tuples matching Java/Python layouts; downstream workers see independent state per shard, so a single hot logical key's processing spreads across workers on distributed runners. - Output shape: PCollection<KV<K, []V>>, identical to GroupIntoBatches. The Go SDK's type-binding engine does not accept custom generic structs as DoFn output types, so we do not surface ShardedKey<K> to the user. Cross-SDK bytes-compat ShardedKey coder infrastructure is still wired at the core/typex + core/graph/coder level for future bidirectional pipelines. Testing: - End-to-end Prism tests for GroupIntoBatches across count, byte and per-key-isolation triggers, including a non-string value type (int). - GroupIntoBatchesWithShardedKey pipeline construction test (Prism panics on the 3-stage round-trip pipeline with "assignment to nil map" in aggregateStageKind.buildEventTimeBundle — a runner-side regression we verify does NOT reproduce on non-Prism runners). Follow-up items documented in the package godoc.
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces the GroupIntoBatches transform to the Go SDK, enabling efficient batching of PCollection elements based on count, byte size, or buffering duration. It also includes the sharded variant for handling hot keys by distributing processing across workers. To support these features, the PR adds necessary core SDK infrastructure for deterministic coder registration and cross-SDK compatible ShardedKey wire formats. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
|
Assigning reviewers: R: @lostluck for label go. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
…che#19868) Go generic functions produce closures with identical compiler-assigned symbol names across type instantiations — all RegisterShardedKeyType[K] instantiations generated closures named "RegisterShardedKeyType[...].func1", causing cross-worker deserialization to resolve the wrong enc/dec function (last-registered wins). Root cause: reflectx.FunctionName calls runtime.FuncForPC which returns the compiler name; Go does not qualify closure names by type parameter. Fix: three surgical additions to core SDK infrastructure: 1. reflectx.MakeFuncWithName wraps a Func with a caller-supplied Name() so the serializer (encodeUserFn → u.Fn.Name()) emits a type-qualified name like "batch.encShardedKey[string]". 2. runtime.RegisterFunctionWithName registers a function under a custom name in the resolution cache so the deserializer (decodeUserFn → ResolveFunction) finds it. 3. coder.RegisterDeterministicCoderWithFuncs accepts pre-wrapped funcx.Fn values carrying the qualified names, bypassing the automatic name derivation in NewCustomCoder. RegisterShardedKeyType[K] now uses these three mechanisms to produce stable, collision-free names per type parameter. Additionally completes GroupIntoBatchesWithShardedKey as a fully generic function that wraps each key with ShardedKey{Key, ShardID} and routes through GroupIntoBatches. End-to-end Prism test passes.
Addresses #19868 — adds
GroupIntoBatchesandGroupIntoBatchesWithShardedKeyto the Go SDK, with the supporting infrastructure for deterministic coder introspection and the standardbeam:coder:sharded_key:v1cross-SDK coder.Summary
transforms/batch.GroupIntoBatches(s, params, col)— stateful DoFn that buffers values per key in astate.Bag[[]byte](coder-encoded for generic V support) and emitsPCollection<KV<K, []V>>whenBatchSize,BatchSizeBytes,MaxBufferingDuration, or end-of-window + allowed lateness triggers fire. Arbitrary K/V types viatypex.T/typex.Vuniversals resolved at graph construction.transforms/batch.GroupIntoBatchesWithShardedKey[K](s, params, col)— wraps each user key withShardedKey[K]{Key, ShardID}(24 bytes: worker UUID + atomic counter, matching Java/Python layout) then appliesGroupIntoBatches. Output isPCollection<KV<ShardedKey[K], []V>>. Common K types (string, int, int64) registered automatically; others viaRegisterShardedKeyType[K]().(*coder.Coder).IsDeterministic()— recursive traversal over all Kind values (primitives true, composites iff components are, custom opt-in via registry).coder.RegisterDeterministicCoder(t, enc, dec)— opt-in deterministic registration for user custom coders.beam.Coder.IsDeterministic()andbeam.PCollection.WindowingStrategy()— public accessors previously limited tobeampackage internals.typex.ShardedKeycomposite marker type (alongside KV, CoGBK, WindowedValue) withIsShardedKey/NewShardedKeyhelpers.beam:coder:sharded_key:v1URN wired end-to-end:graphxmarshal/unmarshal,execencode/decode viaFullValue{Elm: key, Elm2: shardID}. Byte-identical against all 4standard_coders.yaml:506-521fixtures.reflectx.MakeFuncWithName— wraps aFuncwith a caller-suppliedName()so the serializer emits a type-qualified name instead of the compiler-assigned closure name. Required because Go generic functions produce closures with identical names across type instantiations.runtime.RegisterFunctionWithName— registers a function under a custom name in the resolution cache for cross-worker deserialization.coder.RegisterDeterministicCoderWithFuncs— accepts pre-wrappedfuncx.Fnvalues carrying qualified names, bypassing automatic name derivation.Problems solved during implementation
Generic state.Bag[V] — Go SDK state fields require concrete element types at graph construction. Values are coder-encoded into
state.Bag[[]byte]via a cachedbeam.ElementEncoder/Decoderlazily initialized frombeam.EncodedType{T: valueType}.Unused timer family stalls Prism — declaring a
timers.ProcessingTimefield without ever firing it prevents Prism from completing the bundle. Split into two DoFn shapes:groupIntoBatchesFn(event-time timer only) selected whenMaxBufferingDuration == 0, andgroupIntoBatchesBufferedFn(both timer families) otherwise.Generic closure name collisions —
RegisterShardedKeyType[string]()andRegisterShardedKeyType[int64]()produce closures with the same compiler name (RegisterShardedKeyType[...].func1). Cross-worker deserialization resolves the last-registered function (wrong type). Fixed byreflectx.MakeFuncWithName+runtime.RegisterFunctionWithNamewhich qualify the name with the type parameter (e.g.batch.encShardedKey[string]).Test plan
All tests pass on Prism loopback:
TestGroupIntoBatches_CountLimitTestGroupIntoBatches_ByteLimitTestGroupIntoBatches_PerKeyTestGroupIntoBatches_IntValuesTestGroupIntoBatchesWithShardedKey_E2ETestShardedKeyCoder_WireFormatstandard_coders.yamlfixtures byte-identicalTestCoder_IsDeterministicTestNewSK,TestSK_IsDeterministicTestParams_validateTestDefaultElementByteSizeTestIsBuiltinSizeableDesign decisions open for committer review
ShardedKeyas composite marker intypex/int64forBatchSize/BatchSizeBytesParams{}vs functional optionsvalidate(), aligns with Python_GroupIntoBatchesParams.transforms/batchtop,stats,filter.ShardedKey[K]as concrete Go generic inbatch/RegisterShardedKeyType[K]().runtime.Stackparsing fragility. Same 24-byte layout as Java/Python.reflectx.MakeFuncWithName+runtime.RegisterFunctionWithNameFiles changed
New (7 files):
sdks/go/pkg/beam/transforms/batch/batch.go— Params, DoFns, GroupIntoBatches, GroupIntoBatchesWithShardedKey, ShardedKey[K], RegisterShardedKeyType[K]sdks/go/pkg/beam/transforms/batch/batch_prism_test.go— E2E Prism testssdks/go/pkg/beam/transforms/batch/batch_test.go— Params validationsdks/go/pkg/beam/transforms/batch/doc.go— Package godoc + examplesdks/go/pkg/beam/transforms/batch/size.go— Primitive byte sizersdks/go/pkg/beam/transforms/batch/size_test.go— Sizer testssdks/go/pkg/beam/core/graph/coder/sharded_key_test.go— NewSK + IsDeterministic testsModified (10 files):
CHANGES.md— entry under[2.74.0] - Unreleasedsdks/go/pkg/beam/coder.go—IsDeterministic(),inferCoderShardedKey casesdks/go/pkg/beam/pcollection.go—WindowingStrategy()public accessorsdks/go/pkg/beam/core/graph/coder/coder.go—IsDeterministic(),ShardedKeyKind,NewSK,NewCustomCoderWithFuncssdks/go/pkg/beam/core/graph/coder/registry.go—RegisterDeterministicCoder,RegisterDeterministicCoderWithFuncssdks/go/pkg/beam/core/runtime/exec/coder.go— ShardedKey encode/decode via FullValue{Elm, Elm2}sdks/go/pkg/beam/core/runtime/exec/coder_test.go— Wire format fixturessdks/go/pkg/beam/core/runtime/graphx/coder.go— URNbeam:coder:sharded_key:v1marshal/unmarshalsdks/go/pkg/beam/core/typex/special.go,class.go,fulltype.go— ShardedKey composite typesdks/go/pkg/beam/core/util/reflectx/call.go—MakeFuncWithName,namedFuncsdks/go/pkg/beam/core/runtime/symbols.go—RegisterFunctionWithName