-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Revert "[Python] Optimize BigQuery copy jobs in file loads using multi-source copy" #39106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| { | ||
| "comment": "Modify this file in a trivial way to cause this test suite to run.", | ||
| "pr": "37345", | ||
| "modification": 50 | ||
| "modification": 51 | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -491,8 +491,6 @@ class TriggerCopyJobs(beam.DoFn): | |||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables' | ||||||||||||||||||||||||||||||
| # https://docs.cloud.google.com/bigquery/quotas#copy_jobs | ||||||||||||||||||||||||||||||
| MAX_SOURCES_PER_COPY_JOB = 1200 | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||
|
|
@@ -530,90 +528,96 @@ def process( | |||||||||||||||||||||||||||||
| self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None): | ||||||||||||||||||||||||||||||
| if isinstance(element_list, tuple): | ||||||||||||||||||||||||||||||
| # Allow this for streaming update compatibility while fixing BEAM-24535. | ||||||||||||||||||||||||||||||
| element_list = [element_list] | ||||||||||||||||||||||||||||||
| self.process_one(element_list, job_name_prefix) | ||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||
| for element in element_list: | ||||||||||||||||||||||||||||||
| self.process_one(element, job_name_prefix) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if not element_list: | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
| def process_one(self, element, job_name_prefix): | ||||||||||||||||||||||||||||||
| destination, job_reference = element | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| first_destination = element_list[0][0] | ||||||||||||||||||||||||||||||
| copy_to_reference = bigquery_tools.parse_table_reference(first_destination) | ||||||||||||||||||||||||||||||
| copy_to_reference = bigquery_tools.parse_table_reference(destination) | ||||||||||||||||||||||||||||||
| if copy_to_reference.projectId is None: | ||||||||||||||||||||||||||||||
| copy_to_reference.projectId = vp.RuntimeValueProvider.get_value( | ||||||||||||||||||||||||||||||
| 'project', str, '') or self.project | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| copy_from_references = [] | ||||||||||||||||||||||||||||||
| for destination, job_reference in element_list: | ||||||||||||||||||||||||||||||
| copy_from_reference = bigquery_tools.parse_table_reference(destination) | ||||||||||||||||||||||||||||||
| copy_from_reference.tableId = job_reference.jobId | ||||||||||||||||||||||||||||||
| if copy_from_reference.projectId is None: | ||||||||||||||||||||||||||||||
| copy_from_reference.projectId = vp.RuntimeValueProvider.get_value( | ||||||||||||||||||||||||||||||
| 'project', str, '') or self.project | ||||||||||||||||||||||||||||||
| copy_from_references.append(copy_from_reference) | ||||||||||||||||||||||||||||||
| copy_from_reference = bigquery_tools.parse_table_reference(destination) | ||||||||||||||||||||||||||||||
| copy_from_reference.tableId = job_reference.jobId | ||||||||||||||||||||||||||||||
| if copy_from_reference.projectId is None: | ||||||||||||||||||||||||||||||
| copy_from_reference.projectId = vp.RuntimeValueProvider.get_value( | ||||||||||||||||||||||||||||||
| 'project', str, '') or self.project | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| full_table_ref = bigquery_tools.get_hashable_destination(copy_to_reference) | ||||||||||||||||||||||||||||||
| _LOGGER.info( | ||||||||||||||||||||||||||||||
| "Triggering copy job from %s to %s", | ||||||||||||||||||||||||||||||
| copy_from_reference, | ||||||||||||||||||||||||||||||
| copy_to_reference) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| is_first_time = full_table_ref not in self._observed_tables | ||||||||||||||||||||||||||||||
| if is_first_time: | ||||||||||||||||||||||||||||||
| self._observed_tables.add(full_table_ref) | ||||||||||||||||||||||||||||||
| if self.bq_io_metadata: | ||||||||||||||||||||||||||||||
| Lineage.sinks().add( | ||||||||||||||||||||||||||||||
| 'bigquery', | ||||||||||||||||||||||||||||||
| copy_to_reference.projectId, | ||||||||||||||||||||||||||||||
| copy_to_reference.datasetId, | ||||||||||||||||||||||||||||||
| copy_to_reference.tableId) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Split into chunks of MAX_SOURCES_PER_COPY_JOB | ||||||||||||||||||||||||||||||
| chunks = [ | ||||||||||||||||||||||||||||||
| copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB] | ||||||||||||||||||||||||||||||
| for i in range( | ||||||||||||||||||||||||||||||
| 0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB) | ||||||||||||||||||||||||||||||
| ] | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| copy_job_name_base = '%s_%s' % ( | ||||||||||||||||||||||||||||||
| job_name_prefix, | ||||||||||||||||||||||||||||||
| _bq_uuid(bigquery_tools.get_hashable_destination(copy_to_reference))) | ||||||||||||||||||||||||||||||
| wait_for_job, write_disposition = ( | ||||||||||||||||||||||||||||||
| self._determine_write_disposition(copy_to_reference)) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if not self.bq_io_metadata: | ||||||||||||||||||||||||||||||
| self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| project_id = ( | ||||||||||||||||||||||||||||||
| copy_to_reference.projectId | ||||||||||||||||||||||||||||||
| if self.load_job_project_id is None else self.load_job_project_id) | ||||||||||||||||||||||||||||||
| copy_job_name = '%s_%s' % ( | ||||||||||||||||||||||||||||||
| job_name_prefix, | ||||||||||||||||||||||||||||||
| _bq_uuid( | ||||||||||||||||||||||||||||||
| '%s:%s.%s' % ( | ||||||||||||||||||||||||||||||
| copy_from_reference.projectId, | ||||||||||||||||||||||||||||||
| copy_from_reference.datasetId, | ||||||||||||||||||||||||||||||
| copy_from_reference.tableId))) | ||||||||||||||||||||||||||||||
| job_reference = self.bq_wrapper._insert_copy_job( | ||||||||||||||||||||||||||||||
| project_id, | ||||||||||||||||||||||||||||||
| copy_job_name, | ||||||||||||||||||||||||||||||
| copy_from_reference, | ||||||||||||||||||||||||||||||
| copy_to_reference, | ||||||||||||||||||||||||||||||
| create_disposition=self.create_disposition, | ||||||||||||||||||||||||||||||
| write_disposition=write_disposition, | ||||||||||||||||||||||||||||||
| job_labels=self.bq_io_metadata.add_additional_bq_job_labels()) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if wait_for_job: | ||||||||||||||||||||||||||||||
| self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) | ||||||||||||||||||||||||||||||
| self.pending_jobs.append( | ||||||||||||||||||||||||||||||
| GlobalWindows.windowed_value((destination, job_reference))) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| for i, chunk in enumerate(chunks): | ||||||||||||||||||||||||||||||
| if i == 0 and is_first_time: | ||||||||||||||||||||||||||||||
| write_disposition = self.write_disposition | ||||||||||||||||||||||||||||||
| # Wait inline only if we have multiple chunks and write disposition is WRITE_TRUNCATE or WRITE_EMPTY. | ||||||||||||||||||||||||||||||
| # This ensures the first chunk initializes the table, and subsequent chunks (WRITE_APPEND) append to it. | ||||||||||||||||||||||||||||||
| wait_for_job = ( | ||||||||||||||||||||||||||||||
| self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and | ||||||||||||||||||||||||||||||
| len(chunks) > 1) | ||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||
| write_disposition = 'WRITE_APPEND' | ||||||||||||||||||||||||||||||
| wait_for_job = False | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| chunk_job_name = copy_job_name_base | ||||||||||||||||||||||||||||||
| if len(chunks) > 1: | ||||||||||||||||||||||||||||||
| chunk_job_name = f"{copy_job_name_base}_{i}" | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| _LOGGER.info( | ||||||||||||||||||||||||||||||
| "Triggering copy job %s from %s to %s (write_disposition: %s)", | ||||||||||||||||||||||||||||||
| chunk_job_name, [str(r) for r in chunk], | ||||||||||||||||||||||||||||||
| copy_to_reference, | ||||||||||||||||||||||||||||||
| write_disposition) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| job_reference = self.bq_wrapper._insert_copy_job( | ||||||||||||||||||||||||||||||
| project_id, | ||||||||||||||||||||||||||||||
| chunk_job_name, | ||||||||||||||||||||||||||||||
| chunk, | ||||||||||||||||||||||||||||||
| copy_to_reference, | ||||||||||||||||||||||||||||||
| create_disposition=self.create_disposition, | ||||||||||||||||||||||||||||||
| write_disposition=write_disposition, | ||||||||||||||||||||||||||||||
| job_labels=self.bq_io_metadata.add_additional_bq_job_labels() | ||||||||||||||||||||||||||||||
| if self.bq_io_metadata else None) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if wait_for_job: | ||||||||||||||||||||||||||||||
| self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| self.pending_jobs.append( | ||||||||||||||||||||||||||||||
| GlobalWindows.windowed_value((first_destination, job_reference))) | ||||||||||||||||||||||||||||||
| def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]: | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| Determines the write disposition for a BigQuery copy job, | ||||||||||||||||||||||||||||||
| based on destination. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs | ||||||||||||||||||||||||||||||
| to the same destination can interfere with each other, truncate data, and | ||||||||||||||||||||||||||||||
| write to the BigQuery table repeatedly. To prevent this, the first copy job | ||||||||||||||||||||||||||||||
| runs with the user's specified write_disposition, but subsequent jobs must | ||||||||||||||||||||||||||||||
| always use WRITE_APPEND. This ensures that subsequent copy jobs do not | ||||||||||||||||||||||||||||||
| clear out data appended by previous jobs. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||
| copy_to_reference: The reference to the destination table. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||
| A tuple containing a boolean indicating whether to wait for the job to | ||||||||||||||||||||||||||||||
| complete and the write disposition to use for the job. | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| full_table_ref = '%s:%s.%s' % ( | ||||||||||||||||||||||||||||||
| copy_to_reference.projectId, | ||||||||||||||||||||||||||||||
| copy_to_reference.datasetId, | ||||||||||||||||||||||||||||||
| copy_to_reference.tableId) | ||||||||||||||||||||||||||||||
| if full_table_ref not in self._observed_tables: | ||||||||||||||||||||||||||||||
| write_disposition = self.write_disposition | ||||||||||||||||||||||||||||||
| wait_for_job = True | ||||||||||||||||||||||||||||||
| self._observed_tables.add(full_table_ref) | ||||||||||||||||||||||||||||||
| Lineage.sinks().add( | ||||||||||||||||||||||||||||||
| 'bigquery', | ||||||||||||||||||||||||||||||
| copy_to_reference.projectId, | ||||||||||||||||||||||||||||||
| copy_to_reference.datasetId, | ||||||||||||||||||||||||||||||
| copy_to_reference.tableId) | ||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||
| wait_for_job = False | ||||||||||||||||||||||||||||||
| write_disposition = 'WRITE_APPEND' | ||||||||||||||||||||||||||||||
| return wait_for_job, write_disposition | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def finish_bundle(self): | ||||||||||||||||||||||||||||||
| for windowed_value in self.pending_jobs: | ||||||||||||||||||||||||||||||
|
|
@@ -740,7 +744,7 @@ def process( | |||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| schema = bigquery_tools.table_schema_to_dict( | ||||||||||||||||||||||||||||||
| self.bq_wrapper.get_table( | ||||||||||||||||||||||||||||||
| bigquery_tools.BigQueryWrapper().get_table( | ||||||||||||||||||||||||||||||
| project_id=table_reference.projectId, | ||||||||||||||||||||||||||||||
| dataset_id=table_reference.datasetId, | ||||||||||||||||||||||||||||||
| table_id=table_reference.tableId).schema) | ||||||||||||||||||||||||||||||
|
|
@@ -851,8 +855,7 @@ def process(self, element): | |||||||||||||||||||||||||||||
| if latest_partition.can_accept(file_size): | ||||||||||||||||||||||||||||||
| latest_partition.add(file_path, file_size) | ||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||
| if latest_partition.files: | ||||||||||||||||||||||||||||||
| partitions.append(latest_partition.files) | ||||||||||||||||||||||||||||||
| partitions.append(latest_partition.files) | ||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||
| latest_partition = PartitionFiles.Partition( | ||||||||||||||||||||||||||||||
| self.max_partition_size, self.max_files_per_partition) | ||||||||||||||||||||||||||||||
| latest_partition.add(file_path, file_size) | ||||||||||||||||||||||||||||||
|
|
@@ -1178,13 +1181,12 @@ def _load_data( | |||||||||||||||||||||||||||||
| # the truncation happens only once. See | ||||||||||||||||||||||||||||||
| # https://github.com/apache/beam/issues/24535. | ||||||||||||||||||||||||||||||
| finished_temp_tables_load_job_ids_list_pc = ( | ||||||||||||||||||||||||||||||
| finished_temp_tables_load_job_ids_pc | ||||||||||||||||||||||||||||||
| | beam.MapTuple( | ||||||||||||||||||||||||||||||
| finished_temp_tables_load_job_ids_pc | beam.MapTuple( | ||||||||||||||||||||||||||||||
| lambda destination, job_reference: ( | ||||||||||||||||||||||||||||||
| bigquery_tools.get_hashable_destination(destination), | ||||||||||||||||||||||||||||||
| bigquery_tools.parse_table_reference(destination).tableId, | ||||||||||||||||||||||||||||||
| (destination, job_reference))) | ||||||||||||||||||||||||||||||
| | beam.GroupByKey() | ||||||||||||||||||||||||||||||
| | beam.MapTuple(lambda dest, batch: list(batch))) | ||||||||||||||||||||||||||||||
| | beam.MapTuple(lambda tableId, batch: list(batch))) | ||||||||||||||||||||||||||||||
|
Comment on lines
+1184
to
+1189
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using only
Suggested change
|
||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||
| # Loads can happen in parallel. | ||||||||||||||||||||||||||||||
| finished_temp_tables_load_job_ids_list_pc = ( | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instantiating
BigQueryWrapperinline on every element/iteration insideprocessis inefficient because it initializes HTTP clients and credentials repeatedly. It is better to reuse the existingself.bq_wrapperinstance.