Fixes metadata propagation from doFn output to next parDo by introducing new interface and plumbing propagation #37851
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,6 +51,7 @@ | |
| import org.apache.beam.sdk.values.Row; | ||
| import org.apache.beam.sdk.values.TupleTag; | ||
| import org.apache.beam.sdk.values.TypeDescriptor; | ||
| import org.apache.beam.sdk.values.WindowedValue; | ||
| import org.apache.beam.sdk.values.WindowingStrategy; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
| import org.checkerframework.dataflow.qual.Pure; | ||
|
|
@@ -284,6 +285,10 @@ public abstract <T> void outputWindowedValue( | |
| Instant timestamp, | ||
| Collection<? extends BoundedWindow> windows, | ||
| PaneInfo paneInfo); | ||
|
|
||
| public abstract <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue); | ||
|
|
||
| public abstract void outputWindowedValue(WindowedValue<OutputT> windowedValue); | ||
|
Contributor
Author
@kennknowles I'm really curious whether we can avoid this. You've spent quite a lot of time on the OutputBuilder work to avoid changing the public interface, and this method is exposed in ProcessContext, so any user can call it if they want.
Member
Ah, yes. The old interface between the SDK and runner v1 is that runner v1 provides a ProcessContext that routes the outputs to the correct place. When we added OutputReceiver, we just made it a wrapper instead of fully porting runner v1 to the new design. I think it is still OK to make the change you have here. The OutputBuilder work was also meant to make sure that the metadata was propagated, so it is still necessary. If a user chooses to make a new WindowedValue from scratch and output it... well, it is not ideal, but it could be worse. TBH I wish to deprecate the use of Contexts by users (ever since https://s.apache.org/a-new-dofn) since it is a hopeless interface in some ways. I think the only alternative is to adjust runner v1 (and other runners) to implement OutputReceiver directly, as though ProcessContext (and friends) do not exist. I never invested in this because runner v2 was the focus, plus it didn't seem too bad.
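For readers following the thread, the wrapper arrangement described here can be sketched roughly as below. This is a minimal illustration only, not Beam's code: `Windowed`, `Context`, and `Receiver` are hypothetical stand-ins for `WindowedValue`, `ProcessContext`, and `OutputReceiver`, and the metadata fields (`recordId`, `causedByDrain`) are simplified assumptions. The point it tries to show is that when the receiver forwards a whole windowed value built from the current input, the metadata travels with the output instead of being rebuilt from scratch.

```java
import java.util.ArrayList;
import java.util.List;

public class MetadataPropagationSketch {

  /** Hypothetical stand-in for a windowed value: a payload plus per-record metadata. */
  static final class Windowed<T> {
    final T value;
    final long timestampMillis;
    final String recordId;
    final boolean causedByDrain;

    Windowed(T value, long timestampMillis, String recordId, boolean causedByDrain) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.recordId = recordId;
      this.causedByDrain = causedByDrain;
    }

    /** Returns a copy carrying a new payload and the same metadata. */
    <U> Windowed<U> withValue(U newValue) {
      return new Windowed<>(newValue, timestampMillis, recordId, causedByDrain);
    }
  }

  /** Hypothetical stand-in for the runner-provided context that routes outputs. */
  interface Context<OutT> {
    void outputWindowedValue(Windowed<OutT> windowed);
  }

  /** Hypothetical stand-in for a receiver implemented as a thin wrapper over the context. */
  static final class Receiver<InT, OutT> {
    private final Context<OutT> context;
    private final Windowed<InT> current;

    Receiver(Context<OutT> context, Windowed<InT> current) {
      this.context = context;
      this.current = current;
    }

    /** Emits a value that inherits the metadata of the element being processed. */
    void output(OutT value) {
      context.outputWindowedValue(current.withValue(value));
    }
  }

  /** Processes one element and returns everything the context received. */
  static List<Windowed<Integer>> demo() {
    List<Windowed<Integer>> sink = new ArrayList<>();
    Context<Integer> context = sink::add; // the "runner" simply collects outputs
    Windowed<String> input = new Windowed<>("hello", 1_000L, "rec-42", true);
    new Receiver<String, Integer>(context, input).output(input.value.length());
    return sink;
  }

  public static void main(String[] args) {
    Windowed<Integer> out = demo().get(0);
    System.out.println(out.value + " " + out.recordId + " " + out.causedByDrain);
  }
}
```

In this sketch a user who constructs a fresh `Windowed` by hand and passes it to `Context.outputWindowedValue` would bypass the propagation, which is the trade-off the comment above accepts.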
||
| } | ||
|
|
||
| /** Information accessible when running a {@link DoFn.ProcessElement} method. */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| import org.apache.beam.sdk.coders.VoidCoder; | ||
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; | ||
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; | ||
| import org.apache.beam.sdk.values.CausedByDrain; | ||
| import org.apache.beam.sdk.values.KV; | ||
| import org.apache.beam.sdk.values.PBegin; | ||
| import org.apache.beam.sdk.values.PCollection; | ||
|
|
@@ -141,16 +142,24 @@ public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, V>> i | |
| new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() { | ||
| @ProcessElement | ||
| public void processElement( | ||
| ProcessContext pc, | ||
|
Member
This should be getting the record metadata through magic parameters, not the process context.
Contributor
Author
I don't yet have currentRecordId and currentRecordOffset exposed in DoFn. Let me prepare a follow-up PR to make it work.
||
| @Element KV<K, V> element, | ||
| @DoFn.Timestamp Instant timestamp, | ||
| BoundedWindow window, | ||
| PaneInfo paneInfo, | ||
| CausedByDrain causedByDrain, | ||
| OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) { | ||
| r.output( | ||
| KV.of( | ||
| element.getKey(), | ||
| ValueInSingleWindow.of( | ||
| - element.getValue(), timestamp, window, paneInfo))); | ||
| + element.getValue(), | ||
| + timestamp, | ||
| + window, | ||
| + paneInfo, | ||
| + pc.currentRecordId(), | ||
| + pc.currentRecordOffset(), | ||
| + causedByDrain))); | ||
| } | ||
| })) | ||
| .setCoder( | ||
|
|
||