Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,26 @@ public <T> void outputWindowedValue(
element.causedByDrain()));
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator)
.observeTimestamp(windowedValue.getTimestamp());
}
outputReceiver.output(mainOutputTag, windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator)
.observeTimestamp(windowedValue.getTimestamp());
}
outputReceiver.output(tag, windowedValue);
}

private void noteOutput() {
checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()");
checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
emit(
contextFactory.base(window, StateStyle.DIRECT),
contextFactory.base(window, StateStyle.RENAMED),
null);
CausedByDrain.NORMAL);
}

// We're all done with merging and emitting elements so can compress the activeWindow state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,18 @@ public <T> void output(TupleTag<T> tag, T output) {
SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output));
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
}

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
Expand Down Expand Up @@ -1027,6 +1039,18 @@ public <T> void outputWindowedValue(
.output();
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
checkTimestamp(timestamp(), windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
checkTimestamp(timestamp(), windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -1286,6 +1310,18 @@ public <T> void outputWindowedValue(
.output();
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.OutputBuilder;
import org.apache.beam.sdk.values.WindowedValues;
import org.joda.time.Instant;
Expand Down Expand Up @@ -54,6 +55,7 @@ public OutputBuilder<T> builder(T value) {
.setWindow(fakeWindow)
.setPaneInfo(PaneInfo.NO_FIRING)
.setTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE)
.setCausedByDrain(CausedByDrain.NORMAL)
.setReceiver(windowedValue -> records.add(windowedValue.getValue()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;
Expand Down Expand Up @@ -284,6 +285,10 @@ public abstract <T> void outputWindowedValue(
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo);

public abstract <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue);

public abstract void outputWindowedValue(WindowedValue<OutputT> windowedValue);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles I'm really curious if we are able to avoid that. You've spent quite a lot of time on outputbuilder work to avoid changing public interface and this method is exposed in ProcessContext so any user can use it if they want.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes. The old interface between the SDK and runner v1 is that the runner v1 provides a ProcessContext that will route the outputs to the correct place. When we added OutputReceiver we just made it a wrapper instead of porting runner v1 to the new design fully.

I think it is still OK to make the change you have here. The OutputBuilder work was also to make sure that the metadata was propagated, so it is still necessary. If a user choose to make a new WindowedValue from scratch and output it... well, it is not ideal but it could be worse. TBH I wish to deprecate the use of Contexts by users (ever since https://s.apache.org/a-new-dofn) since it is a hopeless interface in some ways.

I think the only alternative is to adjust runner v1 (and other runners) to implement OutputReceiver directly as though ProcessContext (and friends) do not exist. Never invested in this because of runner v2 being the focus plus it didn't seem too bad.

}

/** Information accessible when running a {@link DoFn.ProcessElement} method. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,8 @@ public OutputBuilder<Row> builder(Row value) {
rowWithMetadata -> {
((DoFn<?, T>.WindowedContext) context)
.outputWindowedValue(
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()),
rowWithMetadata.getTimestamp(),
rowWithMetadata.getWindows(),
rowWithMetadata.getPaneInfo());
rowWithMetadata.withValue(
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue())));
});

} else {
Expand All @@ -84,10 +82,8 @@ public OutputBuilder<Row> builder(Row value) {
rowWithMetadata -> {
context.outputWindowedValue(
tag,
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue()),
rowWithMetadata.getTimestamp(),
rowWithMetadata.getWindows(),
rowWithMetadata.getPaneInfo());
rowWithMetadata.withValue(
schemaCoder.getFromRowFunction().apply(rowWithMetadata.getValue())));
});
}
}
Expand Down Expand Up @@ -120,19 +116,9 @@ public OutputBuilder<T> builder(T value) {
@Override
public void output(WindowedValue<T> windowedValue) {
if (outputTag != null) {
context.outputWindowedValue(
outputTag,
windowedValue.getValue(),
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPaneInfo());
context.outputWindowedValue(outputTag, windowedValue);
} else {
((DoFn<?, T>.WindowedContext) context)
.outputWindowedValue(
windowedValue.getValue(),
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPaneInfo());
((DoFn<?, T>.WindowedContext) context).outputWindowedValue(windowedValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,38 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
CausedByDrain.NORMAL));
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
for (BoundedWindow w : windowedValue.getWindows()) {
getMutableOutput(mainOutputTag)
.add(
ValueInSingleWindow.of(
windowedValue.getValue(),
windowedValue.getTimestamp(),
w,
windowedValue.getPaneInfo(),
windowedValue.getRecordId(),
windowedValue.getRecordOffset(),
windowedValue.causedByDrain()));
}
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
for (BoundedWindow w : windowedValue.getWindows()) {
getMutableOutput(tag)
.add(
ValueInSingleWindow.of(
windowedValue.getValue(),
windowedValue.getTimestamp(),
w,
windowedValue.getPaneInfo(),
windowedValue.getRecordId(),
windowedValue.getRecordOffset(),
windowedValue.causedByDrain()));
}
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public void processElement(
.setTimestamp(kv.getValue().getTimestamp())
.setWindow(kv.getValue().getWindow())
.setPaneInfo(kv.getValue().getPaneInfo())
.setCausedByDrain(kv.getValue().getCausedByDrain())
.output();
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -141,16 +142,24 @@ public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, V>> i
new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(
ProcessContext pc,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be getting the record metadata through magic parametrs, not process context

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have yet currentRecordId currentRecordOffset exposed in doFn. Let me prepare follow up PR to make it work.

@Element KV<K, V> element,
@DoFn.Timestamp Instant timestamp,
BoundedWindow window,
PaneInfo paneInfo,
CausedByDrain causedByDrain,
OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
r.output(
KV.of(
element.getKey(),
ValueInSingleWindow.of(
element.getValue(), timestamp, window, paneInfo)));
element.getValue(),
timestamp,
window,
paneInfo,
pc.currentRecordId(),
pc.currentRecordOffset(),
causedByDrain)));
}
}))
.setCoder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -560,13 +561,7 @@ public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
public OutputBuilder<OutputT> builder(OutputT value) {
return outputBuilderSupplier
.builder(value)
.setReceiver(
windowedValue ->
outerContext.outputWindowedValue(
windowedValue.getValue(),
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPaneInfo()));
.setReceiver(windowedValue -> outerContext.outputWindowedValue(windowedValue));
}
};
}
Expand All @@ -582,13 +577,7 @@ public OutputBuilder<T> builder(T value) {
return outputBuilderSupplier
.builder(value)
.setReceiver(
windowedValue ->
outerContext.outputWindowedValue(
tag,
windowedValue.getValue(),
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPaneInfo()));
windowedValue -> outerContext.outputWindowedValue(tag, windowedValue));
}
};
}
Expand Down Expand Up @@ -659,6 +648,16 @@ public <T> void outputWindowedValue(
outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo);
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
outerContext.outputWindowedValue(windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
outerContext.outputWindowedValue(tag, windowedValue);
}

@Override
public InputT element() {
return element;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static <K> Timer<K> cleared(
*/
public abstract @Nullable PaneInfo getPaneInfo();

public abstract @Nullable CausedByDrain causedByDrain();
public abstract CausedByDrain causedByDrain();

@Override
public final boolean equals(@Nullable Object other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream,
BeamFnApi.Elements.ElementMetadata.Builder builder =
BeamFnApi.Elements.ElementMetadata.newBuilder();
// todo #33176 specify additional metadata in the future
builder.setDrain(
windowedElem.getCausedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
BeamFnApi.Elements.ElementMetadata metadata = builder.build();
ByteArrayCoder.of().encode(metadata.toByteArray(), outStream);
}
Expand Down
Loading
Loading