
Adds bundleFinalizer support to Dataflow non-portable worker. #37723

Merged
scwhittle merged 17 commits into apache:master from acrites:bundle-finalizer
Mar 20, 2026


@acrites (Contributor) commented Feb 26, 2026

This PR modifies the existing StreamingCommitFinalizer cache, which is used for source checkpoint callbacks, and adds the ability to specify a callback expiration. The approach mimics the code in the portable runner's sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java.

This required exposing more of the already-existing bundle finalization ID fields in the Windmill proto. Windmill already reads and populates these fields for the portable runner.
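The following JDK-only sketch shows how an expiring callback cache of this kind can work. It is a minimal illustration and not the PR's StreamingCommitFinalizer. The class name ExpiringFinalizerCache, its methods, and the use of Map.Entry in place of Beam's Pair are all hypothetical. Callbacks are indexed by finalize ID in a HashMap and ordered by expiry in a PriorityQueue, and a single ReentrantLock guards both structures. This lets cleanup drop expired entries from the head of the queue without scanning the map.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of an expiring finalization cache; not the actual Beam class.
final class ExpiringFinalizerCache {
  private static final class Entry {
    final long id;
    final Instant expiry;
    final Runnable callback;

    Entry(long id, Instant expiry, Runnable callback) {
      this.id = id;
      this.expiry = expiry;
      this.callback = callback;
    }
  }

  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Long, Entry> byId = new HashMap<>();
  // Soonest expiry at the head, so cleanup only ever inspects the front of the queue.
  private final PriorityQueue<Entry> byExpiry =
      new PriorityQueue<>((a, b) -> a.expiry.compareTo(b.expiry));

  /** Caches callbacks keyed by finalize ID; rejects IDs that are already pending. */
  void cache(Map<Long, Map.Entry<Instant, Runnable>> callbacks) {
    lock.lock();
    try {
      for (Long id : callbacks.keySet()) {
        if (byId.containsKey(id)) {
          throw new IllegalStateException("Duplicate finalize id " + id);
        }
      }
      for (Map.Entry<Long, Map.Entry<Instant, Runnable>> e : callbacks.entrySet()) {
        Entry entry = new Entry(e.getKey(), e.getValue().getKey(), e.getValue().getValue());
        byId.put(entry.id, entry);
        byExpiry.add(entry);
      }
    } finally {
      lock.unlock();
    }
  }

  /** Removes and returns callbacks for the given IDs; unknown or expired IDs are ignored. */
  List<Runnable> take(Iterable<Long> ids) {
    List<Runnable> ready = new ArrayList<>();
    lock.lock();
    try {
      for (Long id : ids) {
        Entry entry = byId.remove(id);
        if (entry != null) {
          byExpiry.remove(entry);
          ready.add(entry.callback);
        }
      }
    } finally {
      lock.unlock();
    }
    return ready;
  }

  /** Drops entries whose expiry is not after {@code now}; returns how many were dropped. */
  int expire(Instant now) {
    int dropped = 0;
    lock.lock();
    try {
      while (!byExpiry.isEmpty() && !byExpiry.peek().expiry.isAfter(now)) {
        byId.remove(byExpiry.poll().id);
        dropped++;
      }
    } finally {
      lock.unlock();
    }
    return dropped;
  }
}
```

In this sketch, take() only returns callbacks. Running them, whether inline or on an executor, is left to the caller.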


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md


See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist (Contributor)

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request adds bundle finalization support to the Dataflow non-portable worker. The worker can now register callbacks that run after a bundle's state has been successfully committed, and each callback has an expiration time. This brings the non-portable runner closer in functionality to its portable counterpart and improves consistency in how bundle-level finalization is handled in streaming pipelines.

Highlights

  • Bundle Finalizer Support: Introduced BundleFinalizer support within the SimpleDoFnRunner for StartBundle and FinishBundle contexts, allowing callbacks to be registered for execution upon bundle commit.
  • Enhanced StreamingCommitFinalizer: Refactored StreamingCommitFinalizer to manage bundle finalization callbacks with an expiration mechanism, mimicking the portable runner's FinalizeBundleHandler. This new implementation uses a PriorityQueue and ReentrantLock for efficient, thread-safe management and cleanup of expired callbacks.
  • Windmill Proto Updates: Extended the Windmill protocol buffers to include applied_finalize_ids in WorkItem and StreamingGetWorkResponseChunk messages, and finalize_ids in WorkItemCommitRequest, enabling the Dataflow worker to communicate bundle finalization IDs with the Windmill service.
  • Dataflow Worker Integration: Integrated the new bundle finalization logic into StreamingModeExecutionContext to collect and manage BundleFinalizer callbacks, and updated StreamingWorkScheduler to process these applied_finalize_ids received from Windmill.
  • Comprehensive Testing: Added new unit tests for SimpleParDoFn to verify BundleFinalizer functionality and introduced a dedicated test suite for the refactored StreamingCommitFinalizer to ensure correct behavior, including expiration and duplicate ID handling.
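To make the user-facing side of these highlights concrete, here is a JDK-only sketch of what registering a callback from StartBundle, ProcessElement, or FinishBundle amounts to. SketchBundleFinalizer and RecordingBundleFinalizer are hypothetical. The interface mirrors the shape of Beam's DoFn.BundleFinalizer: an afterBundleCommit call that takes an expiry and a Callback whose onBundleSuccess may throw. It uses java.time.Instant instead of Joda so that it compiles without Beam. Callbacks run only once the commit is acknowledged, and expired ones are skipped.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface shaped like Beam's DoFn.BundleFinalizer; java.time replaces Joda here.
interface SketchBundleFinalizer {
  interface Callback {
    void onBundleSuccess() throws Exception;
  }

  void afterBundleCommit(Instant callbackExpiry, Callback callback);
}

// Collects callbacks registered during a bundle and runs them only after a successful commit.
final class RecordingBundleFinalizer implements SketchBundleFinalizer {
  private static final class Registration {
    final Instant expiry;
    final Callback callback;

    Registration(Instant expiry, Callback callback) {
      this.expiry = expiry;
      this.callback = callback;
    }
  }

  private final List<Registration> pending = new ArrayList<>();

  @Override
  public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
    pending.add(new Registration(callbackExpiry, callback));
  }

  int pendingCount() {
    return pending.size();
  }

  /** Simulates the runner learning that the commit succeeded; returns how many callbacks ran. */
  int onCommitSucceeded(Instant now) {
    int ran = 0;
    for (Registration r : pending) {
      if (!r.expiry.isAfter(now)) {
        continue; // expired before the commit was acknowledged: drop it
      }
      try {
        r.callback.onBundleSuccess();
        ran++;
      } catch (Exception e) {
        // A real runner would log the failure; the sketch just skips this callback.
      }
    }
    pending.clear();
    return ran;
  }
}
```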


Changelog
  • runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
    • Added bundleFinalizer() method to StartBundleContext and FinishBundleContext.
    • Updated UnsupportedOperationException messages for bundleFinalizer() in OnTimer and OnWindowExpiration contexts for clarity.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
    • Added necessary imports for ArrayList, BundleFinalizer, and Pair.
    • Modified flushState() to return Map<Long, Pair<Instant, Runnable>> and incorporate BundleFinalizer callbacks with expiration.
    • Updated flushState() to apply an expiration to checkpointMark.finalizeCheckpoint() callbacks.
    • Implemented bundleFinalizer() in DataflowStepContext.
    • Introduced WindmillBundleFinalizer as an inner class to manage BundleFinalizer.Callback instances with associated expiration times.
    • Added getBundleFinalizerCallbacks() and clearBundleFinalizerCallbacks() methods to StepContext.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
    • Added a new field appliedFinalizeIds to track bundle finalization IDs.
    • Updated the append method to collect appliedFinalizeIds from incoming chunks.
    • Corrected a minor typo in a Javadoc comment.
    • Modified the flushToWorkItem and create methods to pass and process appliedFinalizeIds when assembling WorkItem objects, and ensured appliedFinalizeIds are cleared after use.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
    • Completely refactored the class to use ReentrantLock, Condition, HashMap, and PriorityQueue for managing callbacks with expiration.
    • Removed the Guava Cache dependency and its associated default expiry mechanism.
    • Introduced an AutoValue class FinalizationInfo to encapsulate callback details and expiration.
    • Implemented a cleanupThreadBody() method for background processing and removal of expired callbacks.
    • Updated cacheCommitFinalizers to store callbacks with their expiration times and add them to the priority queue.
    • Modified finalizeCommits to safely remove callbacks from both the map and priority queue.
    • Added cleanupQueueSize() for testing purposes.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
    • Added a call to commitFinalizer.finalizeCommits(workItem.getAppliedFinalizeIdsList()) to process bundle finalization IDs.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
    • Added imports for AtomicInteger, DataflowStepContext, VarLongCoder, BundleFinalizer, Duration, Instant, and Mockito.
    • Introduced testBundleFinalizer() to validate the functionality of BundleFinalizer within SimpleParDoFn.
    • Added helper methods increaseBundleSuccessCount() and getBundleSuccessCount() for test assertions.
    • Defined WithBundleFinalizerDoFn as a test DoFn to demonstrate BundleFinalizer usage in StartBundle, ProcessElement, and FinishBundle.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java
    • Added a new test file StreamingCommitFinalizerTest.java to provide comprehensive unit tests for the refactored StreamingCommitFinalizer.
    • Included tests for initialization, caching callbacks, handling duplicate IDs, successful finalization, ignoring unknown IDs, and verifying cleanup of expired callbacks.
  • runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
    • Added repeated int64 applied_finalize_ids = 16; to the WorkItem message.
    • Added repeated int64 finalize_ids = 19 [packed = true]; to the WorkItemCommitRequest message, along with a description.
    • Added repeated int64 applied_finalize_ids = 6 [packed = true]; to the StreamingGetWorkResponseChunk message, along with a description.
    • Updated reserved field ranges in WorkItem, WorkItemCommitRequest, and StreamingGetWorkResponseChunk to accommodate new fields.
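Taken together, these changes describe a round trip:

1. The worker sends finalize_ids in a WorkItemCommitRequest.
2. Windmill later reports them back as applied_finalize_ids on a WorkItem.
3. StreamingWorkScheduler hands those IDs to commitFinalizer.finalizeCommits.

The toy simulation below assumes, as the field names suggest, that IDs are echoed back only after the commit is persisted. Every class here is a hypothetical in-memory stand-in, not a generated proto class or the worker's real code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the proto messages; field names echo the changelog.
final class SketchCommitRequest {
  final List<Long> finalizeIds = new ArrayList<>();
}

final class SketchWorkItem {
  final List<Long> appliedFinalizeIds = new ArrayList<>();
}

// Simulated backend: remembers finalize IDs from persisted commits and reports them back
// as applied IDs on the next work item it hands out.
final class FakeWindmill {
  private final List<Long> applied = new ArrayList<>();

  void commit(SketchCommitRequest request) {
    applied.addAll(request.finalizeIds);
  }

  SketchWorkItem getWork() {
    SketchWorkItem item = new SketchWorkItem();
    item.appliedFinalizeIds.addAll(applied);
    applied.clear();
    return item;
  }
}

// Simulated worker: caches a callback under a fresh ID at commit time and runs it once
// that ID comes back in applied_finalize_ids.
final class FakeWorker {
  private final Map<Long, Runnable> pending = new HashMap<>();
  private long nextId = 1;

  SketchCommitRequest commitWithCallback(Runnable callback) {
    long id = nextId++;
    pending.put(id, callback);
    SketchCommitRequest request = new SketchCommitRequest();
    request.finalizeIds.add(id);
    return request;
  }

  void onWorkItem(SketchWorkItem item) {
    for (Long id : item.appliedFinalizeIds) {
      Runnable callback = pending.remove(id);
      if (callback != null) {
        callback.run(); // IDs that are unknown or already finalized are ignored
      }
    }
  }

  int pendingCount() {
    return pending.size();
  }
}
```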

…running on Dataflow streaming non-portable worker.
@github-actions (Contributor)

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers".

},
0);
@Nullable FinalizationInfo info;
lock.lock();

Contributor

how about locking once and pulling out all the callbacks and then executing them?

previously we were finalizing on the provided executor; it seems we should keep that behavior?

Contributor Author

Good call! I think I was worried about running the callbacks with the lock held, but that's not a problem. Also added back in running them on the executor instead of inline.
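The suggestion in this thread can be sketched as follows: lock once, pull out every matching callback, then run the callbacks on the executor. DrainThenExecute is a hypothetical name. A plain java.util.concurrent.Executor stands in for the worker's BoundedQueueExecutor, so this shows the locking pattern only and not the real submission API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: one lock acquisition per batch of IDs, callbacks handed to an executor afterwards.
final class DrainThenExecute {
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Long, Runnable> callbacks = new HashMap<>();
  private final Executor executor;

  DrainThenExecute(Executor executor) {
    this.executor = executor;
  }

  void put(long id, Runnable callback) {
    lock.lock();
    try {
      callbacks.put(id, callback);
    } finally {
      lock.unlock();
    }
  }

  void finalizeCommits(Iterable<Long> ids) {
    List<Runnable> toRun = new ArrayList<>();
    lock.lock();
    try {
      // Pull out every matching callback while holding the lock exactly once.
      for (Long id : ids) {
        Runnable callback = callbacks.remove(id);
        if (callback != null) {
          toRun.add(callback);
        }
      }
    } finally {
      lock.unlock();
    }
    // Submit after releasing the lock; the executor decides which thread runs each callback.
    for (Runnable callback : toRun) {
      executor.execute(callback);
    }
  }
}
```

Passing Runnable::run as the Executor makes execution synchronous, which is convenient in unit tests. A thread pool gives the asynchronous behavior discussed above.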

…callbacks in the given executor. This required changing tests since the callbacks are now running asynchronously.
}
}

// Only exposed for tests.

Contributor

does package-private work? i.e. drop public

There is also the @VisibleForTesting annotation for things like this.

Contributor Author

Looks like we can just drop public.

while (executor.elementsOutstanding() > 1) {
Thread.sleep(500);
}
verify(callback).run();

Contributor

instead of a mock runnable, how about a runnable that counts down a CountDownLatch? Then you can just block on that instead of using the less direct elementsOutstanding.

Contributor Author

Good idea!
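A minimal sketch of the CountDownLatch approach follows. LatchCallbackCheck and awaitCallbacks are hypothetical test helpers, and a fixed thread pool stands in for whatever executor the finalizer uses. Each callback counts the latch down, and the test blocks on the latch with a timeout. It does not poll the executor's outstanding-element count.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: callbacks count down a latch, and the caller blocks on the latch
// (with a timeout) instead of polling an executor's outstanding-element count.
final class LatchCallbackCheck {
  /** Submits {@code submitted} callbacks and waits until {@code expected} of them have run. */
  static boolean awaitCallbacks(int expected, int submitted, long timeoutMillis) {
    CountDownLatch latch = new CountDownLatch(expected);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      for (int i = 0; i < submitted; i++) {
        executor.execute(latch::countDown); // stands in for an asynchronously run finalize callback
      }
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();
    }
  }
}
```

The test returns as soon as the last callback runs, rather than after a fixed sleep. The timeout only matters when something is actually broken.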

ImmutableMap.of(3L, Pair.of(Instant.now().plus(Duration.millis(100)), callback3)));

while (finalizer.cleanupQueueSize() > 1) {
// Wait until it expires

Contributor

nit: until the two short timeouts expire.

Contributor Author

Done.

Mockito.mock(
DataflowStepContext.class,
invocation -> {
if (invocation.getMethod().getName().equals("bundleFinalizer")) {

Contributor

Do we need this here? Why on both the DataflowStepContext and DataflowExecutionContext.DataflowStepContext? Maybe add some comments to help explain.

Contributor Author

I think you're right. This test is only calling user bundle finalizers, so we only need it mocked on userStepContext.


// The counter increases by 1 in StartBundle, 5 in ProcessElement, and 1 in FinishBundle.
// Total should be 7.
assertThat(getBundleSuccessCount(), equalTo(7));

Contributor

maybe better to just duplicate the Atomics and methods so that you can explicitly verify that start is called once, process 5 times, and finish once.

Contributor Author

I duplicated the Atomics, but got rid of the accessor methods to reduce boilerplate.

@scwhittle (Contributor) left a comment

Remove the testing exclusion here as well:

'org.apache.beam.sdk.testing.UsesBundleFinalizer',

You can modify https://github.com/apache/beam/blob/3197d88ef5c9f41371756c78b0efca8590b47165/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json to trigger these tests before submitting.

That is making me realize that you might still want to reject or filter batch jobs on the non-portable runner that attempt to use bundle finalizers. I don't think it is worth trying to implement this for batch if it doesn't work, since those users can just continue to use the portable runner, which is the default.

while (finalizer.cleanupQueueSize() > 1) {
// Wait until it expires
// Wait until the two 100ms timeouts expire.
Thread.sleep(500);

Contributor

nit: might as well make this 110 ms or something that will likely be enough but shorter; Gradle doesn't have great parallelism.

Contributor Author

Done. Reduced most of the sleeps.

static int getBundleSuccessCount() {
return bundleSuccessCount.get();
}
private static final AtomicInteger startBundleCount = new AtomicInteger(0);

Contributor

nit: move these into WithBundleFinalizerDoFn to scope them better?

Contributor Author

Done!

@acrites (Contributor Author) commented Mar 4, 2026

I went ahead and added back in the check for batch, non-portable bundle finalizers.
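The check described here can be sketched as a simple validation step. BundleFinalizerSupportCheck, its validate method, and the three boolean flags are hypothetical and are not Beam's actual validation code. The sketch only encodes the rule from the review: reject bundle finalizers when the job is batch and runs on the non-portable worker.

```java
// Hypothetical validation sketch; the flags and method name are illustrative, not Beam's API.
final class BundleFinalizerSupportCheck {
  static void validate(boolean streaming, boolean portableWorker, boolean usesBundleFinalizer) {
    // Only the batch + non-portable combination is rejected; streaming and portable jobs pass.
    if (usesBundleFinalizer && !streaming && !portableWorker) {
      throw new UnsupportedOperationException(
          "Bundle finalizers are not supported for batch jobs on the non-portable worker; "
              + "use the portable runner instead.");
    }
  }
}
```

Failing at validation time surfaces the problem before the job starts. A finalizer that never fires would otherwise fail silently.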

@@ -102,14 +105,16 @@ private Optional<AssembledWorkItem> flushToWorkItem() {
workItemBuilder.build(),

Contributor

add the appliedFinalizeIds to the work item builder before building, instead of passing them to create? Or we could remove the list and just add to the builder where we are currently adding to the list.

Otherwise, I was concerned that we were passing a mutable list outside this class. It is safe, since we can see that create below just iterates over it and doesn't keep a reference, but it seems like we might as well do it here so it's clearer.

Contributor Author

Nice. That seems a bit clearer. Now I don't even have to pass them to the AssembledWorkItem.
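The refactor agreed on here appends applied IDs to the in-progress builder as each chunk arrives. It no longer collects them in a separate list that is handed to create. The JDK-only sketch below illustrates that shape. SketchChunk, SketchBuiltWorkItem, and SketchChunkAssembler are hypothetical, and an ArrayList plays the role of the Windmill WorkItem builder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a response chunk carrying applied finalize IDs.
final class SketchChunk {
  final List<Long> appliedFinalizeIds;
  final boolean lastChunk;

  SketchChunk(List<Long> appliedFinalizeIds, boolean lastChunk) {
    this.appliedFinalizeIds = appliedFinalizeIds;
    this.lastChunk = lastChunk;
  }
}

// Hypothetical stand-in for the built WorkItem; it owns an unmodifiable copy of the IDs.
final class SketchBuiltWorkItem {
  final List<Long> appliedFinalizeIds;

  SketchBuiltWorkItem(List<Long> ids) {
    this.appliedFinalizeIds = Collections.unmodifiableList(new ArrayList<>(ids));
  }
}

// IDs are appended to the in-progress builder as each chunk arrives, so no separate
// mutable list has to be handed to another class when the item is assembled.
final class SketchChunkAssembler {
  private final List<Long> builderIds = new ArrayList<>(); // plays the role of the proto builder

  Optional<SketchBuiltWorkItem> append(SketchChunk chunk) {
    builderIds.addAll(chunk.appliedFinalizeIds);
    if (!chunk.lastChunk) {
      return Optional.empty();
    }
    SketchBuiltWorkItem item = new SketchBuiltWorkItem(builderIds);
    builderIds.clear(); // start fresh for the next work item
    return Optional.of(item);
  }
}
```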

package org.apache.beam.runners.dataflow.worker.windmill.work.processing;

import java.time.Duration;
import static com.google.common.base.Preconditions.checkState;

Contributor

think this needs to be the vendored one?

Contributor Author

Done.

}
for (Runnable callback : callbacksToExecute) {
try {
finalizationExecutor.execute(callback, 0);

Contributor

why the 0? can the plain execute(runnable) method be called?

Contributor Author

I was trying to preserve the original behavior. It looks like I should actually call forceExecute(callback, 0) to match the previous behavior, since it ignores the elementsOutstanding limit. I'll change it to that to keep things as similar to the original as possible.

Contributor Author

(BoundedQueueExecutor doesn't have an execute(runnable) method.) Since all my calls to execute are forceExecute, I wonder if we might consider changing to a completely different executor class, such as the simpler java.util.concurrent.ExecutorService. WDYT?
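The following is a minimal sketch of the ExecutorService alternative, assuming the finalization callbacks are plain Runnables. FinalizationRunner and its methods are hypothetical. Executors.newFixedThreadPool is backed by an unbounded queue, so while the pool is running, execute never refuses work because of a size limit. That is roughly the property forceExecute was used for here. Each callback is wrapped so that an exception from one does not affect the others.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: finalization callbacks on a plain ExecutorService with an unbounded queue.
final class FinalizationRunner implements AutoCloseable {
  private final ExecutorService executor = Executors.newFixedThreadPool(2);
  private final AtomicInteger failures = new AtomicInteger();

  void runAll(List<Runnable> callbacks) {
    for (Runnable callback : callbacks) {
      executor.execute(() -> {
        try {
          callback.run();
        } catch (RuntimeException e) {
          failures.incrementAndGet(); // one failing callback must not affect the others
        }
      });
    }
  }

  /** Stops accepting work and waits for submitted callbacks; false on timeout or interruption. */
  boolean drain(long timeoutMillis) {
    executor.shutdown();
    try {
      return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  int failures() {
    return failures.get();
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```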

return checkNotNull(userTimerInternals);
}

public List<Pair<Instant, BundleFinalizer.Callback>> getBundleFinalizerCallbacks() {

Contributor

this is returning mutable internal state; it seems we just call it right before clearing, though. How about making this a flushBundleFinalizerCallbacks method that combines both? Could use an ImmutableList.Builder and reset it only if non-empty.

Contributor Author

Thanks for the suggestion! I think I captured it correctly.
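The combined flush method suggested in this thread can be sketched with only the JDK. SketchStepContext is hypothetical. Guava's ImmutableList.Builder is swapped for an ArrayList wrapped in Collections.unmodifiableList, and Map.Entry stands in for Beam's Pair. Callers never see the live list, and a new list is allocated only when something was actually flushed.

```java
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical step-context fragment showing a combined get-and-clear "flush" method.
final class SketchStepContext {
  private List<Map.Entry<Instant, Runnable>> callbacks = new ArrayList<>();

  void afterBundleCommit(Instant expiry, Runnable callback) {
    callbacks.add(new AbstractMap.SimpleImmutableEntry<>(expiry, callback));
  }

  /** Returns the registered callbacks and resets internal state in one step. */
  List<Map.Entry<Instant, Runnable>> flushBundleFinalizerCallbacks() {
    if (callbacks.isEmpty()) {
      return Collections.emptyList(); // nothing to hand out, so keep the existing list
    }
    // The old list is no longer referenced internally, so the read-only view is effectively immutable.
    List<Map.Entry<Instant, Runnable>> flushed = Collections.unmodifiableList(callbacks);
    callbacks = new ArrayList<>(); // only allocate a fresh list when something was flushed
    return flushed;
  }
}
```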

finalizer.cacheCommitFinalizers(
ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
finalizer.finalizeCommits(Collections.singletonList(2L));
while (executor.elementsOutstanding() > 1) {

Contributor

can you just assert it is 1 here instead of a loop?

Contributor Author

Done.

}
// We can call finalize even though these were already cleaned up.
finalizer.finalizeCommits(ImmutableList.of(2L, 3L));
while (executor.elementsOutstanding() > 1) {

Contributor

could do this before the finalizeCommits above, and assert it is 1 after calling it instead

Contributor Author

Done.

List<Pair<Instant, BundleFinalizer.Callback>> bundleFinalizers = new ArrayList<>();
for (StepContext stepContext : getAllStepContexts()) {
stepContext.flushState();
bundleFinalizers.addAll(stepContext.flushBundleFinalizerCallbacks());

Contributor

nit: it doesn't seem like we need bundleFinalizers all at once. We could save the allocation by getting rid of it, iterating over the callbacks here, and adding them directly to the callbacks map.

Contributor Author

Good idea.
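The nit can be sketched as a single pass over each step's flushed callbacks, with each one put straight into the result map and no intermediate list. SketchFlush is hypothetical. The LongSupplier ID source is an assumption for illustration, because this thread does not say how finalize IDs are generated. Map.Entry stands in for Beam's Pair.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch: build the finalize-ID -> (expiry, callback) map in one pass over the steps.
final class SketchFlush {
  static Map<Long, Map.Entry<Instant, Runnable>> collect(
      List<List<Map.Entry<Instant, Runnable>>> perStepCallbacks, LongSupplier idSource) {
    Map<Long, Map.Entry<Instant, Runnable>> result = new HashMap<>();
    for (List<Map.Entry<Instant, Runnable>> stepCallbacks : perStepCallbacks) {
      for (Map.Entry<Instant, Runnable> callback : stepCallbacks) {
        long id = idSource.getAsLong();
        // Guard against an ID source that repeats itself.
        if (result.put(id, callback) != null) {
          throw new IllegalStateException("Duplicate finalize id " + id);
        }
      }
    }
    return result;
  }
}
```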

@scwhittle (Contributor)

The ValidatesRunner tests still have some errors: https://github.com/apache/beam/runs/67043286782

Also, I'm not sure if the StreamingDataflowWorkerTest errors are real. They could be because you are now using the executor for the background thread, and some tests use a fake executor.

One idea (other than fixing that fake impl) would be to create the background cleanup thread lazily, when the first finalization is added.
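The lazy-start idea can be sketched with an AtomicBoolean. compareAndSet guarantees that exactly one caller creates the thread, even when several callbacks are cached concurrently. LazyCleanup, its methods, and the thread name are hypothetical. The cleanup loop is passed in as a Runnable, and a ConcurrentLinkedQueue stands in for the real map and priority queue. An instance that never caches a callback never starts a thread, so tests using a fake executor would be unaffected.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the cleanup thread is created on the first cached callback, not in the constructor.
final class LazyCleanup {
  private final AtomicBoolean started = new AtomicBoolean(false);
  private final AtomicInteger threadsStarted = new AtomicInteger();
  private final ConcurrentLinkedQueue<Runnable> pending = new ConcurrentLinkedQueue<>();
  private final Runnable cleanupLoop;

  LazyCleanup(Runnable cleanupLoop) {
    this.cleanupLoop = cleanupLoop;
  }

  void cacheCallback(Runnable callback) {
    // compareAndSet lets exactly one caller start the thread, even under contention.
    if (started.compareAndSet(false, true)) {
      Thread thread = new Thread(cleanupLoop, "commit-finalizer-cleanup");
      thread.setDaemon(true); // don't keep the JVM alive just for cleanup
      thread.start();
      threadsStarted.incrementAndGet();
    }
    pending.add(callback);
  }

  int threadsStarted() {
    return threadsStarted.get();
  }

  int pendingCount() {
    return pending.size();
  }
}
```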

dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [
name: 'validatesRunnerLegacyWorkerTestStreaming',
// Streaming appliance currently fails bundle finalizer tests.
excludedCategories: [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],

Contributor

I think this is overwriting the excluded categories instead of merging, and thus losing those from line 466

should be something like
excludedCategories: validatesRunnerStreamingConfig.excludedCategories + ...

Contributor Author

Thanks, that was a silly mistake. It explains why all the ValidatesRunner tests were failing...

…laredArtigfact build errors and fixes bug in ValidatesRunner streaming test which was not excluding all categories. Retriggering post submits.
@acrites (Contributor Author) commented Mar 19, 2026

The failing beam_PreCommit_Java test is org.apache.beam.runners.fnexecution.logging.GrpcLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer, so it's probably unrelated. The ValidatesRunner post-commit is now passing.

@scwhittle (Contributor)

Yes, that failure looked unrelated; I sent out a PR yesterday to try to deflake it.

@scwhittle scwhittle merged commit 2209bd6 into apache:master Mar 20, 2026
24 of 26 checks passed