From f847ba5c94530fcaa452066e9c3ebd12df527ef0 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Mon, 26 Jan 2026 00:32:15 +0200 Subject: [PATCH 1/7] Set quota project in beam.io.ReadFromBigQuery in Python SDK --- CHANGES.md | 1 + sdks/python/apache_beam/internal/gcp/auth.py | 41 ++++ .../apache_beam/internal/gcp/auth_test.py | 60 +++++ sdks/python/apache_beam/io/gcp/bigquery.py | 86 +++++++- .../apache_beam/io/gcp/bigquery_test.py | 207 ++++++++++++++++++ .../apache_beam/io/gcp/bigquery_tools.py | 81 ++++++- .../apache_beam/io/gcp/bigquery_tools_test.py | 75 ++++++- .../apache_beam/options/pipeline_options.py | 8 + 8 files changed, 528 insertions(+), 31 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e20194f9ab09..43c85788d33d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -82,6 +82,7 @@ * (YAML) Added WriteToDatadog transform ([#38362](https://github.com/apache/beam/issues/38362)). * (Java) Flink 2.1 and 2.2 support is added ([#38947](https://github.com/apache/beam/issues/38947)) ([#38978](https://github.com/apache/beam/issues/38978)); Flink 1.17 and 1.18 support is dropped. * (Python) MqttIO is now supported in Python via cross-language ([#21060](https://github.com/apache/beam/issues/21060)). +* Added support for setting quota project ID in BigQuery read operations via `--quota_project_id` pipeline option or `quota_project_id` parameter in ReadFromBigQuery transform (Python) ([#37431](https://github.com/apache/beam/issues/37431)). 
## Breaking Changes diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 168d6aa26939..9a848ea430be 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -82,6 +82,47 @@ def get_service_credentials(pipeline_options): return _Credentials.get_service_credentials(pipeline_options) +def with_quota_project(credentials, quota_project_id): + """For internal use only; no backwards-compatibility guarantees. + + Apply a quota project to credentials if supported. + + The quota project is used to bill API requests to a specific GCP project, + separate from the project that owns the service account or data. + + Args: + credentials: The credentials object (either _ApitoolsCredentialsAdapter + or a google.auth credentials object). + quota_project_id: The GCP project ID to use for quota and billing. + + Returns: + Credentials with the quota project applied, or the original credentials + if quota project is not supported or credentials is None. + """ + if credentials is None or quota_project_id is None: + return credentials + + # Get the underlying google-auth credentials if wrapped + if hasattr(credentials, 'get_google_auth_credentials'): + underlying_creds = credentials.get_google_auth_credentials() + else: + underlying_creds = credentials + + # Apply quota project if supported + if hasattr(underlying_creds, 'with_quota_project'): + new_creds = underlying_creds.with_quota_project(quota_project_id) + # Re-wrap if the original was wrapped + if hasattr(credentials, 'get_google_auth_credentials'): + return _ApitoolsCredentialsAdapter(new_creds) + return new_creds + + _LOGGER.warning( + 'Credentials of type %s do not support quota project. 
' + 'The quota_project_id parameter will be ignored.', + type(underlying_creds).__name__) + return credentials + + if _GOOGLE_AUTH_AVAILABLE: class _ApitoolsCredentialsAdapter: diff --git a/sdks/python/apache_beam/internal/gcp/auth_test.py b/sdks/python/apache_beam/internal/gcp/auth_test.py index fe16acc3c089..1f811a16287e 100644 --- a/sdks/python/apache_beam/internal/gcp/auth_test.py +++ b/sdks/python/apache_beam/internal/gcp/auth_test.py @@ -132,5 +132,65 @@ def raise_(scopes=None): auth._LOGGER.removeHandler(loggerHandler) +@unittest.skipIf(gauth is None, 'Google Auth dependencies are not installed') +class WithQuotaProjectTest(unittest.TestCase): + """Tests for with_quota_project function.""" + def test_with_quota_project_returns_credentials_unchanged_when_none(self): + """Test that None credentials are returned unchanged.""" + result = auth.with_quota_project(None, 'my-project') + self.assertIsNone(result) + + def test_with_quota_project_returns_credentials_unchanged_when_no_quota(self): + """Test that credentials are returned unchanged when + quota_project_id is None.""" + mock_creds = mock.MagicMock() + result = auth.with_quota_project(mock_creds, None) + self.assertEqual(result, mock_creds) + mock_creds.with_quota_project.assert_not_called() + + @mock.patch('apache_beam.internal.gcp.auth._ApitoolsCredentialsAdapter') + def test_with_quota_project_applies_quota_to_wrapped_credentials( + self, mock_adapter_class): + """Test that quota project is applied to wrapped credentials.""" + mock_inner_creds = mock.MagicMock() + mock_new_creds = mock.MagicMock() + mock_inner_creds.with_quota_project.return_value = mock_new_creds + + mock_adapter = mock.MagicMock() + mock_adapter.get_google_auth_credentials.return_value = mock_inner_creds + + mock_adapter_instance = mock.MagicMock() + mock_adapter_class.return_value = mock_adapter_instance + + result = auth.with_quota_project(mock_adapter, 'my-billing-project') + + 
mock_inner_creds.with_quota_project.assert_called_once_with( + 'my-billing-project') + # Result should be a new adapter wrapping the new credentials + mock_adapter_class.assert_called_once_with(mock_new_creds) + self.assertEqual(result, mock_adapter_instance) + + def test_with_quota_project_applies_quota_to_direct_credentials(self): + """Test that quota project is applied to direct credentials.""" + mock_creds = mock.MagicMock(spec=['with_quota_project']) + mock_new_creds = mock.MagicMock() + mock_creds.with_quota_project.return_value = mock_new_creds + + result = auth.with_quota_project(mock_creds, 'my-billing-project') + + mock_creds.with_quota_project.assert_called_once_with('my-billing-project') + self.assertEqual(result, mock_new_creds) + + def test_with_quota_project_returns_original_when_not_supported(self): + """Test that original credentials are returned when + with_quota_project is not supported.""" + # Create a mock without with_quota_project method + mock_creds = mock.MagicMock(spec=[]) + + result = auth.with_quota_project(mock_creds, 'my-billing-project') + + self.assertEqual(result, mock_creds) + + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index a2d17f12569e..6e9908724895 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -375,6 +375,7 @@ def chain_after(result): import apache_beam as beam from apache_beam import coders from apache_beam import pvalue +from apache_beam.internal.gcp import auth from apache_beam.internal.gcp.json_value import from_json_value from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.io import range_trackers @@ -659,7 +660,8 @@ def __init__( step_name=None, unique_id=None, temp_dataset=None, - query_priority=BigQueryQueryPriority.BATCH): + query_priority=BigQueryQueryPriority.BATCH, + quota_project_id=None): if table is not None and query is not 
None: raise ValueError( 'Both a BigQuery table and a query were specified.' @@ -693,6 +695,7 @@ def __init__( self.use_json_exports = use_json_exports self.temp_dataset = temp_dataset self.query_priority = query_priority + self.quota_project_id = quota_project_id self._job_name = job_name or 'BQ_EXPORT_JOB' self._step_name = step_name self._source_uuid = unique_id @@ -712,6 +715,7 @@ def display_data(self): 'use_legacy_sql': self.use_legacy_sql, 'bigquery_job_labels': json.dumps(self.bigquery_job_labels), 'export_file_format': export_format, + 'quota_project_id': self._get_quota_project_id() or '', 'launchesBigQueryJobs': DisplayDataItem( True, label="This Dataflow job launches bigquery jobs."), } @@ -779,6 +783,18 @@ def _get_project(self): project = self.project return project + def _get_quota_project_id(self): + """Returns the quota project ID for API calls. + + Prefers the explicit quota_project_id parameter, falls back to + quota_project_id from GoogleCloudOptions. + """ + if self.quota_project_id: + return self.quota_project_id + if self.options is not None: + return self.options.view_as(GoogleCloudOptions).quota_project_id + return None + def _create_source(self, path, coder): if not self.use_json_exports: return create_avro_source(path, validate=self.validate) @@ -796,7 +812,8 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): bq = bigquery_tools.BigQueryWrapper( temp_dataset_id=( self.temp_dataset.datasetId if self.temp_dataset else None), - client=bigquery_tools.BigQueryWrapper._bigquery_client(self.options)) + client=bigquery_tools.BigQueryWrapper._bigquery_client(self.options), + quota_project_id=self._get_quota_project_id()) if self.query is not None: self._setup_temporary_dataset(bq) @@ -929,6 +946,31 @@ def _export_files(self, bq): return table.schema, metadata_list +def _create_bq_storage_client(quota_project_id=None): + """Create a BigQueryReadClient with optional quota project. 
+ + Args: + quota_project_id: Optional GCP project ID to use for quota and billing. + + Returns: + A BigQueryReadClient instance. + """ + if quota_project_id: + try: + import google.auth + from google.auth import exceptions as auth_exceptions + credentials, _ = google.auth.default() + credentials = auth.with_quota_project(credentials, quota_project_id) + return bq_storage.BigQueryReadClient(credentials=credentials) + except (auth_exceptions.DefaultCredentialsError, AttributeError) as e: + _LOGGER.warning( + 'Failed to apply quota project %s to BigQuery Storage client: %s. ' + 'Falling back to default client.', + quota_project_id, + e) + return bq_storage.BigQueryReadClient() + + class _CustomBigQueryStorageSource(BoundedSource): """A base class for BoundedSource implementations which read from BigQuery using the BigQuery Storage API. @@ -986,7 +1028,8 @@ def __init__( temp_dataset: Optional[DatasetReference] = None, temp_table: Optional[TableReference] = None, use_native_datetime: Optional[bool] = False, - timeout: Optional[float] = None): + timeout: Optional[float] = None, + quota_project_id: Optional[str] = None): if table is not None and query is not None: raise ValueError( @@ -1025,6 +1068,7 @@ def __init__( self._job_name = job_name or 'BQ_DIRECT_READ_JOB' self._step_name = step_name self._source_uuid = unique_id + self.quota_project_id = quota_project_id def _get_project(self): """Returns the project that queries and exports will be billed to.""" @@ -1036,6 +1080,18 @@ def _get_project(self): return project return self.project + def _get_quota_project_id(self): + """Returns the quota project ID for API calls. + + Prefers the explicit quota_project_id parameter, falls back to + quota_project_id from GoogleCloudOptions. 
+ """ + if self.quota_project_id: + return self.quota_project_id + if self.pipeline_options is not None: + return self.pipeline_options.view_as(GoogleCloudOptions).quota_project_id + return None + def _get_parent_project(self): """Returns the project that will be billed.""" if self.temp_table: @@ -1165,7 +1221,8 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): bq = bigquery_tools.BigQueryWrapper( temp_table_ref=(self.temp_table if self.temp_table else None), client=bigquery_tools.BigQueryWrapper._bigquery_client( - self.pipeline_options)) + self.pipeline_options), + quota_project_id=self._get_quota_project_id()) if self.query is not None: self._setup_temporary_dataset(bq) @@ -1198,7 +1255,7 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): if self.row_restriction is not None: requested_session.read_options.row_restriction = self.row_restriction - storage_client = bq_storage.BigQueryReadClient() + storage_client = _create_bq_storage_client(self._get_quota_project_id()) stream_count = 0 if desired_bundle_size > 0: table_size = self._get_table_size(bq, self.table_reference) @@ -1229,8 +1286,10 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): self.split_result = [ _CustomBigQueryStorageStreamSource( - stream.name, self.use_native_datetime, self.timeout) - for stream in read_session.streams + stream.name, + self.use_native_datetime, + self.timeout, + self._get_quota_project_id()) for stream in read_session.streams ] for source in self.split_result: @@ -1264,10 +1323,12 @@ def __init__( self, read_stream_name: str, use_native_datetime: Optional[bool] = True, - timeout: Optional[float] = None): + timeout: Optional[float] = None, + quota_project_id: Optional[str] = None): self.read_stream_name = read_stream_name self.use_native_datetime = use_native_datetime self.timeout = timeout + self.quota_project_id = quota_project_id def display_data(self): return { @@ -1290,7 +1351,10 
@@ def split(self, desired_bundle_size, start_position=None, stop_position=None): return SourceBundle( weight=1.0, source=_CustomBigQueryStorageStreamSource( - self.read_stream_name, self.use_native_datetime), + self.read_stream_name, + self.use_native_datetime, + self.timeout, + self.quota_project_id), start_position=None, stop_position=None) @@ -1326,7 +1390,7 @@ def retry_delay_callback(delay): def read_arrow(self): - storage_client = bq_storage.BigQueryReadClient() + storage_client = _create_bq_storage_client(self.quota_project_id) read_rows_kwargs = {'retry_delay_callback': self.retry_delay_callback} if self.timeout is not None: read_rows_kwargs['timeout'] = self.timeout @@ -1345,7 +1409,7 @@ def read_arrow(self): yield py_row def read_avro(self): - storage_client = bq_storage.BigQueryReadClient() + storage_client = _create_bq_storage_client(self.quota_project_id) read_rows_kwargs = {'retry_delay_callback': self.retry_delay_callback} if self.timeout is not None: read_rows_kwargs['timeout'] = self.timeout diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 234c99847a44..c09cecd7bff9 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -65,6 +65,7 @@ from apache_beam.io.gcp.tests.bigquery_matcher import BigQueryTableMatcher from apache_beam.metrics.metric import Lineage from apache_beam.options import value_provider +from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.value_provider import RuntimeValueProvider @@ -338,6 +339,10 @@ def test_repeatable_field_is_properly_converted(self): class TestReadFromBigQuery(unittest.TestCase): @classmethod def setUpClass(cls): + cls.env_patch = mock.patch.dict( + os.environ, {'GOOGLE_CLOUD_PROJECT': 'test-project'}) + 
cls.env_patch.start() + class UserDefinedOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): @@ -351,6 +356,7 @@ def tearDown(self): @classmethod def tearDownClass(cls): + cls.env_patch.stop() # Unset the option added in setupClass to avoid interfere with other tests. # Force a gc so PipelineOptions.__subclass__() no longer contains it. del cls.UserDefinedOptions @@ -778,6 +784,196 @@ def test_read_all_lineage(self): ])) +@unittest.skipIf( + HttpError is None or gcp_bigquery is None, + 'GCP dependencies are not installed') +class TestReadFromBigQueryQuotaProject(unittest.TestCase): + """Tests for quota_project_id in ReadFromBigQuery sources.""" + def test_quota_project_id_from_pipeline_options(self): + """Test that quota_project_id is read from GoogleCloudOptions.""" + options = PipelineOptions(['--quota_project_id=my-billing-project']) + gcp_options = options.view_as(GoogleCloudOptions) + self.assertEqual(gcp_options.quota_project_id, 'my-billing-project') + + def test_quota_project_id_none_by_default_in_options(self): + """Test that quota_project_id is None by default in options.""" + options = PipelineOptions([]) + gcp_options = options.view_as(GoogleCloudOptions) + self.assertIsNone(gcp_options.quota_project_id) + + def test_export_source_explicit_quota_project(self): + """Test that explicit quota_project_id is stored in + _CustomBigQuerySource.""" + source = beam_bq._CustomBigQuerySource( + method=ReadFromBigQuery.Method.EXPORT, + table='project:dataset.table', + quota_project_id='my-billing-project') + self.assertEqual(source.quota_project_id, 'my-billing-project') + self.assertEqual(source._get_quota_project_id(), 'my-billing-project') + + def test_export_source_gets_quota_from_options(self): + """Test that _CustomBigQuerySource falls back to options for + quota_project_id.""" + options = PipelineOptions(['--quota_project_id=my-billing-project']) + source = beam_bq._CustomBigQuerySource( + method=ReadFromBigQuery.Method.EXPORT, + 
table='project:dataset.table', + pipeline_options=options) + self.assertEqual(source._get_quota_project_id(), 'my-billing-project') + + def test_export_source_explicit_overrides_options(self): + """Test that explicit quota_project_id overrides options.""" + options = PipelineOptions(['--quota_project_id=options-project']) + source = beam_bq._CustomBigQuerySource( + method=ReadFromBigQuery.Method.EXPORT, + table='project:dataset.table', + pipeline_options=options, + quota_project_id='explicit-project') + self.assertEqual(source._get_quota_project_id(), 'explicit-project') + + def test_storage_source_explicit_quota_project(self): + """Test that explicit quota_project_id is stored in + _CustomBigQueryStorageSource.""" + source = beam_bq._CustomBigQueryStorageSource( + method=ReadFromBigQuery.Method.DIRECT_READ, + table='project:dataset.table', + quota_project_id='my-billing-project') + self.assertEqual(source.quota_project_id, 'my-billing-project') + self.assertEqual(source._get_quota_project_id(), 'my-billing-project') + + def test_storage_source_gets_quota_from_options(self): + """Test that _CustomBigQueryStorageSource falls back to options.""" + options = PipelineOptions(['--quota_project_id=my-billing-project']) + source = beam_bq._CustomBigQueryStorageSource( + method=ReadFromBigQuery.Method.DIRECT_READ, + table='project:dataset.table', + pipeline_options=options) + self.assertEqual(source._get_quota_project_id(), 'my-billing-project') + + def test_quota_project_id_in_export_source_display_data(self): + """Test that quota_project_id appears in display data for export source.""" + source = beam_bq._CustomBigQuerySource( + method=ReadFromBigQuery.Method.EXPORT, + table='project:dataset.table', + quota_project_id='my-billing-project') + display_data = source.display_data() + self.assertEqual(display_data['quota_project_id'], 'my-billing-project') + + def test_quota_project_id_empty_in_display_data_when_not_set(self): + """Test that quota_project_id is empty string 
in display data + when not set.""" + options = PipelineOptions([]) + source = beam_bq._CustomBigQuerySource( + method=ReadFromBigQuery.Method.EXPORT, + table='project:dataset.table', + pipeline_options=options) + display_data = source.display_data() + self.assertEqual(display_data['quota_project_id'], '') + + def test_stream_source_stores_quota_project_id(self): + """Test that quota_project_id is stored in + _CustomBigQueryStorageStreamSource.""" + stream_source = beam_bq._CustomBigQueryStorageStreamSource( + read_stream_name='projects/p/locations/l/sessions/s/streams/stream1', + use_native_datetime=True, + timeout=30.0, + quota_project_id='my-billing-project') + self.assertEqual(stream_source.quota_project_id, 'my-billing-project') + + def test_stream_source_quota_project_id_none_by_default(self): + """Test that quota_project_id is None by default in stream source.""" + stream_source = beam_bq._CustomBigQueryStorageStreamSource( + read_stream_name='projects/p/locations/l/sessions/s/streams/stream1') + self.assertIsNone(stream_source.quota_project_id) + + def test_stream_source_split_preserves_quota_project_id(self): + """Test that split() preserves quota_project_id.""" + stream_source = beam_bq._CustomBigQueryStorageStreamSource( + read_stream_name='projects/p/locations/l/sessions/s/streams/stream1', + use_native_datetime=True, + timeout=30.0, + quota_project_id='my-billing-project') + bundle = stream_source.split(desired_bundle_size=0) + self.assertEqual(bundle.source.quota_project_id, 'my-billing-project') + self.assertEqual(bundle.source.timeout, 30.0) + self.assertEqual(bundle.source.use_native_datetime, True) + + @mock.patch('apache_beam.io.gcp.bigquery._create_bq_storage_client') + def test_stream_source_read_arrow_uses_quota_project( + self, mock_create_client): + """Test that read_arrow() uses _create_bq_storage_client + with quota_project_id.""" + mock_client = mock.MagicMock() + mock_create_client.return_value = mock_client + # Mock read_rows to return 
empty iterator + mock_client.read_rows.return_value.rows.return_value = iter([]) + + stream_source = beam_bq._CustomBigQueryStorageStreamSource( + read_stream_name='projects/p/locations/l/sessions/s/streams/stream1', + use_native_datetime=True, + quota_project_id='my-billing-project') + # Consume the iterator + list(stream_source.read_arrow()) + + mock_create_client.assert_called_once_with('my-billing-project') + + @mock.patch('apache_beam.io.gcp.bigquery._create_bq_storage_client') + def test_stream_source_read_avro_uses_quota_project(self, mock_create_client): + """Test that read_avro() uses _create_bq_storage_client + with quota_project_id.""" + mock_client = mock.MagicMock() + mock_create_client.return_value = mock_client + # Mock read_rows to return empty iterator + mock_client.read_rows.return_value = iter([]) + + stream_source = beam_bq._CustomBigQueryStorageStreamSource( + read_stream_name='projects/p/locations/l/sessions/s/streams/stream1', + use_native_datetime=False, + quota_project_id='my-billing-project') + # Consume the iterator + list(stream_source.read_avro()) + + mock_create_client.assert_called_once_with('my-billing-project') + + @mock.patch('apache_beam.io.gcp.bigquery.bq_storage') + @mock.patch('apache_beam.io.gcp.bigquery._LOGGER') + def test_create_bq_storage_client_logs_on_failure( + self, mock_logger, mock_bq_storage): + """Test that _create_bq_storage_client logs when quota project fails.""" + # Make google.auth.default raise a DefaultCredentialsError + from google.auth import exceptions as auth_exceptions + with mock.patch( + 'google.auth.default', + side_effect=auth_exceptions.DefaultCredentialsError('Auth error')): + beam_bq._create_bq_storage_client('my-billing-project') + + mock_logger.warning.assert_called_once() + warning_args = mock_logger.warning.call_args[0] + self.assertIn('Failed to apply quota project', warning_args[0]) + self.assertIn('my-billing-project', warning_args[1]) + + 
@mock.patch('apache_beam.io.gcp.bigquery.bq_storage') + def test_create_bq_storage_client_with_quota_project(self, mock_bq_storage): + """Test _create_bq_storage_client applies quota project to credentials.""" + mock_creds = mock.MagicMock(spec=['with_quota_project']) + mock_new_creds = mock.MagicMock() + mock_creds.with_quota_project.return_value = mock_new_creds + + with mock.patch('google.auth.default', return_value=(mock_creds, 'proj')): + beam_bq._create_bq_storage_client('my-billing-project') + + mock_creds.with_quota_project.assert_called_once_with('my-billing-project') + mock_bq_storage.BigQueryReadClient.assert_called_once_with( + credentials=mock_new_creds) + + @mock.patch('apache_beam.io.gcp.bigquery.bq_storage') + def test_create_bq_storage_client_without_quota_project( + self, mock_bq_storage): + """Test _create_bq_storage_client without quota project uses default.""" + beam_bq._create_bq_storage_client(None) + mock_bq_storage.BigQueryReadClient.assert_called_once_with() + + @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestBigQuerySink(unittest.TestCase): def test_table_spec_display_data(self): @@ -819,10 +1015,14 @@ def _cleanup_files(self): os.remove('insert_calls2') def setUp(self): + self.env_patch = mock.patch.dict( + os.environ, {'GOOGLE_CLOUD_PROJECT': 'test-project'}) + self.env_patch.start() self._cleanup_files() def tearDown(self): self._cleanup_files() + self.env_patch.stop() def test_noop_schema_parsing(self): expected_table_schema = None @@ -1238,6 +1438,13 @@ def test_copy_load_job_exception(self, exception_type, error_message): HttpError is None or exceptions is None, 'GCP dependencies are not installed') class BigQueryStreamingInsertsErrorHandling(unittest.TestCase): + def setUp(self): + self.env_patch = mock.patch.dict( + os.environ, {'GOOGLE_CLOUD_PROJECT': 'test-project'}) + self.env_patch.start() + + def tearDown(self): + self.env_patch.stop() # Running tests with a variety of exceptions from 
https://googleapis.dev # /python/google-api-core/latest/_modules/google/api_core/exceptions.html. diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 8dd58cd55a01..ffa132d7a634 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -59,6 +59,7 @@ from apache_beam.metrics import monitoring_infos from apache_beam.metrics.metric import Metrics from apache_beam.options import value_provider +from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.transforms import DoFn from apache_beam.typehints.row_type import RowTypeConstraint @@ -358,11 +359,25 @@ class BigQueryWrapper(object): HISTOGRAM_METRIC_LOGGER = MetricLogger() - def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None): - self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions()) - self.gcp_bq_client = client or gcp_bigquery.Client( - client_info=ClientInfo( - user_agent="apache-beam-%s" % apache_beam.__version__)) + def __init__( + self, + client=None, + temp_dataset_id=None, + temp_table_ref=None, + quota_project_id=None): + self.quota_project_id = quota_project_id + self.client = client or BigQueryWrapper._bigquery_client( + PipelineOptions(), quota_project_id=quota_project_id) + + # If the client is a mock (common in tests) or has the specific method + # we use, we use it as the gcp_bq_client to preserve backward + # compatibility for tests. Otherwise (e.g. it's a real apitools client), + # we create the correct google-cloud-bigquery client. 
+ if client and hasattr(client, 'insert_rows_json'): + self.gcp_bq_client = client + else: + self.gcp_bq_client = BigQueryWrapper._gcp_bigquery_client( + quota_project_id=quota_project_id) self._unique_row_id = 0 # For testing scenarios where we pass in a client we do not want a @@ -1406,19 +1421,69 @@ def convert_row_to_dict(self, row, schema): @staticmethod def from_pipeline_options(pipeline_options: PipelineOptions): + """Create a BigQueryWrapper from pipeline options. + + Args: + pipeline_options: Pipeline options containing GCP configuration. + The quota_project_id is read from GoogleCloudOptions if set. + """ + quota_project_id = None + if pipeline_options is not None: + quota_project_id = pipeline_options.view_as( + GoogleCloudOptions).quota_project_id return BigQueryWrapper( - client=BigQueryWrapper._bigquery_client(pipeline_options)) + client=BigQueryWrapper._bigquery_client(pipeline_options), + quota_project_id=quota_project_id) @staticmethod - def _bigquery_client(pipeline_options: PipelineOptions): + def _bigquery_client( + pipeline_options: PipelineOptions, quota_project_id: str = None): + """Create a BigQuery API client from pipeline options. + + Args: + pipeline_options: Pipeline options for credentials. + quota_project_id: Optional quota project ID. If not provided, will be + extracted from pipeline_options. 
+ """ + credentials = auth.get_service_credentials(pipeline_options) + # Use explicit quota_project_id if provided, otherwise get from options + if quota_project_id is None and pipeline_options is not None: + quota_project_id = pipeline_options.view_as( + GoogleCloudOptions).quota_project_id + if quota_project_id: + credentials = auth.with_quota_project(credentials, quota_project_id) return bigquery.BigqueryV2( http=get_new_http(), - credentials=auth.get_service_credentials(pipeline_options), + credentials=credentials, response_encoding='utf8', additional_http_headers={ "user-agent": "apache-beam-%s" % apache_beam.__version__ }) + @staticmethod + def _gcp_bigquery_client(quota_project_id: str = None): + """Create a google-cloud-bigquery Client with optional quota project.""" + credentials = None + + if quota_project_id: + # Get default credentials and apply quota project + try: + import google.auth + from google.auth import exceptions as auth_exceptions + credentials, _ = google.auth.default() + credentials = auth.with_quota_project(credentials, quota_project_id) + except (auth_exceptions.DefaultCredentialsError, AttributeError) as e: + _LOGGER.warning( + 'Failed to apply quota project %s to gcp-bigquery client: %s. ' + 'Falling back to default client.', + quota_project_id, + e) + + return gcp_bigquery.Client( + credentials=credentials, + client_info=ClientInfo( + user_agent="apache-beam-%s" % apache_beam.__version__)) + class RowAsDictJsonCoder(coders.Coder): """A coder for a table row (represented as a dict) to/from a JSON string. 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 078c42160941..ac32a69b716b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -23,6 +23,7 @@ import json import logging import math +import os import re import unittest from typing import Optional @@ -45,6 +46,7 @@ from apache_beam.io.gcp.bigquery_tools import check_schema_equal from apache_beam.io.gcp.bigquery_tools import generate_bq_job_name from apache_beam.io.gcp.bigquery_tools import get_beam_typehints_from_tableschema +from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.bigquery_tools import parse_table_reference from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json from apache_beam.io.gcp.internal.clients import bigquery @@ -236,14 +238,16 @@ def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep): @mock.patch('google.cloud._http.JSONConnection.http') def test_user_agent_insert_all( self, http_mock, patched_skip_get_credentials, patched_sleep): - wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper() - try: - wrapper._insert_all_rows('p', 'd', 't', [{'name': 'any'}], None) - except: # pylint: disable=bare-except - # Ignore errors. The errors come from the fact that we did not mock - # the response from the API, so the overall insert_all_rows call fails - # soon after the BQ API is called. - pass + # Set GOOGLE_CLOUD_PROJECT to ensure Client creation succeeds in test env + with mock.patch.dict(os.environ, {'GOOGLE_CLOUD_PROJECT': 'test-project'}): + wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper() + try: + wrapper._insert_all_rows('p', 'd', 't', [{'name': 'any'}], None) + except: # pylint: disable=bare-except + # Ignore errors. 
The errors come from the fact that we did not mock + # the response from the API, so the overall insert_all_rows call fails + # soon after the BQ API is called. + pass call = http_mock.request.mock_calls[-2] self.assertIn('apache-beam-', call[2]['headers']['User-Agent']) @@ -1106,8 +1110,6 @@ def test_geography_in_bigquery_type_mapping(self): def test_geography_field_conversion(self): """Test that GEOGRAPHY fields are converted correctly.""" - from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper - # Create a mock field with GEOGRAPHY type field = bigquery.TableFieldSchema() field.type = 'GEOGRAPHY' @@ -1229,8 +1231,6 @@ def test_geography_json_encoding(self): def test_geography_with_special_characters(self): """Test GEOGRAPHY values with special characters and geometries.""" - from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper - field = bigquery.TableFieldSchema() field.type = 'GEOGRAPHY' field.name = 'complex_geo' @@ -1409,6 +1409,57 @@ def test_type_overrides_json_to_dict(self): self.assertEqual(typehints_dict, [("data", Optional[dict])]) +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') +class TestBigQueryWrapperQuotaProject(unittest.TestCase): + """Tests for quota_project_id in BigQueryWrapper.""" + @mock.patch( + 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._bigquery_client') + @mock.patch( + 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._gcp_bigquery_client') + def test_quota_project_id_stored(self, mock_gcp_client, mock_bq_client): + """Test that quota_project_id is stored in BigQueryWrapper.""" + mock_bq_client.return_value = mock.Mock() + mock_gcp_client.return_value = mock.Mock() + + wrapper = BigQueryWrapper(quota_project_id='my-billing-project') + self.assertEqual(wrapper.quota_project_id, 'my-billing-project') + + @mock.patch( + 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._bigquery_client') + @mock.patch( + 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._gcp_bigquery_client') + def 
test_from_pipeline_options_reads_quota_from_options( + self, mock_gcp_client, mock_bq_client): + """Test from_pipeline_options reads quota_project_id from + GoogleCloudOptions.""" + from apache_beam.options.pipeline_options import PipelineOptions + + mock_bq_client.return_value = mock.Mock() + mock_gcp_client.return_value = mock.Mock() + + options = PipelineOptions(['--quota_project_id=my-billing-project']) + wrapper = BigQueryWrapper.from_pipeline_options(options) + + self.assertEqual(wrapper.quota_project_id, 'my-billing-project') + + @mock.patch( + 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._bigquery_client') + @mock.patch( + 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._gcp_bigquery_client') + def test_from_pipeline_options_none_when_not_set( + self, mock_gcp_client, mock_bq_client): + """Test from_pipeline_options returns None when quota_project_id not set.""" + from apache_beam.options.pipeline_options import PipelineOptions + + mock_bq_client.return_value = mock.Mock() + mock_gcp_client.return_value = mock.Mock() + + options = PipelineOptions([]) + wrapper = BigQueryWrapper.from_pipeline_options(options) + + self.assertIsNone(wrapper.quota_project_id) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 4b06b8eda613..b84a7b6e308a 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1187,6 +1187,14 @@ def _add_argparse_args(cls, parser): action='store_true', help='Throttling counter in GcsIO is enabled by default. Set ' '--no_gcsio_throttling_counter to avoid it.') + parser.add_argument( + '--quota_project_id', + default=None, + help='GCP project ID to use for quota and billing purposes. ' + 'If not specified, the project associated with the credentials ' + 'will be used for quota. 
This is useful when running pipelines ' + 'that access resources in a different project than the one ' + 'associated with the credentials.') parser.add_argument( '--enable_gcsio_blob_generation', default=False, From 324dbebf5e278b1f4f2ff5f0fce0df96373102d6 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Tue, 27 Jan 2026 21:50:58 +0200 Subject: [PATCH 2/7] Fix isort linting --- sdks/python/apache_beam/internal/gcp/auth_test.py | 1 - sdks/python/apache_beam/io/gcp/bigquery_test.py | 10 ++++------ sdks/python/apache_beam/io/gcp/bigquery_tools.py | 3 +-- sdks/python/apache_beam/io/gcp/bigquery_tools_test.py | 7 +++---- 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth_test.py b/sdks/python/apache_beam/internal/gcp/auth_test.py index 1f811a16287e..9c04e461735f 100644 --- a/sdks/python/apache_beam/internal/gcp/auth_test.py +++ b/sdks/python/apache_beam/internal/gcp/auth_test.py @@ -18,7 +18,6 @@ import unittest import mock - from apache_beam.internal.gcp import auth from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index c09cecd7bff9..5862ad036b12 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -31,15 +31,12 @@ import unittest import uuid +import apache_beam as beam import hamcrest as hc import mock import pytest import pytz import requests -from parameterized import param -from parameterized import parameterized - -import apache_beam as beam from apache_beam.internal import pickler from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp @@ -80,18 +77,19 @@ from apache_beam.testing.util import equal_to from 
apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher +from parameterized import param +from parameterized import parameterized # Protect against environments where bigquery library is not available. # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: + from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_client from apitools.base.py.exceptions import HttpError from apitools.base.py.exceptions import HttpForbiddenError from google.api_core import exceptions from google.cloud import bigquery as gcp_bigquery from google.cloud import bigquery_storage_v1 as bq_storage - - from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_client except ImportError: gcp_bigquery = None bq_storage = None diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index ffa132d7a634..7dd7a2ab4a90 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -43,10 +43,9 @@ from typing import TypeVar from typing import Union +import apache_beam import fastavro import numpy as np - -import apache_beam from apache_beam import coders from apache_beam.internal.gcp import auth from apache_beam.internal.gcp.json_value import from_json_value diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index ac32a69b716b..b06cba39e914 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -29,24 +29,22 @@ from typing import Optional from typing import Sequence +import apache_beam as beam import fastavro import mock import numpy as np import pytz -from parameterized import parameterized - -import apache_beam as beam from apache_beam.io.gcp import resource_identifiers from apache_beam.io.gcp.bigquery_tools import 
JSON_COMPLIANCE_ERROR from apache_beam.io.gcp.bigquery_tools import AvroRowWriter from apache_beam.io.gcp.bigquery_tools import BigQueryJobTypes +from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.bigquery_tools import JsonRowWriter from apache_beam.io.gcp.bigquery_tools import RowAsDictJsonCoder from apache_beam.io.gcp.bigquery_tools import beam_row_from_dict from apache_beam.io.gcp.bigquery_tools import check_schema_equal from apache_beam.io.gcp.bigquery_tools import generate_bq_job_name from apache_beam.io.gcp.bigquery_tools import get_beam_typehints_from_tableschema -from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.bigquery_tools import parse_table_reference from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json from apache_beam.io.gcp.internal.clients import bigquery @@ -55,6 +53,7 @@ from apache_beam.options.value_provider import StaticValueProvider from apache_beam.typehints.row_type import RowTypeConstraint from apache_beam.utils.timestamp import Timestamp +from parameterized import parameterized # Protect against environments where bigquery library is not available. # pylint: disable=wrong-import-order, wrong-import-position From 12e2845086fe9e321c7e10e2b5425caf8633c4c3 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Thu, 2 Jul 2026 09:25:31 +0300 Subject: [PATCH 3/7] Document quota_project_id in ReadFromBigQuery and simplify client init Post-rebase follow-ups to the Python quota project support: - Document the quota_project_id parameter in the ReadFromBigQuery docstring. - Restore BigQueryWrapper.gcp_bq_client to master semantics (use the passed client if any), applying the quota project only to the self-created client. - Apply isort with the flags used by CI (scripts/run_lint.sh). - Update CHANGES.md to cover the Java and Go SDKs. 
Generated-by: Claude Code (claude-fable-5) Co-Authored-By: Claude Fable 5 --- CHANGES.md | 2 +- sdks/python/apache_beam/internal/gcp/auth_test.py | 1 + sdks/python/apache_beam/io/gcp/bigquery.py | 6 ++++++ sdks/python/apache_beam/io/gcp/bigquery_test.py | 10 ++++++---- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 15 ++++----------- .../apache_beam/io/gcp/bigquery_tools_test.py | 5 +++-- 6 files changed, 21 insertions(+), 18 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 43c85788d33d..26c12ffbdc32 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -82,7 +82,7 @@ * (YAML) Added WriteToDatadog transform ([#38362](https://github.com/apache/beam/issues/38362)). * (Java) Flink 2.1 and 2.2 support is added ([#38947](https://github.com/apache/beam/issues/38947)) ([#38978](https://github.com/apache/beam/issues/38978)); Flink 1.17 and 1.18 support is dropped. * (Python) MqttIO is now supported in Python via cross-language ([#21060](https://github.com/apache/beam/issues/21060)). -* Added support for setting quota project ID in BigQuery read operations via `--quota_project_id` pipeline option or `quota_project_id` parameter in ReadFromBigQuery transform (Python) ([#37431](https://github.com/apache/beam/issues/37431)). +* Added support for attributing BigQuery API quota and billing to a specific GCP project (quota project): `quota_project_id` parameter in `ReadFromBigQuery` or `--quota_project_id` pipeline option (Python), `--bigQueryQuotaProjectId` pipeline option (Java), and `bigqueryio.WithQuotaProject` read/query option (Go) ([#37431](https://github.com/apache/beam/issues/37431)). 
## Breaking Changes diff --git a/sdks/python/apache_beam/internal/gcp/auth_test.py b/sdks/python/apache_beam/internal/gcp/auth_test.py index 9c04e461735f..1f811a16287e 100644 --- a/sdks/python/apache_beam/internal/gcp/auth_test.py +++ b/sdks/python/apache_beam/internal/gcp/auth_test.py @@ -18,6 +18,7 @@ import unittest import mock + from apache_beam.internal.gcp import auth from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 6e9908724895..7e53a594db5c 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -3001,6 +3001,12 @@ class ReadFromBigQuery(PTransform): PCollection with a schema and yielding Beam Rows via the option `BEAM_ROW`. For more information on schemas, see https://beam.apache.org/documentation/programming-guide/#what-is-a-schema) + quota_project_id (str): The GCP project ID to use for quota and billing + of BigQuery API requests issued by this transform, if different from + the project the data resides in. Falls back to the + ``--quota_project_id`` pipeline option if not set. The credentials + used must have the ``serviceusage.services.use`` permission on that + project. """ class Method(object): EXPORT = 'EXPORT' # This is currently the default. 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 5862ad036b12..c09cecd7bff9 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -31,12 +31,15 @@ import unittest import uuid -import apache_beam as beam import hamcrest as hc import mock import pytest import pytz import requests +from parameterized import param +from parameterized import parameterized + +import apache_beam as beam from apache_beam.internal import pickler from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp @@ -77,19 +80,18 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher -from parameterized import param -from parameterized import parameterized # Protect against environments where bigquery library is not available. 
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: - from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_client from apitools.base.py.exceptions import HttpError from apitools.base.py.exceptions import HttpForbiddenError from google.api_core import exceptions from google.cloud import bigquery as gcp_bigquery from google.cloud import bigquery_storage_v1 as bq_storage + + from apache_beam.io.gcp.internal.clients.bigquery import bigquery_v2_client except ImportError: gcp_bigquery = None bq_storage = None diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 7dd7a2ab4a90..5243e841a57d 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -43,9 +43,10 @@ from typing import TypeVar from typing import Union -import apache_beam import fastavro import numpy as np + +import apache_beam from apache_beam import coders from apache_beam.internal.gcp import auth from apache_beam.internal.gcp.json_value import from_json_value @@ -367,16 +368,8 @@ def __init__( self.quota_project_id = quota_project_id self.client = client or BigQueryWrapper._bigquery_client( PipelineOptions(), quota_project_id=quota_project_id) - - # If the client is a mock (common in tests) or has the specific method - # we use, we use it as the gcp_bq_client to preserve backward - # compatibility for tests. Otherwise (e.g. it's a real apitools client), - # we create the correct google-cloud-bigquery client. 
- if client and hasattr(client, 'insert_rows_json'): - self.gcp_bq_client = client - else: - self.gcp_bq_client = BigQueryWrapper._gcp_bigquery_client( - quota_project_id=quota_project_id) + self.gcp_bq_client = client or BigQueryWrapper._gcp_bigquery_client( + quota_project_id=quota_project_id) self._unique_row_id = 0 # For testing scenarios where we pass in a client we do not want a diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index b06cba39e914..46344e536eaf 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -29,11 +29,13 @@ from typing import Optional from typing import Sequence -import apache_beam as beam import fastavro import mock import numpy as np import pytz +from parameterized import parameterized + +import apache_beam as beam from apache_beam.io.gcp import resource_identifiers from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR from apache_beam.io.gcp.bigquery_tools import AvroRowWriter @@ -53,7 +55,6 @@ from apache_beam.options.value_provider import StaticValueProvider from apache_beam.typehints.row_type import RowTypeConstraint from apache_beam.utils.timestamp import Timestamp -from parameterized import parameterized # Protect against environments where bigquery library is not available. 
# pylint: disable=wrong-import-order, wrong-import-position From 74488048d911bfd3f6aafaba57c65bb1d8c12da8 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Thu, 2 Jul 2026 09:25:47 +0300 Subject: [PATCH 4/7] Add bigQueryQuotaProjectId pipeline option to Java BigQueryIO Adds a BigQueryOptions option that attributes quota and billing of BigQuery API requests to a specific GCP project by setting the X-Goog-User-Project header: the HTTP Bigquery client wraps its credentials via GoogleCredentials.createWithQuotaProject, and the Storage read/write gRPC clients set quotaProjectId on their settings. Generated-by: Claude Code (claude-fable-5) Co-Authored-By: Claude Fable 5 --- .../sdk/io/gcp/bigquery/BigQueryOptions.java | 10 +++++ .../io/gcp/bigquery/BigQueryServicesImpl.java | 33 +++++++++++++++- .../bigquery/BigQueryServicesImplTest.java | 38 +++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index face2ef5841a..9a0b61170808 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -197,6 +197,16 @@ public interface BigQueryOptions void setBigQueryProject(String value); + @Description( + "GCP project ID used for quota and billing attribution of BigQuery API requests " + + "(sets the X-Goog-User-Project header), if different from the project the data " + + "resides in. If unspecified, the project associated with the credentials is used. 
" + + "The credentials used must have the serviceusage.services.use permission on this " + + "project.") + String getBigQueryQuotaProjectId(); + + void setBigQueryQuotaProjectId(String value); + @Description("Maximum (best effort) size of a single append to the storage API.") @Default.Integer(2 * 1024 * 1024) Integer getStorageApiAppendThresholdBytes(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 14765a65ff0b..04c22527c436 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -61,6 +61,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.auth.Credentials; import com.google.auth.http.HttpCredentialsAdapter; +import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; @@ -1738,6 +1739,27 @@ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws Inte } } + /** + * Returns credentials with the quota project applied, if one is configured and the credentials + * support it. The quota project sets the {@code X-Goog-User-Project} header so that BigQuery API + * requests are billed against that project's quota. 
+ */ + @VisibleForTesting + static @Nullable Credentials maybeWithQuotaProjectId( + @Nullable Credentials credential, @Nullable String quotaProjectId) { + if (Strings.isNullOrEmpty(quotaProjectId) || credential == null) { + return credential; + } + if (credential instanceof GoogleCredentials) { + return ((GoogleCredentials) credential).createWithQuotaProject(quotaProjectId); + } + LOG.warn( + "Credentials of type {} do not support a quota project. " + + "The bigQueryQuotaProjectId option will be ignored.", + credential.getClass().getName()); + return credential; + } + /** Returns a BigQuery client builder using the specified {@link BigQueryOptions}. */ private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) { // Do not log 404. It clutters the output and is possibly even required by the @@ -1748,7 +1770,8 @@ private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) { httpRequestInitializer.setReadTimeout(options.getHTTPReadTimeout()); httpRequestInitializer.setWriteTimeout(options.getHTTPWriteTimeout()); ImmutableList.Builder initBuilder = ImmutableList.builder(); - Credentials credential = options.getGcpCredential(); + Credentials credential = + maybeWithQuotaProjectId(options.getGcpCredential(), options.getBigQueryQuotaProjectId()); initBuilder.add( credential == null ? 
new NullCredentialInitializer() @@ -1787,6 +1810,10 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option if (!Strings.isNullOrEmpty(endpoint)) { builder.setEndpoint(trimSchemaIfNecessary(endpoint)); } + @Nullable String quotaProjectId = options.getBigQueryQuotaProjectId(); + if (!Strings.isNullOrEmpty(quotaProjectId)) { + builder.setQuotaProjectId(quotaProjectId); + } return BigQueryWriteClient.create( builder .setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential()) @@ -1911,6 +1938,10 @@ public void onRetryAttempt(Status status, Metadata metadata) { if (!Strings.isNullOrEmpty(endpoint)) { settingsBuilder.setEndpoint(trimSchemaIfNecessary(endpoint)); } + @Nullable String quotaProjectId = options.getBigQueryQuotaProjectId(); + if (!Strings.isNullOrEmpty(quotaProjectId)) { + settingsBuilder.setQuotaProjectId(quotaProjectId); + } UnaryCallSettings.Builder createReadSessionSettings = settingsBuilder.getStubSettingsBuilder().createReadSessionSettings(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 3902fb1fca33..6870ab15a27e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -67,6 +67,10 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.auth.Credentials; +import com.google.auth.oauth2.AccessToken; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.QuotaProjectIdProvider; import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; 
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; @@ -2173,6 +2177,40 @@ public RetryInfo parseBytes(byte[] serialized) { assertEquals(123456, (long) container.getCounter(metricName).getCumulative()); } + @Test + public void testQuotaProjectIdOverrides() throws IOException { + BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class); + options.setBigQueryQuotaProjectId("my-quota-project"); + + assertEquals( + "my-quota-project", + new BigQueryServicesImpl.StorageClientImpl(options) + .getClient() + .getSettings() + .getQuotaProjectId()); + assertEquals( + "my-quota-project", + new BigQueryServicesImpl.WriteStreamServiceImpl(options) + .getClient() + .getSettings() + .getQuotaProjectId()); + } + + @Test + public void testMaybeWithQuotaProjectId() throws IOException { + GoogleCredentials credentials = + GoogleCredentials.create(AccessToken.newBuilder().setTokenValue("fake-token").build()); + + Credentials withQuota = + BigQueryServicesImpl.maybeWithQuotaProjectId(credentials, "my-quota-project"); + assertEquals("my-quota-project", ((QuotaProjectIdProvider) withQuota).getQuotaProjectId()); + + // No quota project configured: credentials are returned unchanged. + assertEquals(credentials, BigQueryServicesImpl.maybeWithQuotaProjectId(credentials, null)); + // Null credentials pass through. 
+ assertNull(BigQueryServicesImpl.maybeWithQuotaProjectId(null, "my-quota-project")); + } + @Test public void testEndpointOverrides() throws IOException { BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class); From d1a3cc942428fb45b5169cbdea2ed2eb5d2a629b Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Thu, 2 Jul 2026 09:25:47 +0300 Subject: [PATCH 5/7] Add WithQuotaProject read option to Go bigqueryio Adds a QueryOptions field and functional option that attributes quota and billing of BigQuery API calls to a specific GCP project via option.WithQuotaProject. Read now accepts query options. Generated-by: Claude Code (claude-fable-5) Co-Authored-By: Claude Fable 5 --- sdks/go/pkg/beam/io/bigqueryio/bigquery.go | 25 ++++++++++++++++--- .../pkg/beam/io/bigqueryio/bigquery_test.go | 10 ++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go index a80661e22d33..f5ae2df04594 100644 --- a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go +++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go @@ -34,6 +34,7 @@ import ( bq "google.golang.org/api/bigquery/v2" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" + "google.golang.org/api/option" ) // writeSizeLimit is the maximum number of rows allowed by BQ in a write. @@ -88,14 +89,14 @@ func NewQualifiedTableName(s string) (QualifiedTableName, error) { // Read reads all rows from the given table. The table must have a schema // compatible with the given type, t, and Read returns a PCollection. If the // table has more rows than t, then Read is implicitly a projection. 
-func Read(s beam.Scope, project, table string, t reflect.Type) beam.PCollection { +func Read(s beam.Scope, project, table string, t reflect.Type, options ...func(*QueryOptions) error) beam.PCollection { mustParseTable(table) s = s.Scope("bigquery.Read") stmt := constructSelectStatement(t, bigQueryTag, table) - return query(s, project, stmt, t) + return query(s, project, stmt, t, options...) } func constructSelectStatement(t reflect.Type, tagKey string, table string) string { @@ -114,6 +115,9 @@ func constructSelectStatement(t reflect.Type, tagKey string, table string) strin type QueryOptions struct { // UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query. UseStandardSQL bool + // QuotaProject is the GCP project ID used for quota and billing attribution + // of the BigQuery API calls, if different from the project the data resides in. + QuotaProject string } // UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query. @@ -124,6 +128,17 @@ func UseStandardSQL() func(qo *QueryOptions) error { } } +// WithQuotaProject sets the GCP project ID used for quota and billing +// attribution of the BigQuery API calls, if different from the project the +// data resides in. The credentials used must have the +// serviceusage.services.use permission on that project. +func WithQuotaProject(project string) func(qo *QueryOptions) error { + return func(qo *QueryOptions) error { + qo.QuotaProject = project + return nil + } +} + // Query executes a query. The output must have a schema compatible with the given // type, t. It returns a PCollection. 
func Query(s beam.Scope, project, q string, t reflect.Type, options ...func(*QueryOptions) error) beam.PCollection { @@ -157,7 +172,11 @@ type queryFn struct { } func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit func(beam.X)) error { - client, err := bigquery.NewClient(ctx, f.Project) + var opts []option.ClientOption + if f.Options.QuotaProject != "" { + opts = append(opts, option.WithQuotaProject(f.Options.QuotaProject)) + } + client, err := bigquery.NewClient(ctx, f.Project, opts...) if err != nil { return err } diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go b/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go index b955136e8335..4bd2cca3a635 100644 --- a/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go +++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go @@ -148,3 +148,13 @@ func Test_mustInferSchema(t *testing.T) { }) } } + +func TestWithQuotaProject(t *testing.T) { + qo := QueryOptions{} + if err := WithQuotaProject("quota-project")(&qo); err != nil { + t.Fatalf("WithQuotaProject() returned err: %v", err) + } + if got, want := qo.QuotaProject, "quota-project"; got != want { + t.Errorf("qo.QuotaProject = %q, want %q", got, want) + } +} From 0f949cce66ec8d24287765512ce41d49c8390571 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Thu, 2 Jul 2026 09:46:01 +0300 Subject: [PATCH 6/7] Pass transform-level quota_project_id to the BigQuery jobs client Live testing against BigQuery showed the quota_project_id parameter on ReadFromBigQuery never reached the apitools jobs client: both sources built the client from pipeline options only, so only the --quota_project_id pipeline option was applied, not the transform-level parameter. Pass the source-level value (which falls back to the option) explicitly at both wrapper construction sites. 
Also address Gemini review comments: guard with_quota_project when google-auth is unavailable, and broaden the credential-loading fallback in _create_bq_storage_client/_gcp_bigquery_client. Verified live on DirectRunner reading bigquery-public-data: - without quota project: read succeeds - quota project the caller cannot use: 403 USER_PROJECT_DENIED - quota project with the BigQuery API disabled: SERVICE_DISABLED naming the quota project (positive attribution proof) Generated-by: Claude Code (claude-fable-5) Co-Authored-By: Claude Fable 5 --- sdks/python/apache_beam/internal/gcp/auth.py | 3 ++- sdks/python/apache_beam/io/gcp/bigquery.py | 9 +++++---- sdks/python/apache_beam/io/gcp/bigquery_tools.py | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/internal/gcp/auth.py b/sdks/python/apache_beam/internal/gcp/auth.py index 9a848ea430be..aa97ffa186cb 100644 --- a/sdks/python/apache_beam/internal/gcp/auth.py +++ b/sdks/python/apache_beam/internal/gcp/auth.py @@ -99,7 +99,8 @@ def with_quota_project(credentials, quota_project_id): Credentials with the quota project applied, or the original credentials if quota project is not supported or credentials is None. 
""" - if credentials is None or quota_project_id is None: + if not _GOOGLE_AUTH_AVAILABLE or credentials is None or (quota_project_id + is None): return credentials # Get the underlying google-auth credentials if wrapped diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 7e53a594db5c..07f8936f335b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -812,7 +812,8 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): bq = bigquery_tools.BigQueryWrapper( temp_dataset_id=( self.temp_dataset.datasetId if self.temp_dataset else None), - client=bigquery_tools.BigQueryWrapper._bigquery_client(self.options), + client=bigquery_tools.BigQueryWrapper._bigquery_client( + self.options, quota_project_id=self._get_quota_project_id()), quota_project_id=self._get_quota_project_id()) if self.query is not None: @@ -958,11 +959,10 @@ def _create_bq_storage_client(quota_project_id=None): if quota_project_id: try: import google.auth - from google.auth import exceptions as auth_exceptions credentials, _ = google.auth.default() credentials = auth.with_quota_project(credentials, quota_project_id) return bq_storage.BigQueryReadClient(credentials=credentials) - except (auth_exceptions.DefaultCredentialsError, AttributeError) as e: + except Exception as e: _LOGGER.warning( 'Failed to apply quota project %s to BigQuery Storage client: %s. 
' 'Falling back to default client.', @@ -1221,7 +1221,8 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): bq = bigquery_tools.BigQueryWrapper( temp_table_ref=(self.temp_table if self.temp_table else None), client=bigquery_tools.BigQueryWrapper._bigquery_client( - self.pipeline_options), + self.pipeline_options, + quota_project_id=self._get_quota_project_id()), quota_project_id=self._get_quota_project_id()) if self.query is not None: diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 5243e841a57d..7fab1ec022ab 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -1461,10 +1461,10 @@ def _gcp_bigquery_client(quota_project_id: str = None): # Get default credentials and apply quota project try: import google.auth - from google.auth import exceptions as auth_exceptions credentials, _ = google.auth.default() credentials = auth.with_quota_project(credentials, quota_project_id) - except (auth_exceptions.DefaultCredentialsError, AttributeError) as e: + except Exception as e: + credentials = None _LOGGER.warning( 'Failed to apply quota project %s to gcp-bigquery client: %s. ' 'Falling back to default client.', From 0a82c973fa967307c4f286878c4edf9ad98ea65b Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Thu, 2 Jul 2026 10:05:51 +0300 Subject: [PATCH 7/7] Improve test coverage for quota project support across SDKs - Go: extract the client-option building into a testable clientOptions helper (the branch Codecov flagged as uncovered) and unit test it. The remaining uncovered lines are the bigquery.NewClient IO call itself. - Python: add regression tests that _CustomBigQuerySource.split and _CustomBigQueryStorageSource.split pass the transform-level quota_project_id to _bigquery_client. These fail on the pre-fix code, covering the bug found during live testing. 
- Java: cover the maybeWithQuotaProjectId branches for an empty quota project and for credentials that don't support a quota project. Generated-by: Claude Code (claude-fable-5) Co-Authored-By: Claude Fable 5 --- sdks/go/pkg/beam/io/bigqueryio/bigquery.go | 15 ++++++--- .../pkg/beam/io/bigqueryio/bigquery_test.go | 9 ++++++ .../bigquery/BigQueryServicesImplTest.java | 32 +++++++++++++++++++ .../apache_beam/io/gcp/bigquery_test.py | 28 ++++++++++++++++ 4 files changed, 79 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go index f5ae2df04594..ae6c2e4e57d2 100644 --- a/sdks/go/pkg/beam/io/bigqueryio/bigquery.go +++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery.go @@ -139,6 +139,15 @@ func WithQuotaProject(project string) func(qo *QueryOptions) error { } } +// clientOptions returns the BigQuery client options implied by qo. +func clientOptions(qo QueryOptions) []option.ClientOption { + var opts []option.ClientOption + if qo.QuotaProject != "" { + opts = append(opts, option.WithQuotaProject(qo.QuotaProject)) + } + return opts +} + // Query executes a query. The output must have a schema compatible with the given // type, t. It returns a PCollection. func Query(s beam.Scope, project, q string, t reflect.Type, options ...func(*QueryOptions) error) beam.PCollection { @@ -172,11 +181,7 @@ type queryFn struct { } func (f *queryFn) ProcessElement(ctx context.Context, _ []byte, emit func(beam.X)) error { - var opts []option.ClientOption - if f.Options.QuotaProject != "" { - opts = append(opts, option.WithQuotaProject(f.Options.QuotaProject)) - } - client, err := bigquery.NewClient(ctx, f.Project, opts...) + client, err := bigquery.NewClient(ctx, f.Project, clientOptions(f.Options)...) 
if err != nil { return err } diff --git a/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go b/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go index 4bd2cca3a635..5f0ae8d85e19 100644 --- a/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go +++ b/sdks/go/pkg/beam/io/bigqueryio/bigquery_test.go @@ -158,3 +158,12 @@ func TestWithQuotaProject(t *testing.T) { t.Errorf("qo.QuotaProject = %q, want %q", got, want) } } + +func TestClientOptions(t *testing.T) { + if got := clientOptions(QueryOptions{}); len(got) != 0 { + t.Errorf("clientOptions(no quota) = %d options, want 0", len(got)) + } + if got := clientOptions(QueryOptions{QuotaProject: "quota-project"}); len(got) != 1 { + t.Errorf("clientOptions(with quota) = %d options, want 1", len(got)) + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 6870ab15a27e..ea2e0431bece 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -91,6 +91,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; @@ -2207,8 +2208,39 @@ public void testMaybeWithQuotaProjectId() throws IOException { // No quota project configured: credentials are returned unchanged. assertEquals(credentials, BigQueryServicesImpl.maybeWithQuotaProjectId(credentials, null)); + // Empty quota project is treated as unset. + assertEquals(credentials, BigQueryServicesImpl.maybeWithQuotaProjectId(credentials, "")); // Null credentials pass through. 
assertNull(BigQueryServicesImpl.maybeWithQuotaProjectId(null, "my-quota-project")); + + // Credentials that don't support a quota project are returned unchanged. + Credentials unsupported = + new Credentials() { + @Override + public String getAuthenticationType() { + return "test"; + } + + @Override + public Map<String, List<String>> getRequestMetadata(java.net.URI uri) { + return Collections.emptyMap(); + } + + @Override + public boolean hasRequestMetadata() { + return false; + } + + @Override + public boolean hasRequestMetadataOnly() { + return true; + } + + @Override + public void refresh() {} + }; + assertEquals( + unsupported, BigQueryServicesImpl.maybeWithQuotaProjectId(unsupported, "my-quota-project")); } @Test diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index c09cecd7bff9..fa89c75fccee 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -850,6 +850,34 @@ def test_storage_source_gets_quota_from_options(self): pipeline_options=options) self.assertEqual(source._get_quota_project_id(), 'my-billing-project') + @mock.patch.object(bigquery_tools.BigQueryWrapper, '_bigquery_client') + def test_export_source_split_passes_quota_to_client(self, mock_bq_client): + """split() must pass the transform-level quota_project_id to the BigQuery + client, not only the --quota_project_id pipeline option.""" + mock_bq_client.side_effect = RuntimeError('stop') + source = beam_bq._CustomBigQuerySource( + method=ReadFromBigQuery.Method.EXPORT, + table='project:dataset.table', + quota_project_id='my-billing-project') + with self.assertRaises(RuntimeError): + list(source.split(desired_bundle_size=0)) + mock_bq_client.assert_called_once_with( + source.options, quota_project_id='my-billing-project') + + @mock.patch.object(bigquery_tools.BigQueryWrapper, '_bigquery_client') + def test_storage_source_split_passes_quota_to_client(self, mock_bq_client): + """split() must pass the 
transform-level quota_project_id to the BigQuery + client, not only the --quota_project_id pipeline option.""" + mock_bq_client.side_effect = RuntimeError('stop') + source = beam_bq._CustomBigQueryStorageSource( + method=ReadFromBigQuery.Method.DIRECT_READ, + table='project:dataset.table', + quota_project_id='my-billing-project') + with self.assertRaises(RuntimeError): + list(source.split(desired_bundle_size=0)) + mock_bq_client.assert_called_once_with( + source.pipeline_options, quota_project_id='my-billing-project') + def test_quota_project_id_in_export_source_display_data(self): """Test that quota_project_id appears in display data for export source.""" source = beam_bq._CustomBigQuerySource(