From 8ed3e0e8a1d2bf288e4b9b4ce2c751fd51be4e65 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 16 Mar 2026 14:39:58 -0700 Subject: [PATCH 1/2] Adds a ValidatesRunner post-commit test suite using streaming engine. Previously we only tested the Java worker on appliance. --- ...datesRunner_Dataflow_Streaming_Engine.json | 4 + ...idatesRunner_Dataflow_Streaming_Engine.yml | 97 +++++++++++++++++++ .../google-cloud-dataflow-java/build.gradle | 30 ++++++ 3 files changed, 131 insertions(+) create mode 100644 .github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json create mode 100644 .github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json new file mode 100644 index 000000000000..e623d3373a93 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run!", + "modification": 1, +} diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml new file mode 100644 index 000000000000..99f67b3fb31c --- /dev/null +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.yml @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: PostCommit Java ValidatesRunner Dataflow Streaming Engine + +on: + schedule: + - cron: '30 4/8 * * *' + pull_request_target: + paths: ['release/trigger_all_tests.json', '.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json'] + workflow_dispatch: + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +env: + DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming: + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + runs-on: [self-hosted, ubuntu-20.04, main] + timeout-minutes: 720 + strategy: + matrix: + job_name: 
[beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine] + job_phrase: [Run Dataflow Streaming Engine ValidatesRunner] + if: | + github.event_name == 'workflow_dispatch' || + github.event_name == 'pull_request_target' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event.comment.body == 'Run Dataflow Streaming Engine ValidatesRunner' + steps: + - uses: actions/checkout@v4 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + with: + java-version: default + - name: run validatesRunnerStreaming script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :runners:google-cloud-dataflow-java:validatesRunnerStreamingEngine + max-workers: 12 + - name: Archive JUnit Test Results + uses: actions/upload-artifact@v4 + if: ${{ !success() }} + with: + name: JUnit Test Results + path: "**/build/reports/tests/" + - name: Publish JUnit Test Results + uses: EnricoMi/publish-unit-test-result-action@v2 + if: always() + with: + commit: '${{ env.prsha || env.GITHUB_SHA }}' + comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} + files: '**/build/test-results/**/*.xml' + large_files: true diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 3792626a1fdf..6b61b33a1c94 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -490,6 +490,36 @@ task validatesRunnerStreaming { )) } +task validatesRunnerStreamingEngine { + group = "Verification" + description "Validates Dataflow runner forcing streaming mode on streaming engine" + dependsOn(createLegacyWorkerValidatesRunnerTest( + name: 
'validatesRunnerLegacyWorkerTestStreaming', + pipelineOptions: legacyPipelineOptions + ['--streaming', '--experiments=enable_streaming_engine'], + excludedCategories: [ + 'org.apache.beam.sdk.testing.UsesCommittedMetrics', + 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', + ], + excludedTests: [ + // TODO(https://github.com/apache/beam/issues/21472) + 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', + // GroupIntoBatches.withShardedKey not supported on Java streaming worker + // https://github.com/apache/beam/issues/22592 + 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', + + // These tests use static state and don't work with remote execution. + 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle', + 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful', + 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement', + 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful', + 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup', + 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful', + 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle', + 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful', + ] + )) +} + def setupXVR = tasks.register("setupXVR") { dependsOn buildAndPushDockerJavaContainer dependsOn buildAndPushDockerPythonContainer From 118a531f628a50e47dd3b90b5190e24a1266b390 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 16 Mar 2026 14:48:04 -0700 Subject: [PATCH 2/2] Merges in refactoring from 
https://github.com/apache/beam/commit/0699f3594e65912a311c91c93920930b7f4779b7 --- .../google-cloud-dataflow-java/build.gradle | 29 ++++--------------- 1 file changed, 5 insertions(+), 24 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 5221c9c23a7b..49de59fac32c 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -509,31 +509,12 @@ task validatesRunnerStreamingTagEncodingV2 { task validatesRunnerStreamingEngine { group = "Verification" description "Validates Dataflow runner forcing streaming mode on streaming engine" - dependsOn(createLegacyWorkerValidatesRunnerTest( - name: 'validatesRunnerLegacyWorkerTestStreaming', - pipelineOptions: legacyPipelineOptions + ['--streaming', '--experiments=enable_streaming_engine'], - excludedCategories: [ - 'org.apache.beam.sdk.testing.UsesCommittedMetrics', - 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', + dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [ + name: 'validatesRunnerLegacyWorkerTestStreamingEngine', + pipelineOptions: validatesRunnerStreamingConfig.pipelineOptions + [ + '--experiments=enable_streaming_engine', ], - excludedTests: [ - // TODO(https://github.com/apache/beam/issues/21472) - 'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState', - // GroupIntoBatches.withShardedKey not supported on Java streaming worker - // https://github.com/apache/beam/issues/22592 - 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow', - - // These tests use static state and don't work with remote execution. 
- 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle', - 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful', - 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement', - 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful', - 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup', - 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful', - 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle', - 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful', - ] - )) + ])) } def setupXVR = tasks.register("setupXVR") {