diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java index 8d300fc8bb6c..a21df8a900e0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.construction.PTransformReplacements; import org.apache.beam.sdk.util.construction.ReplacementOutputs; @@ -142,6 +143,17 @@ public void process(ProcessContext c) { c.output(KV.of(c.element().getKey(), currentBatch)); } } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + if (maxBatchSizeElements < Long.MAX_VALUE) { + builder.add(DisplayData.item("batchSize", maxBatchSizeElements)); + } + if (maxBatchSizeBytes < Long.MAX_VALUE) { + builder.add(DisplayData.item("batchSizeBytes", maxBatchSizeBytes)); + } + } })); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index 88c63d48de1e..773cdea2abc4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; @@ -103,9 +104,6 @@ @SuppressWarnings({ "nullness", // TODO(https://github.com/apache/beam/issues/20497) "rawtypes", - // TODO(https://github.com/apache/beam/issues/21230): Remove when new version of - // errorprone is released (2.11.0) - "unused" }) public class GroupIntoBatches extends PTransform>, PCollection>>> { @@ -675,5 +673,19 @@ private void clearState( timerTs.clear(); minBufferedTs.clear(); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + if (batchSize < Long.MAX_VALUE) { + builder.add(DisplayData.item("batchSize", batchSize)); + } + if (batchSizeBytes < Long.MAX_VALUE) { + builder.add(DisplayData.item("batchSizeBytes", batchSizeBytes)); + } + if (maxBufferingDuration.isLongerThan(Duration.ZERO)) { + builder.add(DisplayData.item("maxBufferingDuration", maxBufferingDuration)); + } + } } }