Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public abstract class ProcessTableRunner extends AbstractRichFunction {

// Current time
private long tableWatermark = Long.MIN_VALUE;
private long currentWatermark = Long.MIN_VALUE;
private @Nullable Long rowtime;
private @Nullable StringData timerName;

Expand Down Expand Up @@ -131,10 +130,6 @@ public void ingestTimerEvent(RowData key, @Nullable StringData name, long timerT
tableWatermark = Long.MIN_VALUE;
}

public void ingestCurrentWatermarkEvent(long watermark) {
currentWatermark = watermark;
}

public void clearAllState() {
Arrays.fill(stateCleared, true);
}
Expand All @@ -143,10 +138,6 @@ public void clearState(int statePos) {
stateCleared[statePos] = true;
}

public long getCurrentWatermark() {
return currentWatermark;
}

public long getTableWatermark() {
return tableWatermark;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
Expand Down Expand Up @@ -145,11 +146,14 @@ public void open() throws Exception {
FunctionUtils.openFunction(processTableRunner, DefaultOpenContext.INSTANCE);
}

@Override
public final boolean useInterruptibleTimers(ReadableConfig config) {
Copy link
Copy Markdown
Contributor

@spuru9 spuru9 Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] final could be dropped on useInterruptibleTimers? For consistency, all seven analogous overrides (WindowOperator, TimeIntervalJoin, BaseTemporalSortOperator, CepOperator, and others) have the similar function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the final on purpose to prevent this function from being overwritten by extending classes.
AbstractProcessTableOperator is an internal class. If it becomes necessary for a child class to overwrite the the method, we can always remove the final.
Until then, it documents that all child classes must be able to handle interruptible timers.

return true;
}

@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
// TODO this line has issues with interruptible timers, see FLINK-39437
processTableRunner.ingestCurrentWatermarkEvent(mark.getTimestamp());
}

@Override
Expand Down Expand Up @@ -220,7 +224,7 @@ public <TimeType> TimeContext<TimeType> timeContext(Class<TimeType> conversionCl

internalTimeContext.setTime(
processTableRunner.getTableWatermark(),
processTableRunner.getCurrentWatermark(),
combinedWatermark.getCombinedWatermark(),
processTableRunner.getTime());

return (TimeContext<TimeType>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
Expand Down Expand Up @@ -82,11 +83,21 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
0,
element.getValue(),
inputSemantics.timeColumn(),
processTableRunner.getCurrentWatermark());
combinedWatermark.getCombinedWatermark());
}
processTableRunner.processEval();
}

@Override
public void processWatermark(Watermark mark) throws Exception {
// Advance combinedWatermark before super.processWatermark(mark): super.processWatermark()
// fires due timers, which read the watermark via timeContext(). V2's OneInputStreamOperator
// watermark path doesn't update combinedWatermark itself — only reportWatermark()
// (multi-input) does — so we do it here.
combinedWatermark.updateWatermark(0, mark.getTimestamp());
super.processWatermark(mark);
Comment on lines +93 to +98
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we simply call reportWatermark here? I think this fits better and also takes care of calling processWatermark. After that I think the comment is not necessary anymore, the code would be self explaining.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this but it doesn't really work.
reportWatermark() first updates the combinedWatermark but its call to processWatermark() again (recursively) is directed to the overriding ProcessRowTableOperator.processWatermark() which calls reportWatermark() again, but this time the combinedWatermark is already updated and we don't call processWatermark() again (otherwise we would run into a stack-overflow situation).
The consequence is that we never call AbstractStreamOperatorV2.processWatermark().

}

@Override
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
super.processWatermarkStatus(watermarkStatus, 1);
Expand Down
Loading