Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Added `GroupIntoBatches` transform and the standard
`beam:coder:sharded_key:v1` coder to the Go SDK, along with
`beam.Coder.IsDeterministic`, `beam.PCollection.WindowingStrategy`,
and `coder.RegisterDeterministicCoder` for opt-in deterministic
custom coders (Go) ([#19868](https://github.com/apache/beam/issues/19868)).
* TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to
encode finished bitset. SentinelBitSetCoder and BitSetCoder are state
compatible. Both coders can decode encoded bytes from the other coder
Expand Down
17 changes: 17 additions & 0 deletions sdks/go/pkg/beam/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ func (c Coder) String() string {
return c.coder.String()
}

// IsDeterministic reports whether this coder produces a byte-deterministic
// encoding: encoding two equal values always yields identical byte
// sequences.
//
// Determinism is required for any coder used as a state key in a stateful
// DoFn or as the key component of a KV consumed by GroupByKey /
// GroupIntoBatches. A non-deterministic key coder would silently corrupt
// state keying, splintering state across apparently-distinct keys.
func (c Coder) IsDeterministic() bool {
if c.coder == nil {
return false
}
return c.coder.IsDeterministic()
}

// NewElementEncoder returns a new encoding function for the given type.
func NewElementEncoder(t reflect.Type) ElementEncoder {
c, err := inferCoder(typex.New(t))
Expand Down Expand Up @@ -249,6 +264,8 @@ func inferCoder(t FullType) (*coder.Coder, error) {
// are non-windowed? We either need to know the windowing strategy or
// we should remove this case.
return &coder.Coder{Kind: coder.WindowedValue, T: t, Components: c, Window: coder.NewGlobalWindow()}, nil
case typex.ShardedKeyType:
return &coder.Coder{Kind: coder.ShardedKey, T: t, Components: c}, nil

default:
panic(fmt.Sprintf("Unexpected composite type: %v", t))
Expand Down
116 changes: 116 additions & 0 deletions sdks/go/pkg/beam/core/graph/coder/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ func (c *CustomCoder) String() string {
return fmt.Sprintf("%v[%v;%v]", c.Type, c.Name, c.ID)
}

// IsDeterministic reports whether this CustomCoder produces a deterministic
// encoding. A CustomCoder is deterministic iff the user opted in by
// registering the coder via RegisterDeterministicCoder. Default is false
// (conservative): a non-deterministic key coder would silently corrupt state
// keying in stateful DoFns.
func (c *CustomCoder) IsDeterministic() bool {
if c == nil {
return false
}
return isCustomCoderDeterministic(c.Type)
}

// Type signatures of encode/decode for verification.
var (
encodeSig = &funcx.Signature{
Expand Down Expand Up @@ -156,6 +168,20 @@ func NewCustomCoder(id string, t reflect.Type, encode, decode any) (*CustomCoder
return c, nil
}

// NewCustomCoderWithFuncs creates a CustomCoder from pre-wrapped
// reflectx.Func values. This allows the caller to control the Name()
// returned by each function — critical for closures inside Go generic
// functions where the compiler assigns identical names to different
// type instantiations.
func NewCustomCoderWithFuncs(id string, t reflect.Type, enc, dec *funcx.Fn) *CustomCoder {
return &CustomCoder{
Name: id,
Type: t,
Enc: enc,
Dec: dec,
}
}

// Kind represents the type of coder used.
type Kind string

Expand Down Expand Up @@ -195,6 +221,17 @@ const (
//
// TODO(https://github.com/apache/beam/issues/18032): once this JIRA is done, this coder should become the new thing.
CoGBK Kind = "CoGBK"

// ShardedKey encodes a user key wrapped with an opaque shard identifier,
// used by GroupIntoBatchesWithShardedKey to distribute a single logical
// key's processing across workers. Wire format
// (beam:coder:sharded_key:v1):
//
// ByteArrayCoder.encode(shardId) ++ keyCoder.encode(key)
//
// matching sdks/java/core ShardedKey and the Python sharded_key
// encoding for cross-SDK interoperability.
ShardedKey Kind = "SK"
)

// Coder is a description of how to encode and decode values of a given type.
Expand Down Expand Up @@ -273,6 +310,62 @@ func (c *Coder) String() string {
return ret
}

// IsDeterministic reports whether this Coder produces a deterministic
// byte encoding — i.e. encoding two equal values always yields identical
// byte sequences.
//
// Determinism is a prerequisite for any Coder used as a state key in a
// stateful DoFn, as the key component of a KV consumed by GroupByKey, or as
// a grouping key in a CoGroupByKey. A non-deterministic key coder causes
// state-keyed operations to silently corrupt: two encodings of the same
// logical key map to distinct physical keys, splintering state across
// apparently-distinct keys.
//
// Built-in coders for primitive types (bytes, bool, varint, double,
// string) are deterministic. Composite coders (KV, Iterable, Nullable)
// are deterministic iff every component is. The Map coder is
// non-deterministic because Go map iteration order is unspecified.
// Custom user-registered coders are non-deterministic by default; users
// opt in by registering with RegisterDeterministicCoder.
func (c *Coder) IsDeterministic() bool {
if c == nil {
return false
}
switch c.Kind {
case Bytes, Bool, VarInt, Double, String:
return true
case Custom:
return c.Custom.IsDeterministic()
case KV, CoGBK, Nullable, Iterable, LP, ShardedKey:
for _, comp := range c.Components {
if !comp.IsDeterministic() {
return false
}
}
return true
case WindowedValue, ParamWindowedValue, Window, Timer, PaneInfo, IW:
// These coders are structural: they wrap runner/window bookkeeping that is
// not used as a state key. Recurse into the data component when present so
// that a non-deterministic inner coder is still reported.
for _, comp := range c.Components {
if !comp.IsDeterministic() {
return false
}
}
return true
case Row:
// Schema (row) coding encodes fields in a fixed field-id order and
// produces a stable byte layout; however, row coders may contain fields
// backed by custom coders we cannot introspect here. Conservative
// default: return false and allow users to opt in via schema-level
// determinism guarantees once they're exposed. Structs wanting
// deterministic behavior can register a deterministic custom coder
// instead.
return false
}
return false
}

// NewBytes returns a new []byte coder using the built-in scheme. It
// is always nested, for now.
func NewBytes() *Coder {
Expand Down Expand Up @@ -428,6 +521,29 @@ func NewCoGBK(components []*Coder) *Coder {
}
}

// NewSK returns a coder for ShardedKey-typed values. The component
// keyCoder encodes the user key; the ShardID is encoded as a
// length-prefixed byte string preceding it (beam:coder:sharded_key:v1).
//
// The resulting FullType root is typex.ShardedKeyType with the key's
// FullType as the single component, following the same Composite
// pattern as KV.
func NewSK(keyCoder *Coder) *Coder {
if keyCoder == nil {
panic("NewSK: keyCoder must not be nil")
}
return &Coder{
Kind: ShardedKey,
T: typex.New(typex.ShardedKeyType, keyCoder.T),
Components: []*Coder{keyCoder},
}
}

// IsSK returns true iff the coder is for a ShardedKey.
func IsSK(c *Coder) bool {
return c != nil && c.Kind == ShardedKey
}

// SkipW returns the data coder used by a WindowedValue, or returns the coder. This
// allows code to seamlessly traverse WindowedValues without additional conditional
// code.
Expand Down
66 changes: 66 additions & 0 deletions sdks/go/pkg/beam/core/graph/coder/coder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,72 @@ func TestNewNullable(t *testing.T) {
}
}

func TestCoder_IsDeterministic(t *testing.T) {
ints := NewVarInt()
bytes := NewBytes()
bools := NewBool()
doubles := NewDouble()
strs := NewString()

enc := func(string) []byte { return nil }
dec := func([]byte) string { return "" }

nonDetCustom, err := NewCustomCoder("nonDet", reflectx.String, enc, dec)
if err != nil {
t.Fatal(err)
}
nonDetC := &Coder{Kind: Custom, Custom: nonDetCustom, T: typex.New(reflectx.String)}

// Register a deterministic custom coder for a dedicated type.
type detType struct{}
detT := reflect.TypeOf((*detType)(nil)).Elem()
detEnc := func(detType) []byte { return nil }
detDec := func([]byte) detType { return detType{} }
RegisterDeterministicCoder(detT, detEnc, detDec)
detCustom, err := NewCustomCoder("det", detT, detEnc, detDec)
if err != nil {
t.Fatal(err)
}
detC := &Coder{Kind: Custom, Custom: detCustom, T: typex.New(detT)}

tests := []struct {
name string
c *Coder
want bool
}{
{"nil", nil, false},
{"bytes", bytes, true},
{"bool", bools, true},
{"varint", ints, true},
{"double", doubles, true},
{"string", strs, true},
{"nonDetCustom", nonDetC, false},
{"detCustom", detC, true},
{"KV_bytes_varint", NewKV([]*Coder{bytes, ints}), true},
{"KV_bytes_nonDet", NewKV([]*Coder{bytes, nonDetC}), false},
{"KV_nonDet_bytes", NewKV([]*Coder{nonDetC, bytes}), false},
{"iterable_varint", NewI(ints), true},
{"iterable_nonDet", NewI(nonDetC), false},
{"nullable_string", NewN(strs), true},
{"nullable_nonDet", NewN(nonDetC), false},
{"CoGBK_bytes_varint", NewCoGBK([]*Coder{bytes, ints}), true},
{"CoGBK_nonDet_varint", NewCoGBK([]*Coder{nonDetC, ints}), false},
{"WindowedValue_varint", NewW(ints, NewGlobalWindow()), true},
{"WindowedValue_nonDet", NewW(nonDetC, NewGlobalWindow()), false},
{"Row", NewR(typex.New(reflect.TypeOf((*namedTypeForTest)(nil)))), false},
}

for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
got := test.c.IsDeterministic()
if got != test.want {
t.Errorf("IsDeterministic(%v) = %v, want %v", test.c, got, test.want)
}
})
}
}

func TestNewCoGBK(t *testing.T) {
bytes := NewBytes()
ints := NewVarInt()
Expand Down
60 changes: 58 additions & 2 deletions sdks/go/pkg/beam/core/graph/coder/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package coder
import (
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
)

var (
coderRegistry = make(map[reflect.Type]func(reflect.Type) *CustomCoder)
interfaceOrdering []reflect.Type
coderRegistry = make(map[reflect.Type]func(reflect.Type) *CustomCoder)
interfaceOrdering []reflect.Type
deterministicRegistry = make(map[reflect.Type]bool)
)

// RegisterCoder registers a user defined coder for a given type, and will
Expand Down Expand Up @@ -76,6 +78,60 @@ func RegisterCoder(t reflect.Type, enc, dec any) {
}
}

// RegisterDeterministicCoder is the deterministic-affirming counterpart to
// RegisterCoder: it registers the (enc, dec) pair for t AND records that the
// resulting CustomCoder produces a deterministic encoding. The caller asserts
// by calling this function that enc produces byte-identical output for any
// two equal input values of type t.
//
// Deterministic coders are required for any type used as a state key in a
// stateful DoFn, as the key of a KV consumed by GroupByKey / GroupIntoBatches,
// or as a grouping key for CoGroupByKey.
//
// Prefer this over RegisterCoder whenever the encoded type may be used as a
// key. For types that cannot guarantee determinism (e.g. encodings backed by
// map[K]V iteration order), use the plain RegisterCoder.
// RegisterDeterministicCoderWithFuncs is like RegisterDeterministicCoder
// but accepts pre-wrapped reflectx.Func values (typically built via
// reflectx.MakeFuncWithName) so the caller controls the function name
// used during cross-worker serialization. This is required for
// closures inside Go generic functions where different type
// instantiations produce closures with the same compiler name.
func RegisterDeterministicCoderWithFuncs(t reflect.Type, encFn, decFn *funcx.Fn) {
name := t.String()
coderRegistry[t] = func(rt reflect.Type) *CustomCoder {
return NewCustomCoderWithFuncs(name, rt, encFn, decFn)
}
deterministicRegistry[t] = true
}

func RegisterDeterministicCoder(t reflect.Type, enc, dec any) {
RegisterCoder(t, enc, dec)
deterministicRegistry[t] = true
}

// isCustomCoderDeterministic returns true iff t has been registered via
// RegisterDeterministicCoder.
func isCustomCoderDeterministic(t reflect.Type) bool {
if t == nil {
return false
}
if ok, present := deterministicRegistry[t]; present {
return ok
}
// Also match against interface registrations: if the type implements a
// registered-deterministic interface, honor that.
for rt, det := range deterministicRegistry {
if !det {
continue
}
if rt.Kind() == reflect.Interface && t.Implements(rt) {
return true
}
}
return false
}

// LookupCustomCoder returns the custom coder for the type if any,
// first checking for a specific matching type, and then iterating
// through registered interface coders in reverse registration order.
Expand Down
Loading
Loading