diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index c03ecf71f04d..98bf4bf95003 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", "pr": "37345", - "modification": 50 + "modification": 51 } diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py index 4ef6c392254b..4e45d0324ee2 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py @@ -491,8 +491,6 @@ class TriggerCopyJobs(beam.DoFn): """ TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables' - # https://docs.cloud.google.com/bigquery/quotas#copy_jobs - MAX_SOURCES_PER_COPY_JOB = 1200 def __init__( self, @@ -530,90 +528,96 @@ def process( self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None): if isinstance(element_list, tuple): # Allow this for streaming update compatibility while fixing BEAM-24535. - element_list = [element_list] + self.process_one(element_list, job_name_prefix) + else: + for element in element_list: + self.process_one(element, job_name_prefix) - if not element_list: - return + def process_one(self, element, job_name_prefix): + destination, job_reference = element - first_destination = element_list[0][0] - copy_to_reference = bigquery_tools.parse_table_reference(first_destination) + copy_to_reference = bigquery_tools.parse_table_reference(destination) if copy_to_reference.projectId is None: copy_to_reference.projectId = vp.RuntimeValueProvider.get_value( 'project', str, '') or self.project - copy_from_references = [] - for destination, job_reference in element_list: - copy_from_reference = bigquery_tools.parse_table_reference(destination) - copy_from_reference.tableId = job_reference.jobId - if copy_from_reference.projectId is None: - copy_from_reference.projectId = vp.RuntimeValueProvider.get_value( - 'project', str, '') or self.project - copy_from_references.append(copy_from_reference) + copy_from_reference = bigquery_tools.parse_table_reference(destination) + copy_from_reference.tableId = job_reference.jobId + if copy_from_reference.projectId is None: + copy_from_reference.projectId = vp.RuntimeValueProvider.get_value( + 'project', str, '') or self.project - full_table_ref = bigquery_tools.get_hashable_destination(copy_to_reference) + _LOGGER.info( + "Triggering copy job from %s to %s", + copy_from_reference, + copy_to_reference) - is_first_time = full_table_ref not in self._observed_tables - if is_first_time: - self._observed_tables.add(full_table_ref) - if self.bq_io_metadata: - Lineage.sinks().add( - 'bigquery', - copy_to_reference.projectId, - copy_to_reference.datasetId, - copy_to_reference.tableId) - - # Split into chunks of MAX_SOURCES_PER_COPY_JOB - chunks = [ - copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB] - for i in range( - 0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB) - ] - - copy_job_name_base = '%s_%s' % ( - job_name_prefix, - _bq_uuid(bigquery_tools.get_hashable_destination(copy_to_reference))) + wait_for_job, write_disposition = ( + self._determine_write_disposition(copy_to_reference)) + + if not self.bq_io_metadata: + self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) project_id = ( copy_to_reference.projectId if self.load_job_project_id is None else self.load_job_project_id) + copy_job_name = '%s_%s' % ( + job_name_prefix, + _bq_uuid( + '%s:%s.%s' % ( + copy_from_reference.projectId, + copy_from_reference.datasetId, + copy_from_reference.tableId))) + job_reference = self.bq_wrapper._insert_copy_job( + project_id, + copy_job_name, + copy_from_reference, + copy_to_reference, + create_disposition=self.create_disposition, + write_disposition=write_disposition, + job_labels=self.bq_io_metadata.add_additional_bq_job_labels()) + + if wait_for_job: + self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) + self.pending_jobs.append( + GlobalWindows.windowed_value((destination, job_reference))) - for i, chunk in enumerate(chunks): - if i == 0 and is_first_time: - write_disposition = self.write_disposition - # Wait inline only if we have multiple chunks and write disposition is WRITE_TRUNCATE or WRITE_EMPTY. - # This ensures the first chunk initializes the table, and subsequent chunks (WRITE_APPEND) append to it. - wait_for_job = ( - self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and - len(chunks) > 1) - else: - write_disposition = 'WRITE_APPEND' - wait_for_job = False - - chunk_job_name = copy_job_name_base - if len(chunks) > 1: - chunk_job_name = f"{copy_job_name_base}_{i}" - - _LOGGER.info( - "Triggering copy job %s from %s to %s (write_disposition: %s)", - chunk_job_name, [str(r) for r in chunk], - copy_to_reference, - write_disposition) - - job_reference = self.bq_wrapper._insert_copy_job( - project_id, - chunk_job_name, - chunk, - copy_to_reference, - create_disposition=self.create_disposition, - write_disposition=write_disposition, - job_labels=self.bq_io_metadata.add_additional_bq_job_labels() - if self.bq_io_metadata else None) - - if wait_for_job: - self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) - - self.pending_jobs.append( - GlobalWindows.windowed_value((first_destination, job_reference))) + def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]: + """ + Determines the write disposition for a BigQuery copy job, + based on destination. + + When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs + to the same destination can interfere with each other, truncate data, and + write to the BigQuery table repeatedly. To prevent this, the first copy job + runs with the user's specified write_disposition, but subsequent jobs must + always use WRITE_APPEND. This ensures that subsequent copy jobs do not + clear out data appended by previous jobs. + + Args: + copy_to_reference: The reference to the destination table. + + Returns: + A tuple containing a boolean indicating whether to wait for the job to + complete and the write disposition to use for the job. + """ + full_table_ref = '%s:%s.%s' % ( + copy_to_reference.projectId, + copy_to_reference.datasetId, + copy_to_reference.tableId) + if full_table_ref not in self._observed_tables: + write_disposition = self.write_disposition + wait_for_job = True + self._observed_tables.add(full_table_ref) + Lineage.sinks().add( + 'bigquery', + copy_to_reference.projectId, + copy_to_reference.datasetId, + copy_to_reference.tableId) + else: + wait_for_job = False + write_disposition = 'WRITE_APPEND' + return wait_for_job, write_disposition def finish_bundle(self): for windowed_value in self.pending_jobs: @@ -740,7 +744,7 @@ def process( else: try: schema = bigquery_tools.table_schema_to_dict( - self.bq_wrapper.get_table( + bigquery_tools.BigQueryWrapper().get_table( project_id=table_reference.projectId, dataset_id=table_reference.datasetId, table_id=table_reference.tableId).schema) @@ -851,8 +855,7 @@ def process(self, element): if latest_partition.can_accept(file_size): latest_partition.add(file_path, file_size) else: - if latest_partition.files: - partitions.append(latest_partition.files) + partitions.append(latest_partition.files) latest_partition = PartitionFiles.Partition( self.max_partition_size, self.max_files_per_partition) latest_partition.add(file_path, file_size) @@ -1178,13 +1181,12 @@ def _load_data( # the truncation happens only once. See # https://github.com/apache/beam/issues/24535. finished_temp_tables_load_job_ids_list_pc = ( - finished_temp_tables_load_job_ids_pc - | beam.MapTuple( + finished_temp_tables_load_job_ids_pc | beam.MapTuple( lambda destination, job_reference: ( - bigquery_tools.get_hashable_destination(destination), + bigquery_tools.parse_table_reference(destination).tableId, (destination, job_reference))) | beam.GroupByKey() - | beam.MapTuple(lambda dest, batch: list(batch))) + | beam.MapTuple(lambda tableId, batch: list(batch))) else: # Loads can happen in parallel. finished_temp_tables_load_job_ids_list_pc = ( diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 47c1ce5ea1bb..191719e6a208 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -924,180 +924,69 @@ def dynamic_destination_resolver(element, *side_inputs): write_disposition=BigQueryDisposition.WRITE_TRUNCATE)) from apache_beam.io.gcp.internal.clients.bigquery import TableReference - mock_insert_copy_job.assert_has_calls([ - call( - 'project1', - mock.ANY, - [ + mock_insert_copy_job.assert_has_calls( + [ + call( + 'project1', + mock.ANY, TableReference( datasetId='dataset1', projectId='project1', tableId='job_name1'), + TableReference( + datasetId='dataset1', + projectId='project1', + tableId='table1'), + create_disposition=None, + write_disposition='WRITE_TRUNCATE', + job_labels={'step_name': 'bigquerybatchfileloads'}), + call( + 'project1', + mock.ANY, TableReference( datasetId='dataset1', projectId='project1', tableId='job_name1'), - ], - TableReference( - datasetId='dataset1', projectId='project1', tableId='table1'), - create_disposition=None, - write_disposition='WRITE_TRUNCATE', - job_labels={'step_name': 'bigquerybatchfileloads'}), - call( - 'project1', - mock.ANY, - [ + TableReference( + datasetId='dataset1', + projectId='project1', + tableId='table1'), + create_disposition=None, + write_disposition='WRITE_APPEND', + job_labels={'step_name': 'bigquerybatchfileloads'}), + call( + 'project1', + mock.ANY, TableReference( datasetId='dataset2', projectId='project1', tableId='job_name1'), - ], - TableReference( - datasetId='dataset2', projectId='project1', tableId='table1'), - create_disposition=None, - write_disposition='WRITE_TRUNCATE', - job_labels={'step_name': 'bigquerybatchfileloads'}), - call( - 'project1', - mock.ANY, - [ + TableReference( + datasetId='dataset2', + projectId='project1', + tableId='table1'), + create_disposition=None, + # Previously this was `WRITE_APPEND`. + write_disposition='WRITE_TRUNCATE', + job_labels={'step_name': 'bigquerybatchfileloads'}), + call( + 'project1', + mock.ANY, TableReference( datasetId='dataset3', projectId='project1', tableId='job_name1'), - ], - TableReference( - datasetId='dataset3', projectId='project1', tableId='table1'), - create_disposition=None, - write_disposition='WRITE_TRUNCATE', - job_labels={'step_name': 'bigquerybatchfileloads'}), - ], - any_order=True) - self.assertEqual(3, mock_insert_copy_job.call_count) - - @mock.patch( - 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper.wait_for_bq_job') - @mock.patch( - 'apache_beam.io.gcp.bigquery_tools.BigQueryWrapper._insert_copy_job') - def test_copy_jobs_splitting( - self, mock_insert_copy_job, mock_wait_for_bq_job): - destination = 'project1:dataset1.table1' - - from apache_beam.io.gcp.bigquery_file_loads import TriggerCopyJobs - original_max_sources = TriggerCopyJobs.MAX_SOURCES_PER_COPY_JOB - TriggerCopyJobs.MAX_SOURCES_PER_COPY_JOB = 2 - - try: - job_reference = bigquery_api.JobReference() - job_reference.projectId = 'project1' - job_reference.jobId = 'job_name1' - result_job = mock.Mock() - result_job.jobReference = job_reference - - mock_job = mock.Mock() - mock_job.status.state = 'DONE' - mock_job.status.errorResult = None - mock_job.jobReference = job_reference - - bq_client = mock.Mock() - bq_client.jobs.Get.return_value = mock_job - bq_client.jobs.Insert.return_value = result_job - bq_client.tables.Delete.return_value = None - mock_insert_copy_job.return_value = job_reference - temp_dir = self._new_tempdir() - - with TestPipeline('FnApiRunner') as p: - _ = ( - p - | beam.Create([ - { - 'name': 'a' - }, - { - 'name': 'b' - }, - { - 'name': 'c' - }, - { - 'name': 'd' - }, - { - 'name': 'e' - }, - ], - reshuffle=False) - | bqfl.BigQueryBatchFileLoads( - destination, - custom_gcs_temp_location=temp_dir, - test_client=bq_client, - validate=False, - temp_file_format=bigquery_tools.FileFormat.JSON, - max_file_size=10, - max_partition_size=10, - max_files_per_partition=1, - write_disposition=BigQueryDisposition.WRITE_TRUNCATE)) - - self.assertEqual(3, mock_insert_copy_job.call_count) - - from apache_beam.io.gcp.internal.clients.bigquery import TableReference - expected_calls = [ - call( - 'project1', - mock.ANY, - [ - TableReference( - datasetId='dataset1', - projectId='project1', - tableId='job_name1'), - TableReference( - datasetId='dataset1', - projectId='project1', - tableId='job_name1'), - ], - TableReference( - datasetId='dataset1', projectId='project1', tableId='table1'), - create_disposition=None, - write_disposition='WRITE_TRUNCATE', - job_labels=mock.ANY), - call( - 'project1', - mock.ANY, - [ - TableReference( - datasetId='dataset1', - projectId='project1', - tableId='job_name1'), - TableReference( - datasetId='dataset1', - projectId='project1', - tableId='job_name1'), - ], - TableReference( - datasetId='dataset1', projectId='project1', tableId='table1'), - create_disposition=None, - write_disposition='WRITE_APPEND', - job_labels=mock.ANY), - call( - 'project1', - mock.ANY, - [ - TableReference( - datasetId='dataset1', - projectId='project1', - tableId='job_name1'), - ], - TableReference( - datasetId='dataset1', projectId='project1', tableId='table1'), - create_disposition=None, - write_disposition='WRITE_APPEND', - job_labels=mock.ANY), - ] - mock_insert_copy_job.assert_has_calls(expected_calls, any_order=True) - self.assertEqual(9, mock_wait_for_bq_job.call_count) - - finally: - TriggerCopyJobs.MAX_SOURCES_PER_COPY_JOB = original_max_sources + TableReference( + datasetId='dataset3', + projectId='project1', + tableId='table1'), + create_disposition=None, + # Previously this was `WRITE_APPEND`. + write_disposition='WRITE_TRUNCATE', + job_labels={'step_name': 'bigquerybatchfileloads'}), + ], + any_order=True) + self.assertEqual(4, mock_insert_copy_job.call_count) @parameterized.expand([ param(is_streaming=False, with_auto_sharding=False, compat_version=None), diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 491b7a39b0b7..8dd58cd55a01 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -506,22 +506,16 @@ def _insert_copy_job( reference = bigquery.JobReference() reference.jobId = job_id reference.projectId = project_id - - copy_config = bigquery.JobConfigurationTableCopy( - destinationTable=to_table_reference, - createDisposition=create_disposition, - writeDisposition=write_disposition, - ) - if isinstance(from_table_reference, list): - copy_config.sourceTables = from_table_reference - else: - copy_config.sourceTable = from_table_reference - request = bigquery.BigqueryJobsInsertRequest( projectId=project_id, job=bigquery.Job( configuration=bigquery.JobConfiguration( - copy=copy_config, + copy=bigquery.JobConfigurationTableCopy( + destinationTable=to_table_reference, + sourceTable=from_table_reference, + createDisposition=create_disposition, + writeDisposition=write_disposition, + ), labels=_build_job_labels(job_labels), ), jobReference=reference,