[FLINK-39513][checkpointing] Parameterize allowNonRestoredState in restoreInitialCheckpointIfPresent #27991
…storeInitialCheckpointIfPresent
@@ -186,7 +187,8 @@ public ExecutionGraph createAndRestoreExecutionGraph(
  if (checkpointCoordinator != null) {
  // check whether we find a valid checkpoint
  if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
- new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+ new HashSet<>(newExecutionGraph.getAllVertices().values()),
+ configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE))) {
SavepointConfigOptions was removed as part of the 2.0 release:
flink/docs/content/release-notes/flink-2.0.md
Line 373 in 9373256
Can you look for an alternative?
I've moved to using StateRecoveryOptions instead, which is the replacement from 2.0.
@@ -1113,7 +1113,7 @@ void testRestoreFinishedStateWithoutInFlightData() throws Exception {
  .build(graph);

  ExecutionJobVertex vertex = graph.getJobVertex(jobVertexID);
- coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex));
+ coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex), false);
Can you add tests for allowNonRestoredState=true (mentioned in the PR description)?
I forgot to push my amended commit, which contained the tests.
Are we waiting on something here?
2b39a69 to 65c38c0
I'm not sure under which circumstances this could happen from the user's perspective. IIUC, the job fails and tries to restore from a checkpoint, right? In that situation, the only case I can imagine where the operators differ is when the SQL plan changes for some unknown reason. We've seen the same SQL end up with a different plan. Can you elaborate, please?
gaborgsomogyi
left a comment
Until this is answered, I'm leaving my possible objection here to avoid an accidental merge.
The most common trigger (from what we have seen internally) is user code changes across deployments. This happens with the Flink Kubernetes Operator's
The asymmetry is the main issue, as there are two different outcomes depending on upgrade mode for the same job, and the same
This problem was also reported in FLINK-30638, where Gyula Fora correctly diagnosed it as a runtime issue in the comments. This PR finally addresses the bug. The SQL plan drift case would also hit the same code path. Essentially, anything that produces a mismatch between the checkpoint's operator ID set and the new JobGraph's operator IDs will trigger it.
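To make the mismatch concrete, here is a minimal sketch of the set comparison involved. It is not Flink code: the class `UnclaimedStateCheck`, the method `unclaimedOperators`, and the plain string operator IDs are hypothetical stand-ins for illustration only.

```java
import java.util.Set;
import java.util.TreeSet;

public class UnclaimedStateCheck {

    /**
     * Returns the operator IDs that have state in the checkpoint but no
     * matching operator in the new job graph (hypothetical helper).
     */
    static Set<String> unclaimedOperators(
            Set<String> checkpointOperatorIds, Set<String> jobGraphOperatorIds) {
        // Copy into a sorted set so the result is deterministic, then drop
        // every ID that the new job graph can still claim.
        Set<String> unclaimed = new TreeSet<>(checkpointOperatorIds);
        unclaimed.removeAll(jobGraphOperatorIds);
        return unclaimed;
    }

    public static void main(String[] args) {
        // Assumed scenario: a code change replaced one operator between deployments.
        Set<String> checkpoint = Set.of("source", "map-v1", "sink");
        Set<String> jobGraph = Set.of("source", "map-v2", "sink");
        System.out.println(unclaimedOperators(checkpoint, jobGraph)); // prints [map-v1]
    }
}
```

A non-empty result is the condition under which the restore is rejected unless non-restored state is allowed.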
We've had a discussion with Gyula and taken a look at the surrounding operator code. Based on that, I now see the full picture, and the direction looks good to me. I will take a look at the details and take care of this PR.
What is the purpose of the change
CheckpointCoordinator.restoreInitialCheckpointIfPresent hardcodes allowNonRestoredState=false, so checkpoint restore on JobManager startup rejects state that cannot be mapped to any operator in the current JobGraph, regardless of execution.savepoint.ignore-unclaimed-state / execution.state-recovery.ignore-unclaimed-state. CheckpointCoordinator.restoreSavepoint already honors the flag via a parameter; this PR does the same for the checkpoint-restore path.

Addresses FLINK-39513. See the ticket for history and full context.
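The difference between a hardcoded and a parameterized flag can be sketched as follows. This is a simplified model under stated assumptions, not the actual CheckpointCoordinator: the class `RestoreSketch`, its three methods, the string operator IDs, and the exception message are all hypothetical.

```java
import java.util.Set;

public class RestoreSketch {

    /** Hypothetical stand-in for the internal restore step. */
    static boolean restoreInternal(
            Set<String> checkpointOps, Set<String> jobOps, boolean allowNonRestoredState) {
        for (String op : checkpointOps) {
            // State without a matching operator is fatal unless explicitly allowed.
            if (!jobOps.contains(op) && !allowNonRestoredState) {
                throw new IllegalStateException(
                        "Cannot map checkpoint state of operator " + op + " to the new job graph");
            }
        }
        return true; // restore succeeded; unclaimed state (if any) was skipped
    }

    /** Models the old behavior: the flag is fixed to false. */
    static boolean restoreInitialBefore(Set<String> checkpointOps, Set<String> jobOps) {
        return restoreInternal(checkpointOps, jobOps, false);
    }

    /** Models the new behavior: the caller decides and the value is forwarded. */
    static boolean restoreInitialAfter(
            Set<String> checkpointOps, Set<String> jobOps, boolean allowNonRestoredState) {
        return restoreInternal(checkpointOps, jobOps, allowNonRestoredState);
    }

    public static void main(String[] args) {
        Set<String> checkpointOps = Set.of("source", "removed-op");
        Set<String> jobOps = Set.of("source");
        System.out.println(restoreInitialAfter(checkpointOps, jobOps, true)); // prints true
        try {
            restoreInitialBefore(checkpointOps, jobOps);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the caller would read the flag from configuration once and pass it down, so the checkpoint and savepoint paths behave the same for the same setting.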
Brief change log
- CheckpointCoordinator.restoreInitialCheckpointIfPresent takes allowNonRestoredState as a parameter and forwards it to restoreLatestCheckpointedStateInternal instead of hardcoding false.
- DefaultExecutionGraphFactory.createAndRestoreExecutionGraph reads StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE from its Configuration and passes it in.
- Configuration is the source (rather than jobGraph.getSavepointRestoreSettings()) because SavepointRestoreSettings.fromConfiguration returns .none() when no savepoint path is set, which is always the case on this path.
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation
Was generative AI tooling used to co-author this PR?