[FLINK-39513][checkpointing] Parameterize allowNonRestoredState in restoreInitialCheckpointIfPresent#27991

Open
hemmatio wants to merge 1 commit into apache:master from Shopify:hemmatio-fix-allow-non-restored-state

Conversation

@hemmatio hemmatio commented Apr 21, 2026

What is the purpose of the change

CheckpointCoordinator.restoreInitialCheckpointIfPresent hardcodes allowNonRestoredState=false, so checkpoint restore on JobManager startup rejects state that cannot be mapped to any operator in the current JobGraph, regardless of execution.savepoint.ignore-unclaimed-state / execution.state-recovery.ignore-unclaimed-state. CheckpointCoordinator.restoreSavepoint already honors the flag via a parameter; this PR does the same for the checkpoint-restore path.

Addresses FLINK-39513. See the ticket for history and full context.

Brief change log

  • CheckpointCoordinator.restoreInitialCheckpointIfPresent takes allowNonRestoredState as a parameter and forwards it to restoreLatestCheckpointedStateInternal instead of hardcoding false.
  • DefaultExecutionGraphFactory.createAndRestoreExecutionGraph reads StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE from its Configuration and passes it in. Configuration is the source (rather than jobGraph.getSavepointRestoreSettings()) because SavepointRestoreSettings.fromConfiguration returns .none() when no savepoint path is set, which is always the case on this path.
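
The shape of the change described in the two bullets above can be pictured with a minimal, self-contained sketch. It deliberately does not use Flink classes: `CoordinatorSketch`, `FactorySketch`, and the `Map`-backed configuration are hypothetical stand-ins, and only the option key `execution.state-recovery.ignore-unclaimed-state` is taken from this description. The point is that the caller reads the flag from configuration and forwards it, instead of the callee hardcoding `false`.

```java
import java.util.Map;

// Hypothetical stand-in for the coordinator; not Flink's CheckpointCoordinator.
final class CoordinatorSketch {
    // Records the flag it was last called with so the forwarding can be observed.
    boolean lastAllowNonRestoredState;

    // After the change: the flag is a parameter rather than a hardcoded false.
    boolean restoreInitialCheckpointIfPresent(boolean allowNonRestoredState) {
        this.lastAllowNonRestoredState = allowNonRestoredState;
        return true; // pretend a checkpoint was found and restored
    }
}

// Hypothetical stand-in for the factory that builds and restores the graph.
final class FactorySketch {
    // Key taken from the PR description; the Map-based config is an assumption.
    static final String IGNORE_UNCLAIMED_STATE_KEY =
            "execution.state-recovery.ignore-unclaimed-state";

    // Reads the flag (defaulting to false, the strict behavior) and forwards it.
    static boolean createAndRestore(
            Map<String, String> configuration, CoordinatorSketch coordinator) {
        boolean allowNonRestoredState =
                Boolean.parseBoolean(
                        configuration.getOrDefault(IGNORE_UNCLAIMED_STATE_KEY, "false"));
        return coordinator.restoreInitialCheckpointIfPresent(allowNonRestoredState);
    }
}
```

With an empty configuration the sketch stays on the strict path; setting the key to `true` is the only way the permissive value reaches the coordinator.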

Verifying this change

This change added tests and can be verified as follows:

  • Added a pair of tests in CheckpointCoordinatorRestoringTest covering both allowNonRestoredState=true (orphaned state silently skipped) and allowNonRestoredState=false (throws IllegalStateException, preserving existing strict behavior).
  • Updated two existing CheckpointCoordinatorRestoringTest call sites to pass false explicitly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (Checkpointing)
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes (Claude Code)

@hemmatio hemmatio changed the title from "Parameterize allowNonRestoredState in restoreInitialCHeckpointIfPresent" to "[FLINK-39513][checkpointing] Parameterize allowNonRestoredState in restoreInitialCheckpointIfPresent" on Apr 21, 2026
@flinkbot
Collaborator

flinkbot commented Apr 21, 2026

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@@ -186,7 +187,8 @@ public ExecutionGraph createAndRestoreExecutionGraph(
         if (checkpointCoordinator != null) {
             // check whether we find a valid checkpoint
             if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
-                    new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+                    new HashSet<>(newExecutionGraph.getAllVertices().values()),
+                    configuration.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE))) {

Contributor

SavepointConfigOptions was removed as part of 2.0 release

- `org.apache.flink.runtime.jobgraph.SavepointConfigOptions`

Can you check an alternative.

Author

I've moved to using StateRecoveryOptions instead, which is the replacement from 2.0.

@@ -1113,7 +1113,7 @@ void testRestoreFinishedStateWithoutInFlightData() throws Exception {
                         .build(graph);

         ExecutionJobVertex vertex = graph.getJobVertex(jobVertexID);
-        coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex));
+        coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex), false);

Contributor

Can you add tests for allowNonRestoredState=true (mentioned in PR description).

Author

Forgot to push my amended commit which contained the tests.

Contributor

Are we waiting on something here?

@hemmatio hemmatio force-pushed the hemmatio-fix-allow-non-restored-state branch from 2b39a69 to 65c38c0 Compare April 21, 2026 20:43
@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Apr 22, 2026
@gaborgsomogyi
Contributor

I'm not sure under which circumstances this could happen from a user's perspective. IIUC the job fails and tries to restore from a CP, right? In that situation, the only case I can imagine where the operators differ is when the SQL plan changes for some unknown reason. We've seen the same SQL end up with a different plan. Can you elaborate, please?

Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Until this is answered, I'm leaving my possible objection here to avoid accidents.

@hemmatio
Author

@gaborgsomogyi: Can you elaborate please?

The most common trigger (from what we have seen internally) is user code changes across deployments. This happens with the Flink Kubernetes Operator's last-state upgrade mode as follows:

  1. The user modifies their job (adds/removes/renames an operator, which is the standard use case of allowNonRestoredState)
  2. The operator performs a last-state upgrade: it tears down the cluster without taking a fresh savepoint, relying on checkpoint metadata for the restore.
  3. If the prior JM's shutdown was not graceful (e.g., a crash during upgrade, OOM, or pod eviction), the per-job HA ConfigMap survives. The new JM rebuilds the graph from the modified user code, finds the old checkpoint, and encounters state that doesn't map to any operator in the new graph.
  4. restoreInitialCheckpointIfPresent rejects it via the hardcoded false, regardless of the value of execution.state-recovery.ignore-unclaimed-state.

The asymmetry is the main issue, as there are two different outcomes depending on upgrade mode for the same job, and the same allowNonRestoredState: true:

  • upgradeMode: savepoint: works, orphaned state skipped, job restores
  • upgradeMode: last-state: fails with IllegalStateException: There is no operator for the state ...

This problem was also reported in FLINK-30638, where Gyula Fora correctly diagnosed it as a runtime issue in the comments. This PR finally addresses the bug.


The SQL plan drift case would also hit the same code path. Essentially, anything that produces a mismatch between the checkpoint's operator ID set and the new JobGraph's operator IDs will trigger it.
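
To make the mismatch concrete, here is a simplified model of the restore-time check. It is a sketch under stated assumptions, not Flink's actual implementation: `UnclaimedStateCheckSketch`, its method name, and the use of plain strings as operator IDs are all hypothetical. Only the two outcomes (skip vs. `IllegalStateException` with the "There is no operator for the state" message) come from this thread.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of the restore-time check; not Flink's actual implementation.
final class UnclaimedStateCheckSketch {

    // Returns the checkpoint operator IDs that can be restored into the job graph.
    // State whose operator ID is missing from the graph is either skipped
    // (allowNonRestoredState = true) or rejected (allowNonRestoredState = false).
    static Set<String> restorableOperators(
            Set<String> checkpointOperatorIds,
            Set<String> jobGraphOperatorIds,
            boolean allowNonRestoredState) {
        Set<String> restorable = new HashSet<>();
        for (String operatorId : checkpointOperatorIds) {
            if (jobGraphOperatorIds.contains(operatorId)) {
                restorable.add(operatorId);
            } else if (!allowNonRestoredState) {
                // Strict behavior: unmapped state fails the restore.
                throw new IllegalStateException(
                        "There is no operator for the state " + operatorId);
            }
            // Otherwise the orphaned state is silently skipped.
        }
        return restorable;
    }
}
```

In this model, a checkpoint containing a removed operator restores only the surviving operators when the flag is `true` and throws when it is `false`, which mirrors the `savepoint` vs. `last-state` asymmetry described above.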

@gaborgsomogyi
Contributor

We've had a discussion with Gyula and taken a look at the surrounding operator code. Based on that, I now see the picture, and directionally this is good to go. I will take a look at the details and take care of this PR.

4 participants