-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-39437][table] Support interruptible timers in PTFs #27962
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1e440ae
b2dc501
d935a5f
9de3af1
a3ffef7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; | ||
| import org.apache.flink.streaming.api.operators.OneInputStreamOperator; | ||
| import org.apache.flink.streaming.api.operators.StreamOperatorParameters; | ||
| import org.apache.flink.streaming.api.watermark.Watermark; | ||
| import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; | ||
| import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; | ||
| import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; | ||
|
|
@@ -82,11 +83,21 @@ public void processElement(StreamRecord<RowData> element) throws Exception { | |
| 0, | ||
| element.getValue(), | ||
| inputSemantics.timeColumn(), | ||
| processTableRunner.getCurrentWatermark()); | ||
| combinedWatermark.getCombinedWatermark()); | ||
| } | ||
| processTableRunner.processEval(); | ||
| } | ||
|
|
||
| @Override | ||
| public void processWatermark(Watermark mark) throws Exception { | ||
| // Advance combinedWatermark before super.processWatermark(mark): super.processWatermark() | ||
| // fires due timers, which read the watermark via timeContext(). V2's OneInputStreamOperator | ||
| // watermark path doesn't update combinedWatermark itself — only reportWatermark() | ||
| // (multi-input) does — so we do it here. | ||
| combinedWatermark.updateWatermark(0, mark.getTimestamp()); | ||
| super.processWatermark(mark); | ||
|
Comment on lines
+93
to
+98
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about we simply call
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried this but it doesn't really work. |
||
| } | ||
|
|
||
| @Override | ||
| public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { | ||
| super.processWatermarkStatus(watermarkStatus, 1); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] final could be dropped on useInterruptibleTimers? For consistency, all seven analogous overrides (WindowOperator, TimeIntervalJoin, BaseTemporalSortOperator, CepOperator, and others) have the similar function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the final on purpose to prevent this function from being overwritten by extending classes.
AbstractProcessTableOperator is an internal class. If it becomes necessary for a child class to overwrite the the method, we can always remove the final.
Until then, it documents that all child classes must be able to handle interruptible timers.