From 76152d6da743380f798bde5191b54d407f08698a Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 6 Mar 2026 14:45:04 +0000 Subject: [PATCH 1/5] add bigquery destination test --- .../beam_PostCommit_Yaml_Xlang_Direct.json | 2 +- .../bigquery_dynamic_destinations.yaml | 92 +++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml diff --git a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json index b5704c67ef1c..86bf1193abd9 100644 --- a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 6 + "revision": 7 } diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml new file mode 100644 index 000000000000..a5e7897eedd9 --- /dev/null +++ b/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: BQ_TABLE + type: "apache_beam.yaml.integration_tests.temp_bigquery_table" + config: + project: "apache-beam-testing" + +pipelines: + - pipeline: + type: chain + transforms: + - type: Create + name: CreateUsers + config: + elements: + - {id: 1, name: "Alice", country: "US"} + - {id: 2, name: "Bob", country: "UK"} + - {id: 3, name: "Charlie", country: "CN"} + - {id: 4, name: "David", country: "US"} + - type: WriteToBigQuery + name: WriteWithDynamicDestinations + config: + # NOTE: This will create 3 tables: BQ_TABLE_US, BQ_TABLE_UK, BQ_TABLE_CN + # This is because the table name is dynamically generated using the country field. + # The {{country}} syntax is used to dynamically generate the table name. + # For this testing example we are using {{country}} to dynamically generate the table name. + # In production, we would use {country} to dynamically generate the table name. + table: "{BQ_TABLE}_{{country}}" + create_disposition: CREATE_IF_NEEDED + write_disposition: WRITE_APPEND + options: + project: "apache-beam-testing" + temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + + - pipeline: + type: chain + transforms: + - type: ReadFromBigQuery + config: + table: "{BQ_TABLE}_US" + - type: AssertEqual + config: + elements: + - {id: 1, name: "Alice", country: "US"} + - {id: 4, name: "David", country: "US"} + options: + project: "apache-beam-testing" + temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + + - pipeline: + type: chain + transforms: + - type: ReadFromBigQuery + config: + table: "{BQ_TABLE}_UK" + - type: AssertEqual + config: + elements: + - {id: 2, name: "Bob", country: "UK"} + options: + project: "apache-beam-testing" + temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + + - pipeline: + type: chain + transforms: + - type: ReadFromBigQuery + config: + table: "{BQ_TABLE}_CN" + - type: AssertEqual + config: + elements: + - {id: 3, name: "Charlie", country: "CN"} + options: + project: "apache-beam-testing" + temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" From 00c9bf6fb8aca062dc0c47972a6116b1f38a5527 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 16 Mar 2026 18:05:00 +0000 Subject: [PATCH 2/5] try postcommit test --- .github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json index 86bf1193abd9..273008f1b3ee 100644 --- a/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json +++ b/.github/trigger_files/beam_PostCommit_Yaml_Xlang_Direct.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 7 + "revision": 8 } From 85ce9e905889672fadb7080f1d4e14fbea8d1022 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 19 Mar 2026 19:50:25 +0000 Subject: [PATCH 3/5] comment out blocked writeToBQ part --- .../bigquery_dynamic_destinations.yaml | 109 +++++++++--------- 1 file changed, 55 insertions(+), 54 deletions(-) diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml index a5e7897eedd9..8083c860ebdb 100644 --- a/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml +++ b/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml @@ -33,60 +33,61 @@ pipelines: - {id: 2, name: "Bob", country: "UK"} - {id: 3, name: "Charlie", country: "CN"} - {id: 4, name: "David", country: "US"} - - type: WriteToBigQuery - name: WriteWithDynamicDestinations - config: - # NOTE: This will create 3 tables: BQ_TABLE_US, BQ_TABLE_UK, BQ_TABLE_CN - # This is because the table name is dynamically generated using the country field. - # The {{country}} syntax is used to dynamically generate the table name. - # For this testing example we are using {{country}} to dynamically generate the table name. - # In production, we would use {country} to dynamically generate the table name. - table: "{BQ_TABLE}_{{country}}" - create_disposition: CREATE_IF_NEEDED - write_disposition: WRITE_APPEND - options: - project: "apache-beam-testing" - temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" +# Blocked by https://github.com/apache/beam/issues/30950 + # - type: WriteToBigQuery + # name: WriteWithDynamicDestinations + # config: + # # NOTE: This will create 3 tables: BQ_TABLE_US, BQ_TABLE_UK, BQ_TABLE_CN + # # This is because the table name is dynamically generated using the country field. + # # The {{country}} syntax is used to dynamically generate the table name. + # # For this testing example we are using {{country}} to dynamically generate the table name. + # # In production, we would use {country} to dynamically generate the table name. + # table: "{BQ_TABLE}_{{country}}" + # create_disposition: CREATE_IF_NEEDED + # write_disposition: WRITE_APPEND + # options: + # project: "apache-beam-testing" + # temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" - - pipeline: - type: chain - transforms: - - type: ReadFromBigQuery - config: - table: "{BQ_TABLE}_US" - - type: AssertEqual - config: - elements: - - {id: 1, name: "Alice", country: "US"} - - {id: 4, name: "David", country: "US"} - options: - project: "apache-beam-testing" - temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + # - pipeline: + # type: chain + # transforms: + # - type: ReadFromBigQuery + # config: + # table: "{BQ_TABLE}_US" + # - type: AssertEqual + # config: + # elements: + # - {id: 1, name: "Alice", country: "US"} + # - {id: 4, name: "David", country: "US"} + # options: + # project: "apache-beam-testing" + # temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" - - pipeline: - type: chain - transforms: - - type: ReadFromBigQuery - config: - table: "{BQ_TABLE}_UK" - - type: AssertEqual - config: - elements: - - {id: 2, name: "Bob", country: "UK"} - options: - project: "apache-beam-testing" - temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + # - pipeline: + # type: chain + # transforms: + # - type: ReadFromBigQuery + # config: + # table: "{BQ_TABLE}_UK" + # - type: AssertEqual + # config: + # elements: + # - {id: 2, name: "Bob", country: "UK"} + # options: + # project: "apache-beam-testing" + # temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" - - pipeline: - type: chain - transforms: - - type: ReadFromBigQuery - config: - table: "{BQ_TABLE}_CN" - - type: AssertEqual - config: - elements: - - {id: 3, name: "Charlie", country: "CN"} - options: - project: "apache-beam-testing" - temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + # - pipeline: + # type: chain + # transforms: + # - type: ReadFromBigQuery + # config: + # table: "{BQ_TABLE}_CN" + # - type: AssertEqual + # config: + # elements: + # - {id: 3, name: "Charlie", country: "CN"} + # options: + # project: "apache-beam-testing" + # temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" From 5b2290004c592a08bdb305f8c85931db098b2600 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 19 Mar 2026 20:41:55 +0000 Subject: [PATCH 4/5] add tmp location --- .../bigquery_dynamic_destinations.yaml | 114 +++++++++--------- 1 file changed, 59 insertions(+), 55 deletions(-) diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml index 8083c860ebdb..b5643c29106b 100644 --- a/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml +++ b/sdks/python/apache_beam/yaml/extended_tests/databases/bigquery_dynamic_destinations.yaml @@ -20,6 +20,11 @@ fixtures: type: "apache_beam.yaml.integration_tests.temp_bigquery_table" config: project: "apache-beam-testing" + - name: TEMP_DIR_0 + # Need distributed filesystem to be able to read and write from a container. + type: "apache_beam.yaml.integration_tests.gcs_temp_dir" + config: + bucket: "gs://temp-storage-for-end-to-end-tests/temp-it" pipelines: - pipeline: @@ -33,61 +38,60 @@ pipelines: - {id: 2, name: "Bob", country: "UK"} - {id: 3, name: "Charlie", country: "CN"} - {id: 4, name: "David", country: "US"} -# Blocked by https://github.com/apache/beam/issues/30950 - # - type: WriteToBigQuery - # name: WriteWithDynamicDestinations - # config: - # # NOTE: This will create 3 tables: BQ_TABLE_US, BQ_TABLE_UK, BQ_TABLE_CN - # # This is because the table name is dynamically generated using the country field. - # # The {{country}} syntax is used to dynamically generate the table name. - # # For this testing example we are using {{country}} to dynamically generate the table name. - # # In production, we would use {country} to dynamically generate the table name. - # table: "{BQ_TABLE}_{{country}}" - # create_disposition: CREATE_IF_NEEDED - # write_disposition: WRITE_APPEND - # options: - # project: "apache-beam-testing" - # temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + - type: WriteToBigQuery + name: WriteWithDynamicDestinations + config: + # NOTE: This will create 3 tables: BQ_TABLE_US, BQ_TABLE_UK, BQ_TABLE_CN + # This is because the table name is dynamically generated using the country field. + # The {{country}} syntax is used to dynamically generate the table name. + # For this testing example we are using {{country}} to dynamically generate the table name. + # In production, we would use {country} to dynamically generate the table name. + table: "{BQ_TABLE}_{{country}}" + create_disposition: CREATE_IF_NEEDED + write_disposition: WRITE_APPEND + options: + project: "apache-beam-testing" + temp_location: "{TEMP_DIR_0}" - # - pipeline: - # type: chain - # transforms: - # - type: ReadFromBigQuery - # config: - # table: "{BQ_TABLE}_US" - # - type: AssertEqual - # config: - # elements: - # - {id: 1, name: "Alice", country: "US"} - # - {id: 4, name: "David", country: "US"} - # options: - # project: "apache-beam-testing" - # temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + - pipeline: + type: chain + transforms: + - type: ReadFromBigQuery + config: + table: "{BQ_TABLE}_US" + - type: AssertEqual + config: + elements: + - {id: 1, name: "Alice", country: "US"} + - {id: 4, name: "David", country: "US"} + options: + project: "apache-beam-testing" + temp_location: "{TEMP_DIR_0}" - # - pipeline: - # type: chain - # transforms: - # - type: ReadFromBigQuery - # config: - # table: "{BQ_TABLE}_UK" - # - type: AssertEqual - # config: - # elements: - # - {id: 2, name: "Bob", country: "UK"} - # options: - # project: "apache-beam-testing" - # temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + - pipeline: + type: chain + transforms: + - type: ReadFromBigQuery + config: + table: "{BQ_TABLE}_UK" + - type: AssertEqual + config: + elements: + - {id: 2, name: "Bob", country: "UK"} + options: + project: "apache-beam-testing" + temp_location: "{TEMP_DIR_0}" - # - pipeline: - # type: chain - # transforms: - # - type: ReadFromBigQuery - # config: - # table: "{BQ_TABLE}_CN" - # - type: AssertEqual - # config: - # elements: - # - {id: 3, name: "Charlie", country: "CN"} - # options: - # project: "apache-beam-testing" - # temp_location: "gs://temp-storage-for-end-to-end-tests/temp-it" + - pipeline: + type: chain + transforms: + - type: ReadFromBigQuery + config: + table: "{BQ_TABLE}_CN" + - type: AssertEqual + config: + elements: + - {id: 3, name: "Charlie", country: "CN"} + options: + project: "apache-beam-testing" + temp_location: "{TEMP_DIR_0}" From ed531aa764d0d1cbdeb25d77a3dacfb0296d44b0 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 20 Mar 2026 15:59:40 +0000 Subject: [PATCH 5/5] add BQ project detect and test --- .../bigquery/BigQueryStorageSourceBase.java | 24 ++++++-- .../bigquery/BigQueryIOStorageReadTest.java | 60 +++++++++++++++++++ 2 files changed, 79 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java index 45763c6ac14f..9c3ec7cdd39d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java @@ -156,13 +156,27 @@ public List> split( streamCount = Math.max(streamCount, MIN_SPLIT_COUNT); } + String project = + bqOptions.getBigQueryProject() == null + ? bqOptions.getProject() + : bqOptions.getBigQueryProject(); + if (project == null) { + if (targetTable != null + && targetTable.getTableReference() != null + && targetTable.getTableReference().getProjectId() != null) { + project = targetTable.getTableReference().getProjectId(); + } else { + @Nullable String tableReferenceId = getTargetTableId(bqOptions); + if (tableReferenceId != null) { + TableReference tableReference = BigQueryHelpers.parseTableUrn(tableReferenceId); + project = tableReference.getProjectId(); + } + } + } + CreateReadSessionRequest createReadSessionRequest = CreateReadSessionRequest.newBuilder() - .setParent( - BigQueryHelpers.toProjectResourceName( - bqOptions.getBigQueryProject() == null - ? bqOptions.getProject() - : bqOptions.getBigQueryProject())) + .setParent(BigQueryHelpers.toProjectResourceName(project)) .setReadSession(readSessionBuilder) .setMaxStreamCount(streamCount) .build(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java index 95f472f5c61b..24f3cefe4d12 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java @@ -1600,6 +1600,66 @@ public void testReadFromBigQueryIO() throws Exception { p.run(); } + @Test + public void testReadFromBigQueryIOWithFallbackProject() throws Exception { + fakeDatasetService.createDataset("fallback-project", "dataset", "", "", null); + TableReference tableRef = BigQueryHelpers.parseTableSpec("fallback-project:dataset.table"); + Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA); + fakeDatasetService.createTable(table); + + CreateReadSessionRequest expectedCreateReadSessionRequest = + CreateReadSessionRequest.newBuilder() + .setParent("projects/fallback-project") + .setReadSession( + ReadSession.newBuilder() + .setTable("projects/fallback-project/datasets/dataset/tables/table") + .setDataFormat(DataFormat.AVRO) + .setReadOptions(ReadSession.TableReadOptions.newBuilder())) + .setMaxStreamCount(10) + .build(); + + ReadSession readSession = + ReadSession.newBuilder() + .setName("readSessionName") + .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) + .addStreams(ReadStream.newBuilder().setName("streamName")) + .setDataFormat(DataFormat.AVRO) + .build(); + + ReadRowsRequest expectedReadRowsRequest = + ReadRowsRequest.newBuilder().setReadStream("streamName").build(); + + List records = Lists.newArrayList(createRecord("A", 1, AVRO_SCHEMA)); + + List readRowsResponses = + Lists.newArrayList(createResponse(AVRO_SCHEMA, records.subList(0, 1), 0.0, 1.0)); + + StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable()); + when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest)) + .thenReturn(readSession); + when(fakeStorageClient.readRows(expectedReadRowsRequest, "")) + .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses)); + + // Explicitly set the pipeline's project option to null to simulate missing + // cross-language parameters, and verify it uses the project from the TableReference. + options.as(BigQueryOptions.class).setProject(null); + + PCollection> output = + p.apply( + BigQueryIO.read(new ParseKeyValue()) + .from("fallback-project:dataset.table") + .withMethod(Method.DIRECT_READ) + .withFormat(DataFormat.AVRO) + .withTestServices( + new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withStorageClient(fakeStorageClient))); + + PAssert.that(output).containsInAnyOrder(ImmutableList.of(KV.of("A", 1L))); + + p.run(); + } + @Test public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception { fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);