diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index d39801a75587..07e4756885dd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -477,6 +477,21 @@ public void outputWindowedValue( element.causedByDrain())); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + noteOutput(); + if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { + ((TimestampObservingWatermarkEstimator) watermarkEstimator) + .observeTimestamp(windowedValue.getTimestamp()); + } + outputReceiver.output(tag, windowedValue); + } + private void noteOutput() { checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()"); checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()"); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 85cf9cefde15..0721ddc4685e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -376,7 +376,7 @@ public void processElements(Iterable> values) throws Excep emit( contextFactory.base(window, StateStyle.DIRECT), contextFactory.base(window, StateStyle.RENAMED), - null); + CausedByDrain.NORMAL); } // We're all done with merging and emitting elements so can compress the activeWindow state. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 74f5a4d09001..46c14bf6dee6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -443,6 +443,17 @@ public void output(TupleTag tag, T output) { SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); @@ -1027,6 +1038,17 @@ public void outputWindowedValue( .output(); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(timestamp(), windowedValue.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( @@ -1286,6 +1308,17 @@ public void outputWindowedValue( .output(); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(this.timestamp, windowedValue.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java index 83d2af7b66bb..2f53bf2e9bc4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestOutputReceiver.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.WindowedValues; import org.joda.time.Instant; @@ -54,6 +55,7 @@ public OutputBuilder builder(T value) { .setWindow(fakeWindow) .setPaneInfo(PaneInfo.NO_FIRING) .setTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE) + .setCausedByDrain(CausedByDrain.NORMAL) .setReceiver(windowedValue -> records.add(windowedValue.getValue())); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 41beb93a5cbe..0d892ab12d33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; @@ -284,6 +285,10 @@ public abstract void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo); + + public abstract void outputWindowedValue(TupleTag tag, WindowedValue windowedValue); + + public abstract void outputWindowedValue(WindowedValue windowedValue); } /** Information accessible when running a {@link DoFn.ProcessElement} method. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java index e2c0825e0274..37850a9fcf84 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java @@ -70,10 +70,8 @@ public OutputBuilder builder(Row value) { rowWithMetadata -> { ((DoFn.WindowedContext) context) .outputWindowedValue( - schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()), - rowWithMetadata.getTimestamp(), - rowWithMetadata.getWindows(), - rowWithMetadata.getPaneInfo()); + rowWithMetadata.withValue( + schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()))); }); } else { @@ -84,10 +82,8 @@ public OutputBuilder builder(Row value) { rowWithMetadata -> { context.outputWindowedValue( tag, - schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()), - rowWithMetadata.getTimestamp(), - rowWithMetadata.getWindows(), - rowWithMetadata.getPaneInfo()); + rowWithMetadata.withValue( + schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()))); }); } } @@ -120,19 +116,9 @@ public OutputBuilder builder(T value) { @Override public void output(WindowedValue windowedValue) { if (outputTag != null) { - context.outputWindowedValue( - outputTag, - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo()); + context.outputWindowedValue(outputTag, windowedValue); } else { - ((DoFn.WindowedContext) context) - .outputWindowedValue( - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo()); + ((DoFn.WindowedContext) context).outputWindowedValue(windowedValue); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index a4f3cba21050..4de2c3d2c9c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -649,6 +649,27 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp CausedByDrain.NORMAL)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + for (BoundedWindow w : windowedValue.getWindows()) { + getMutableOutput(tag) + .add( + ValueInSingleWindow.of( + windowedValue.getValue(), + windowedValue.getTimestamp(), + w, + windowedValue.getPaneInfo(), + windowedValue.getRecordId(), + windowedValue.getRecordOffset(), + windowedValue.causedByDrain())); + } + } + @Override public void outputWindowedValue( TupleTag tag, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java index 5fb4ea61e8ef..44f27824382d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java @@ -187,6 +187,7 @@ public void processElement( .setTimestamp(kv.getValue().getTimestamp()) .setWindow(kv.getValue().getWindow()) .setPaneInfo(kv.getValue().getPaneInfo()) + .setCausedByDrain(kv.getValue().getCausedByDrain()) .output(); } })); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java index af125d9e63e8..9974d29bfa0f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -141,16 +142,24 @@ public PCollection>> expand(PCollection> i new DoFn, KV>>() { @ProcessElement public void processElement( + ProcessContext pc, @Element KV element, @DoFn.Timestamp Instant timestamp, BoundedWindow window, PaneInfo paneInfo, + CausedByDrain causedByDrain, OutputReceiver>> r) { r.output( KV.of( element.getKey(), ValueInSingleWindow.of( - element.getValue(), timestamp, window, paneInfo))); + element.getValue(), + timestamp, + window, + paneInfo, + pc.currentRecordId(), + pc.currentRecordOffset(), + causedByDrain))); } })) .setCoder( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index 6d058b3b6ada..5adf3d32a1a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.checkerframework.checker.nullness.qual.Nullable; @@ -560,13 +561,7 @@ public OutputReceiver outputReceiver(DoFn doFn) { public OutputBuilder builder(OutputT value) { return outputBuilderSupplier .builder(value) - .setReceiver( - windowedValue -> - outerContext.outputWindowedValue( - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + .setReceiver(windowedValue -> outerContext.outputWindowedValue(windowedValue)); } }; } @@ -582,13 +577,7 @@ public OutputBuilder builder(T value) { return outputBuilderSupplier .builder(value) .setReceiver( - windowedValue -> - outerContext.outputWindowedValue( - tag, - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue -> outerContext.outputWindowedValue(tag, windowedValue)); } }; } @@ -659,6 +648,16 @@ public void outputWindowedValue( outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outerContext.outputWindowedValue(windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outerContext.outputWindowedValue(tag, windowedValue); + } + @Override public InputT element() { return element; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java index 6e5bb3303c39..bd6e0d22a719 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java @@ -126,7 +126,7 @@ public static Timer cleared( */ public abstract @Nullable PaneInfo getPaneInfo(); - public abstract @Nullable CausedByDrain causedByDrain(); + public abstract CausedByDrain causedByDrain(); @Override public final boolean equals(@Nullable Object other) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index 30c06c5f0d9a..0d8b2f7515e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -123,6 +123,10 @@ public void encode(ValueInSingleWindow windowedElem, OutputStream outStream, BeamFnApi.Elements.ElementMetadata.Builder builder = BeamFnApi.Elements.ElementMetadata.newBuilder(); // todo #33176 specify additional metadata in the future + builder.setDrain( + windowedElem.getCausedByDrain() == CausedByDrain.CAUSED_BY_DRAIN + ? BeamFnApi.Elements.DrainMode.Enum.DRAINING + : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING); BeamFnApi.Elements.ElementMetadata metadata = builder.build(); ByteArrayCoder.of().encode(metadata.toByteArray(), outStream); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index ebe26be91c95..79f4b4a62ac2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -77,7 +77,10 @@ public static Builder builder(WindowedValue template) { .setValue(template.getValue()) .setTimestamp(template.getTimestamp()) .setWindows(template.getWindows()) - .setPaneInfo(template.getPaneInfo()); + .setPaneInfo(template.getPaneInfo()) + .setRecordOffset(template.getRecordOffset()) + .setRecordId(template.getRecordId()) + .setCausedByDrain(template.causedByDrain()); } public static class Builder implements OutputBuilder { @@ -145,6 +148,7 @@ public Builder setRecordOffset(@Nullable Long recordOffset) { @Override public Builder setCausedByDrain(CausedByDrain causedByDrain) { + checkStateNotNull(causedByDrain, "CausedByDrain is null"); this.causedByDrain = causedByDrain; return this; } @@ -199,6 +203,7 @@ public PaneInfo getPaneInfo() { @Override public CausedByDrain causedByDrain() { + checkStateNotNull(causedByDrain, "CausedByDrain not set"); return causedByDrain; } @@ -269,9 +274,16 @@ public static WindowedValue of( CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); - + checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null"); if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); + return of( + value, + timestamp, + windows.iterator().next(), + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain); } else { return new TimestampedValueInMultipleWindows<>( value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); @@ -287,7 +299,7 @@ static WindowedValue createWithoutValidation( PaneInfo paneInfo, CausedByDrain causedByDrain) { if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); + return of(value, timestamp, windows.iterator().next(), paneInfo, null, null, causedByDrain); } else { return new TimestampedValueInMultipleWindows<>( value, timestamp, windows, paneInfo, null, null, causedByDrain); @@ -299,7 +311,7 @@ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); - return of(value, timestamp, window, paneInfo, CausedByDrain.NORMAL); + return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL); } /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ @@ -308,18 +320,22 @@ public static WindowedValue of( Instant timestamp, BoundedWindow window, PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset, CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); + checkArgument(causedByDrain != null, "WindowedValue requires CausedByDrain, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { - return new ValueInGlobalWindow<>(value, paneInfo, null, null, causedByDrain); + return new ValueInGlobalWindow<>( + value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } else if (isGlobal) { return new TimestampedValueInGlobalWindow<>( - value, timestamp, paneInfo, null, null, causedByDrain); + value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } else { return new TimestampedValueInSingleWindow<>( - value, timestamp, window, paneInfo, null, null, causedByDrain); + value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java new file mode 100644 index 000000000000..5cfc3d3a1eb6 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.CausedByDrain; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowedValues; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +public class MetadataPropagationTest { + + @RunWith(JUnit4.class) + public static class MiscTest { + + /** Tests for metadata propagation. */ + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + static class CausedByDrainSettingDoFn extends DoFn { + @ProcessElement + public void process(OutputReceiver r) { + r.builder("value").setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output(); + } + } + + static class CausedByDrainExtractingDoFn extends DoFn { + @ProcessElement + public void process(ProcessContext pc, OutputReceiver r) { + r.output(pc.causedByDrain().toString()); + } + } + + @Test + @Category(NeedsRunner.class) + public void testMetadataPropagationAcrossShuffleParameter() { + WindowedValues.WindowedValueCoder.setMetadataSupported(); + PCollection results = + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new CausedByDrainSettingDoFn())) + .apply(Redistribute.arbitrarily()) + .apply(ParDo.of(new CausedByDrainExtractingDoFn())); + + PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMetadataPropagationParameter() { + PCollection results = + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new CausedByDrainSettingDoFn())) + .apply(ParDo.of(new CausedByDrainExtractingDoFn())); + + PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); + + pipeline.run(); + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 186d58e33189..e9269deb18ea 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -77,6 +77,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; @@ -561,6 +562,7 @@ public OutputBuilder builder(SomeRestriction value) { .setTimestamp(mockTimestamp) .setWindow(mockWindow) .setPaneInfo(PaneInfo.NO_FIRING) + .setCausedByDrain(CausedByDrain.NORMAL) .setReceiver(windowedValue -> outputs.add(windowedValue.getValue())); } }; @@ -801,6 +803,7 @@ public OutputBuilder builder(String value) { .setTimestamp(mockTimestamp) .setWindow(mockWindow) .setPaneInfo(PaneInfo.NO_FIRING) + .setCausedByDrain(CausedByDrain.NORMAL) .setReceiver( windowedValue -> { assertFalse(invoked); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index de69f49ecc3c..2eb5816a864f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -624,15 +624,18 @@ private void startBundle() { private void processElementForParDo(WindowedValue elem) { currentElement = elem; + causedByDrain = currentElement.causedByDrain(); try { doFnInvoker.invokeProcessElement(processContext); } finally { currentElement = null; + causedByDrain = null; } } private void processElementForWindowObservingParDo(WindowedValue elem) { currentElement = elem; + causedByDrain = currentElement.causedByDrain(); try { Iterator windowIterator = (Iterator) elem.getWindows().iterator(); @@ -643,12 +646,14 @@ private void processElementForWindowObservingParDo(WindowedValue elem) { } finally { currentElement = null; currentWindow = null; + causedByDrain = null; } } private void processElementForWindowObservingSizedElementAndRestriction( WindowedValue>, Double>> elem) { currentElement = elem.withValue(elem.getValue().getKey().getKey()); + causedByDrain = elem.causedByDrain(); windowCurrentIndex = -1; windowStopIndex = currentElement.getWindows().size(); currentWindows = ImmutableList.copyOf(currentElement.getWindows()); @@ -660,6 +665,7 @@ private void processElementForWindowObservingSizedElementAndRestriction( windowCurrentIndex = -1; windowStopIndex = 0; currentElement = null; + causedByDrain = null; currentWindows = null; currentRestriction = null; currentWatermarkEstimatorState = null; @@ -1202,7 +1208,8 @@ private void processTimer( checkNotNull(timerBundleTracker); try { currentKey = timer.getUserKey(); - causedByDrain = timer.causedByDrain(); + causedByDrain = Preconditions.checkNotNull(timer.causedByDrain()); + // add drain Iterator windowIterator = (Iterator) timer.getWindows().iterator(); @@ -1286,6 +1293,7 @@ private void processOnWindowExpiration(Timer timer) { try { currentKey = timer.getUserKey(); currentTimer = timer; + causedByDrain = timer.causedByDrain(); Iterator windowIterator = (Iterator) timer.getWindows().iterator(); while (windowIterator.hasNext()) { @@ -1296,6 +1304,7 @@ private void processOnWindowExpiration(Timer timer) { currentKey = null; currentTimer = null; currentWindow = null; + causedByDrain = null; } } @@ -1787,6 +1796,16 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp WindowedValues.of(output, timestamp, currentWindow, currentElement.getPaneInfo())); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @Override public void outputWindowedValue( TupleTag tag, @@ -1937,6 +1956,16 @@ public void outputWindowedValue( outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @Override public CausedByDrain causedByDrain() { return currentElement.causedByDrain(); @@ -2078,10 +2107,8 @@ public OutputBuilder builder(Row value) { .setReceiver( windowedRow -> ProcessBundleContextBase.this.outputWindowedValue( - fromRowFunction.apply(windowedRow.getValue()), - windowedRow.getTimestamp(), - windowedRow.getWindows(), - windowedRow.getPaneInfo())); + windowedRow.withValue( + fromRowFunction.apply(windowedRow.getValue())))); } }; @@ -2114,12 +2141,7 @@ public OutputBuilder builder(T value) { .withValue(value) .setReceiver( windowedValue -> - ProcessBundleContextBase.this.outputWindowedValue( - tag, - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + ProcessBundleContextBase.this.outputWindowedValue(tag, windowedValue)); } }; } @@ -2154,10 +2176,8 @@ public OutputBuilder builder(Row value) { windowedRow -> ProcessBundleContextBase.this.outputWindowedValue( tag, - fromRowFunction.apply(windowedRow.getValue()), - windowedRow.getTimestamp(), - windowedRow.getWindows(), - windowedRow.getPaneInfo())); + windowedRow.withValue( + fromRowFunction.apply(windowedRow.getValue())))); } }; } @@ -2304,7 +2324,7 @@ public OutputBuilder builder(OutputT value) { .setWindow(currentWindow) .setTimestamp(currentTimer.getHoldTimestamp()) .setPaneInfo(currentTimer.getPaneInfo()) - .setCausedByDrain(causedByDrain) + .setCausedByDrain(currentTimer.causedByDrain()) .setReceiver( windowedValue -> { checkOnWindowExpirationTimestamp(windowedValue.getTimestamp()); @@ -2325,7 +2345,10 @@ public void output(TupleTag tag, T output) { output, currentTimer.getHoldTimestamp(), currentWindow, - currentTimer.getPaneInfo())); + currentTimer.getPaneInfo(), + null, + null, + currentTimer.causedByDrain())); } @Override @@ -2354,6 +2377,16 @@ public void outputWindowedValue( outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @SuppressWarnings( "deprecation") // Allowed Skew is deprecated for users, but must be respected private void checkOnWindowExpirationTimestamp(Instant timestamp) { @@ -2426,10 +2459,8 @@ public OutputBuilder builder(Row value) { .setReceiver( windowedValue -> context.outputWindowedValue( - fromRowFunction.apply(windowedValue.getValue()), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue.withValue( + fromRowFunction.apply(windowedValue.getValue())))); } }; @@ -2460,14 +2491,7 @@ public OutputBuilder builder(T value) { .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) .setCausedByDrain(causedByDrain) - .setReceiver( - windowedValue -> - context.outputWindowedValue( - tag, - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + .setReceiver(windowedValue -> context.outputWindowedValue(tag, windowedValue)); } }; } @@ -2502,10 +2526,8 @@ public OutputBuilder builder(Row value) { windowedValue -> context.outputWindowedValue( tag, - fromRowFunction.apply(windowedValue.getValue()), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue.withValue( + fromRowFunction.apply(windowedValue.getValue())))); } }; } @@ -2644,6 +2666,16 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp WindowedValues.of(output, timestamp, currentWindow, currentTimer.getPaneInfo())); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @Override public void outputWindowedValue( TupleTag tag, @@ -2735,10 +2767,8 @@ public OutputBuilder builder(Row value) { .setReceiver( windowedValue -> context.outputWindowedValue( - fromRowFunction.apply(windowedValue.getValue()), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue.withValue( + fromRowFunction.apply(windowedValue.getValue())))); } }; @@ -2768,15 +2798,9 @@ public OutputBuilder builder(T value) { .setValue(value) .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) - .setCausedByDrain(causedByDrain) + .setCausedByDrain(currentTimer.causedByDrain()) .setPaneInfo(currentTimer.getPaneInfo()) - .setReceiver( - windowedValue -> - context.outputWindowedValue( - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + .setReceiver(windowedValue -> context.outputWindowedValue(windowedValue)); } }; } @@ -2807,14 +2831,12 @@ public OutputBuilder builder(Row value) { .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) .setPaneInfo(currentTimer.getPaneInfo()) - .setCausedByDrain(causedByDrain) + .setCausedByDrain(currentTimer.causedByDrain()) .setReceiver( windowedValue -> context.outputWindowedValue( - fromRowFunction.apply(windowedValue.getValue()), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + windowedValue.withValue( + fromRowFunction.apply(windowedValue.getValue())))); } }; }