Adds bundleFinalizer support to Dataflow non-portable worker. #37723
scwhittle merged 17 commits into apache:master
Summary of Changes (Gemini Code Assist): This pull request adds bundle finalization support to the Dataflow non-portable worker. The worker can now register and manage callbacks that execute after a bundle's state has been successfully committed, and it supports an expiration time for those callbacks. This brings the non-portable runner closer in functionality to its portable counterpart and makes bundle-level finalization in streaming pipelines more consistent between the two.
…running on Dataflow streaming non-portable worker.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment …
},
0);
@Nullable FinalizationInfo info;
lock.lock();
how about locking once and pulling out all the callbacks and then executing them?
previously we were finalizing on the provided executor; it seems we should keep that behavior?
Good call! I think I was worried about running the callbacks with the lock held, but that's not a problem. Also added back in running them on the executor instead of inline.
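The lock-once pattern discussed above can be sketched in plain Java as follows. This is a minimal illustration, not the PR's StreamingCommitFinalizer: the class SnapshotThenExecuteFinalizer and its methods are hypothetical, a plain java.util.concurrent.Executor stands in for BoundedQueueExecutor, and expiration is left out. The lock is taken once to pull out every matching callback, and the callbacks are then handed to the executor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified finalizer: maps a finalize id to its callback (no expiration). */
public class SnapshotThenExecuteFinalizer {
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Long, Runnable> callbacks = new HashMap<>();
  private final Executor executor;

  public SnapshotThenExecuteFinalizer(Executor executor) {
    this.executor = executor;
  }

  public void cache(long finalizeId, Runnable callback) {
    lock.lock();
    try {
      callbacks.put(finalizeId, callback);
    } finally {
      lock.unlock();
    }
  }

  /** Takes the lock once to remove every committed callback, then submits them to the executor. */
  public void finalizeCommits(Iterable<Long> committedIds) {
    List<Runnable> toRun = new ArrayList<>();
    lock.lock();
    try {
      for (Long id : committedIds) {
        Runnable callback = callbacks.remove(id);
        if (callback != null) { // Unknown or already-removed ids are skipped.
          toRun.add(callback);
        }
      }
    } finally {
      lock.unlock();
    }
    // Submission happens after the lock is released; the executor decides when callbacks run.
    for (Runnable callback : toRun) {
      executor.execute(callback);
    }
  }

  public int size() {
    lock.lock();
    try {
      return callbacks.size();
    } finally {
      lock.unlock();
    }
  }
}
```

Passing Runnable::run as the Executor runs the callbacks inline, which is handy for deterministic unit tests; a real executor makes them asynchronous.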
…callbacks in the given executor. This required changing tests since the callbacks are now running asynchronously.
}
}

// Only exposed for tests.
does package-private work? i.e., drop public
There is also the @VisibleForTesting annotation for things like this
Looks like we can just drop public.
...org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
...ache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java
while (executor.elementsOutstanding() > 1) {
  Thread.sleep(500);
}
verify(callback).run();
instead of a mock Runnable, how about a Runnable that counts down a CountDownLatch? Then you can just block on that instead of using the less direct elementsOutstanding.
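A minimal sketch of the CountDownLatch approach, using a plain ExecutorService as an assumed stand-in for the worker's executor (the class LatchCallbackExample and its method are hypothetical): the callback counts the latch down, and the caller blocks on await with a timeout instead of polling elementsOutstanding.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchCallbackExample {
  /**
   * Runs a latch-backed callback on a background executor and waits for it.
   * Returns true if the callback ran before the timeout elapsed.
   */
  public static boolean runCallbackAndAwait(long timeoutMillis) {
    CountDownLatch callbackRan = new CountDownLatch(1);
    // The callback counts the latch down instead of being a Mockito mock.
    Runnable callback = callbackRan::countDown;
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      executor.execute(callback);
      // Blocks until countDown() has been called, or returns false on timeout.
      return callbackRan.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(runCallbackAndAwait(5_000));
  }
}
```

In a JUnit test the boolean would be wrapped in assertTrue(...), so a callback that never runs fails after the timeout instead of hanging the build.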
...ache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java
ImmutableMap.of(3L, Pair.of(Instant.now().plus(Duration.millis(100)), callback3)));

while (finalizer.cleanupQueueSize() > 1) {
  // Wait until it expires
nit: until the two short timeouts expire.
Mockito.mock(
    DataflowStepContext.class,
    invocation -> {
      if (invocation.getMethod().getName().equals("bundleFinalizer")) {
Do we need this here? Why is it on both the DataflowStepContext and the DataflowExecutionContext.DataflowStepContext? Maybe add some comments to help explain.
I think you're right. This test is only calling user bundle finalizers, so we only need it mocked on userStepContext.
// The counter increases by 1 in StartBundle, 5 in ProcessElement, and 1 in FinishBundle.
// Total should be 7.
assertThat(getBundleSuccessCount(), equalTo(7));
maybe better to just duplicate the Atomics and methods so that you can explicitly verify that start, the 5 process calls, and finish are each called.
I duplicated the Atomics, but got rid of the accessor methods to reduce boilerplate.
...low-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
scwhittle left a comment:
Remove the testing exclusion here as well:
You can modify https://github.com/apache/beam/blob/3197d88ef5c9f41371756c78b0efca8590b47165/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json to trigger these tests before submitting.
That is making me realize that you might still want to reject or filter batch jobs on the non-portable runner that are attempting to use bundle finalizers. I don't think it is worth trying to implement that for batch if it doesn't work, since those users can just continue to use the portable runner, which is the default.
...g/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
while (finalizer.cleanupQueueSize() > 1) {
  // Wait until it expires
  // Wait until the two 100ms timeouts expire.
  Thread.sleep(500);
nit: might as well make this 110 ms or something that will likely be enough and shorter; Gradle doesn't have great parallelism.
Done. Reduced most of the sleeps.
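Loops such as while (finalizer.cleanupQueueSize() > 1) { Thread.sleep(...); } hang forever if the condition never holds. A hypothetical helper that polls at a short interval and gives up at a deadline keeps the sleeps short while still bounding the wait; this is a sketch, not an existing Beam utility.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: bounded polling instead of an unbounded sleep loop. */
public final class PollUntil {
  private PollUntil() {}

  /**
   * Re-checks condition every pollMillis. Returns true once the condition holds, or false if
   * the deadline passes (or the thread is interrupted) while it is still false.
   */
  public static boolean pollUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // Timed out.
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Preserve the interrupt for the caller.
        return condition.getAsBoolean();
      }
    }
    return true;
  }
}
```

For example, assertTrue(PollUntil.pollUntil(() -> finalizer.cleanupQueueSize() <= 1, 5_000, 10)) could replace the cleanup wait loop shown earlier in the review.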
static int getBundleSuccessCount() {
  return bundleSuccessCount.get();
}
private static final AtomicInteger startBundleCount = new AtomicInteger(0);
nit: move these into WithBundleFinalizerDoFn to scope them better?
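A plain-Java sketch of per-phase counters scoped inside the DoFn, so a test can check start, the five process calls, and finish separately instead of one total of 7. CountingFn, its methods, and the list of Runnables that stands in for BundleFinalizer are all hypothetical. In a real Beam test the counters would typically stay static, because the runner executes deserialized copies of the DoFn rather than the instance the test created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PerPhaseCounters {
  /** Hypothetical plain-Java stand-in for the test's DoFn; the counters are scoped inside it. */
  static class CountingFn {
    static final AtomicInteger startBundleCount = new AtomicInteger();
    static final AtomicInteger processElementCount = new AtomicInteger();
    static final AtomicInteger finishBundleCount = new AtomicInteger();

    // Stand-in for BundleFinalizer: callbacks registered while the bundle runs.
    private final List<Runnable> registered = new ArrayList<>();

    void startBundle() {
      registered.add(startBundleCount::incrementAndGet);
    }

    void processElement(String element) {
      registered.add(processElementCount::incrementAndGet);
    }

    void finishBundle() {
      registered.add(finishBundleCount::incrementAndGet);
    }

    /** Simulates the runner invoking every callback once the bundle's commit succeeds. */
    void onCommitSucceeded() {
      registered.forEach(Runnable::run);
      registered.clear();
    }
  }

  /** Resets the counters, then runs one bundle of numElements elements through CountingFn. */
  static void runOneBundle(int numElements) {
    CountingFn.startBundleCount.set(0);
    CountingFn.processElementCount.set(0);
    CountingFn.finishBundleCount.set(0);
    CountingFn fn = new CountingFn();
    fn.startBundle();
    for (int i = 0; i < numElements; i++) {
      fn.processElement("element-" + i);
    }
    fn.finishBundle();
    fn.onCommitSucceeded();
  }

  public static void main(String[] args) {
    runOneBundle(5);
    System.out.println(
        CountingFn.startBundleCount.get()
            + " " + CountingFn.processElementCount.get()
            + " " + CountingFn.finishBundleCount.get());
  }
}
```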
I went ahead and added back in the check for batch, non-portable bundle finalizers.
@@ -102,14 +105,16 @@ private Optional<AssembledWorkItem> flushToWorkItem() {
    workItemBuilder.build(),
add the appliedFinalizeIds to the work item builder before building instead of passing them to create? Or we could remove the list and just add to the builder where we are currently adding to the list.
Otherwise, I had a concern that we were passing a mutable list out of this class. It is safe, since we can see that create below just iterates over it without keeping a reference, but it seems like we might as well do it here so it's clearer.
Nice. That seems a bit clearer. Now I don't even have to pass them to the AssembledWorkItem.
package org.apache.beam.runners.dataflow.worker.windmill.work.processing;

import java.time.Duration;
import static com.google.common.base.Preconditions.checkState;
think this needs to be the vendored one?
}
for (Runnable callback : callbacksToExecute) {
  try {
    finalizationExecutor.execute(callback, 0);
why the 0? Can the execute(runnable) method just be called instead?
I was trying to preserve the original behavior. It looks like I should actually call forceExecute(callback, 0) to match the previous behavior. This ignores the elementsOutstanding limit. I'll change it to that to keep things as similar to the original.
(BoundedQueueExecutor doesn't have an execute(runnable) method.) I wonder if, since all my calls to execute are forceExecute, we might consider changing to a completely different executor class. We could move to the simpler java.util.concurrent.ExecutorService. WDYT?
...g/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
  return checkNotNull(userTimerInternals);
}

public List<Pair<Instant, BundleFinalizer.Callback>> getBundleFinalizerCallbacks() {
this is returning mutable internal state, though it seems we only call this right before clearing. How about making this a flushBundleFinalizerCallbacks method that combines both? Could use an ImmutableList.Builder and reset it only if non-empty.
Thanks for the suggestion! I think I captured it correctly.
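A minimal sketch of the flush-and-reset shape suggested above, in plain Java. The class FlushingStepContext is hypothetical, Map.Entry<Instant, Runnable> stands in for Pair<Instant, BundleFinalizer.Callback>, and a java.util unmodifiable view replaces the Guava ImmutableList.Builder from the review so the example has no dependencies.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical step context holding bundle finalizer callbacks with their expiry times. */
public class FlushingStepContext {
  private List<Map.Entry<Instant, Runnable>> pending = new ArrayList<>();

  public void addBundleFinalizerCallback(Instant expiry, Runnable callback) {
    pending.add(Map.entry(expiry, callback));
  }

  /**
   * Hands the registered callbacks to the caller and resets the internal state in one step, so
   * the mutable list never escapes and callers cannot forget to clear it.
   */
  public List<Map.Entry<Instant, Runnable>> flushBundleFinalizerCallbacks() {
    if (pending.isEmpty()) {
      return Collections.emptyList(); // No allocation when nothing was registered.
    }
    List<Map.Entry<Instant, Runnable>> flushed = Collections.unmodifiableList(pending);
    // The old list is now reachable only through the read-only view returned above.
    pending = new ArrayList<>();
    return flushed;
  }
}
```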
finalizer.cacheCommitFinalizers(
    ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
finalizer.finalizeCommits(Collections.singletonList(2L));
while (executor.elementsOutstanding() > 1) {
can you just assert it is 1 here instead of a loop?
}
// We can call finalize even though these were already cleaned up.
finalizer.finalizeCommits(ImmutableList.of(2L, 3L));
while (executor.elementsOutstanding() > 1) {
could do this before the finalizeCommits above, and assert it is 1 after calling it instead
List<Pair<Instant, BundleFinalizer.Callback>> bundleFinalizers = new ArrayList<>();
for (StepContext stepContext : getAllStepContexts()) {
  stepContext.flushState();
  bundleFinalizers.addAll(stepContext.flushBundleFinalizerCallbacks());
nit: it doesn't seem like we need bundleFinalizers all at once. We could save the allocation by getting rid of it and just iterate over the callbacks here and add them to the callbacks map
…. Adds missing bundleFinalizer from SDF. Retriggering ValidatesRunner tests.
ValidatesRunner tests still have some errors (https://github.com/apache/beam/runs/67043286782). Also, I'm not sure if the StreamingDataflowWorkerTest errors are real. They could be because you are now using the executor for the background thread and some tests use a fake executor. One idea (other than fixing that fake implementation) would be to create the background cleanup thread lazily, whenever the first finalization is added.
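A sketch of the lazy-start idea: the cleanup thread is created only when the first finalizer is cached, so pipelines without callbacks (and tests with fake executors) never start it. LazyCleanupFinalizer is hypothetical; it tracks only expiry times, omits the callbacks themselves, and uses a ScheduledExecutorService rather than the worker's executor.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical finalizer whose cleanup thread starts only when the first entry is cached. */
public class LazyCleanupFinalizer {
  private final Object lock = new Object();
  private final Map<Long, Instant> expiryById = new HashMap<>(); // Callbacks omitted for brevity.
  private ScheduledExecutorService cleanupExecutor; // Null until first use; guarded by lock.

  public void cacheCommitFinalizer(long finalizeId, Instant expiry) {
    synchronized (lock) {
      expiryById.put(finalizeId, expiry);
      if (cleanupExecutor == null) {
        cleanupExecutor =
            Executors.newSingleThreadScheduledExecutor(
                runnable -> {
                  Thread thread = new Thread(runnable, "commit-finalizer-cleanup");
                  thread.setDaemon(true); // Do not keep the JVM alive just for cleanup.
                  return thread;
                });
        cleanupExecutor.scheduleWithFixedDelay(
            this::removeExpired, 100, 100, TimeUnit.MILLISECONDS);
      }
    }
  }

  private void removeExpired() {
    Instant now = Instant.now();
    synchronized (lock) {
      expiryById.values().removeIf(expiry -> !expiry.isAfter(now));
    }
  }

  public boolean cleanupThreadStarted() {
    synchronized (lock) {
      return cleanupExecutor != null;
    }
  }

  public int size() {
    synchronized (lock) {
      return expiryById.size();
    }
  }

  public void shutdown() {
    synchronized (lock) {
      if (cleanupExecutor != null) {
        cleanupExecutor.shutdownNow();
      }
    }
  }
}
```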
… and pipelines that don't have any callbacks.
…idates runner tests.
dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [
    name: 'validatesRunnerLegacyWorkerTestStreaming',
    // Streaming appliance currently fails bundle finalizer tests.
    excludedCategories: [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],
I think this is overwriting the excluded categories instead of merging, and thus losing those from line 466
should be something like
excludedCategories: validatesRunnerStreamingConfig.excludedCategories + ...
Thanks, that was a silly mistake. It explains why all the Validates runner tests were failing...
…laredArtifact build errors and fixes bug in ValidatesRunner streaming test which was not excluding all categories. Retriggering post submits.
Failing beam_PreCommit_Java test is from …

Yes, that failure looked unrelated; I sent out a PR yesterday to try to deflake it.
This PR modifies the existing StreamingCommitFinalizer cache, used for source checkpoint callbacks, and adds the ability to specify a callback expiration. This was done by mimicking the code in the portable runner's sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java.
We needed to expose more already-existing fields in the Windmill proto for bundle finalization ids. Windmill already reads and populates these fields for the portable runner.
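The expiration mechanism can be illustrated with a small cache that indexes callbacks by finalize id and orders them by expiry in a PriorityQueue, so cleanup only needs to look at the head of the queue. This is a hypothetical sketch of the general technique, not the code of StreamingCommitFinalizer or FinalizeBundleHandler; ExpiringCallbackCache and its methods are invented names, and it assumes that a callback whose commit is not confirmed before its expiry is simply dropped without running.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical cache of finalize callbacks that expire at a deadline. */
public class ExpiringCallbackCache {
  private static final class Entry {
    final long id;
    final Instant expiry;
    final Runnable callback;

    Entry(long id, Instant expiry, Runnable callback) {
      this.id = id;
      this.expiry = expiry;
      this.callback = callback;
    }
  }

  private final Map<Long, Entry> byId = new HashMap<>();
  // Earliest expiry at the head, so cleanup never scans unexpired entries.
  private final PriorityQueue<Entry> byExpiry =
      new PriorityQueue<>(Comparator.comparing((Entry e) -> e.expiry));

  public synchronized void cache(long id, Instant expiry, Runnable callback) {
    Entry entry = new Entry(id, expiry, callback);
    Entry previous = byId.put(id, entry);
    if (previous != null) {
      byExpiry.remove(previous); // Identity equality: removes exactly the replaced entry.
    }
    byExpiry.add(entry);
  }

  /** Removes and returns the callback for a committed id, or null if it is unknown or expired. */
  public synchronized Runnable take(long id) {
    Entry entry = byId.remove(id);
    if (entry == null) {
      return null;
    }
    byExpiry.remove(entry); // Linear scan; acceptable for a sketch.
    return entry.callback;
  }

  /** Drops, without running, every callback whose expiry is at or before now. */
  public synchronized int removeExpired(Instant now) {
    int removed = 0;
    while (!byExpiry.isEmpty() && !byExpiry.peek().expiry.isAfter(now)) {
      Entry expired = byExpiry.poll();
      byId.remove(expired.id);
      removed++;
    }
    return removed;
  }

  public synchronized int size() {
    return byId.size();
  }
}
```

The check below mirrors the shape of the tests discussed in the review: id 1 expires in an hour, while ids 2 and 3 expire after 100 ms and are dropped by cleanup.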