Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "37345",
"modification": 50
"modification": 51
}
162 changes: 82 additions & 80 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,6 @@ class TriggerCopyJobs(beam.DoFn):
"""

TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables'
# https://docs.cloud.google.com/bigquery/quotas#copy_jobs
MAX_SOURCES_PER_COPY_JOB = 1200

def __init__(
self,
Expand Down Expand Up @@ -530,90 +528,96 @@ def process(
self, element_list, job_name_prefix=None, unused_schema_mod_jobs=None):
if isinstance(element_list, tuple):
# Allow this for streaming update compatibility while fixing BEAM-24535.
element_list = [element_list]
self.process_one(element_list, job_name_prefix)
else:
for element in element_list:
self.process_one(element, job_name_prefix)

if not element_list:
return
def process_one(self, element, job_name_prefix):
destination, job_reference = element

first_destination = element_list[0][0]
copy_to_reference = bigquery_tools.parse_table_reference(first_destination)
copy_to_reference = bigquery_tools.parse_table_reference(destination)
if copy_to_reference.projectId is None:
copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project

copy_from_references = []
for destination, job_reference in element_list:
copy_from_reference = bigquery_tools.parse_table_reference(destination)
copy_from_reference.tableId = job_reference.jobId
if copy_from_reference.projectId is None:
copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project
copy_from_references.append(copy_from_reference)
copy_from_reference = bigquery_tools.parse_table_reference(destination)
copy_from_reference.tableId = job_reference.jobId
if copy_from_reference.projectId is None:
copy_from_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '') or self.project

full_table_ref = bigquery_tools.get_hashable_destination(copy_to_reference)
_LOGGER.info(
"Triggering copy job from %s to %s",
copy_from_reference,
copy_to_reference)

is_first_time = full_table_ref not in self._observed_tables
if is_first_time:
self._observed_tables.add(full_table_ref)
if self.bq_io_metadata:
Lineage.sinks().add(
'bigquery',
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)

# Split into chunks of MAX_SOURCES_PER_COPY_JOB
chunks = [
copy_from_references[i:i + self.MAX_SOURCES_PER_COPY_JOB]
for i in range(
0, len(copy_from_references), self.MAX_SOURCES_PER_COPY_JOB)
]

copy_job_name_base = '%s_%s' % (
job_name_prefix,
_bq_uuid(bigquery_tools.get_hashable_destination(copy_to_reference)))
wait_for_job, write_disposition = (
self._determine_write_disposition(copy_to_reference))

if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)

project_id = (
copy_to_reference.projectId
if self.load_job_project_id is None else self.load_job_project_id)
copy_job_name = '%s_%s' % (
job_name_prefix,
_bq_uuid(
'%s:%s.%s' % (
copy_from_reference.projectId,
copy_from_reference.datasetId,
copy_from_reference.tableId)))
job_reference = self.bq_wrapper._insert_copy_job(
project_id,
copy_job_name,
copy_from_reference,
copy_to_reference,
create_disposition=self.create_disposition,
write_disposition=write_disposition,
job_labels=self.bq_io_metadata.add_additional_bq_job_labels())

if wait_for_job:
self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
self.pending_jobs.append(
GlobalWindows.windowed_value((destination, job_reference)))

for i, chunk in enumerate(chunks):
if i == 0 and is_first_time:
write_disposition = self.write_disposition
# Wait inline only if we have multiple chunks and write disposition is WRITE_TRUNCATE or WRITE_EMPTY.
# This ensures the first chunk initializes the table, and subsequent chunks (WRITE_APPEND) append to it.
wait_for_job = (
self.write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY') and
len(chunks) > 1)
else:
write_disposition = 'WRITE_APPEND'
wait_for_job = False

chunk_job_name = copy_job_name_base
if len(chunks) > 1:
chunk_job_name = f"{copy_job_name_base}_{i}"

_LOGGER.info(
"Triggering copy job %s from %s to %s (write_disposition: %s)",
chunk_job_name, [str(r) for r in chunk],
copy_to_reference,
write_disposition)

job_reference = self.bq_wrapper._insert_copy_job(
project_id,
chunk_job_name,
chunk,
copy_to_reference,
create_disposition=self.create_disposition,
write_disposition=write_disposition,
job_labels=self.bq_io_metadata.add_additional_bq_job_labels()
if self.bq_io_metadata else None)

if wait_for_job:
self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)

self.pending_jobs.append(
GlobalWindows.windowed_value((first_destination, job_reference)))
def _determine_write_disposition(self, copy_to_reference) -> tuple[bool, str]:
"""
Determines the write disposition for a BigQuery copy job,
based on destination.

When the write_disposition for a job is WRITE_TRUNCATE, multiple copy jobs
to the same destination can interfere with each other, truncate data, and
write to the BigQuery table repeatedly. To prevent this, the first copy job
runs with the user's specified write_disposition, but subsequent jobs must
always use WRITE_APPEND. This ensures that subsequent copy jobs do not
clear out data appended by previous jobs.

Args:
copy_to_reference: The reference to the destination table.

Returns:
A tuple containing a boolean indicating whether to wait for the job to
complete and the write disposition to use for the job.
"""
full_table_ref = '%s:%s.%s' % (
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
if full_table_ref not in self._observed_tables:
write_disposition = self.write_disposition
wait_for_job = True
self._observed_tables.add(full_table_ref)
Lineage.sinks().add(
'bigquery',
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)
else:
wait_for_job = False
write_disposition = 'WRITE_APPEND'
return wait_for_job, write_disposition

def finish_bundle(self):
for windowed_value in self.pending_jobs:
Expand Down Expand Up @@ -740,7 +744,7 @@ def process(
else:
try:
schema = bigquery_tools.table_schema_to_dict(
self.bq_wrapper.get_table(
bigquery_tools.BigQueryWrapper().get_table(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instantiating BigQueryWrapper inline on every element/iteration inside process is inefficient because it initializes HTTP clients and credentials repeatedly. It is better to reuse the existing self.bq_wrapper instance.

Suggested change
bigquery_tools.BigQueryWrapper().get_table(
self.bq_wrapper.get_table(

project_id=table_reference.projectId,
dataset_id=table_reference.datasetId,
table_id=table_reference.tableId).schema)
Expand Down Expand Up @@ -851,8 +855,7 @@ def process(self, element):
if latest_partition.can_accept(file_size):
latest_partition.add(file_path, file_size)
else:
if latest_partition.files:
partitions.append(latest_partition.files)
partitions.append(latest_partition.files)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Without checking if latest_partition.files is non-empty, an empty list could be appended to partitions if the first file cannot be accepted. Adding a guard prevents empty partitions from being created.

        if latest_partition.files:
          partitions.append(latest_partition.files)

latest_partition = PartitionFiles.Partition(
self.max_partition_size, self.max_files_per_partition)
latest_partition.add(file_path, file_size)
Expand Down Expand Up @@ -1178,13 +1181,12 @@ def _load_data(
# the truncation happens only once. See
# https://github.com/apache/beam/issues/24535.
finished_temp_tables_load_job_ids_list_pc = (
finished_temp_tables_load_job_ids_pc
| beam.MapTuple(
finished_temp_tables_load_job_ids_pc | beam.MapTuple(
lambda destination, job_reference: (
bigquery_tools.get_hashable_destination(destination),
bigquery_tools.parse_table_reference(destination).tableId,
(destination, job_reference)))
| beam.GroupByKey()
| beam.MapTuple(lambda dest, batch: list(batch)))
| beam.MapTuple(lambda tableId, batch: list(batch)))
Comment on lines +1184 to +1189

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using only tableId as the grouping key can cause collisions if the pipeline writes to multiple datasets or projects that contain tables with the same name (e.g., dataset_A.my_table and dataset_B.my_table). Using get_hashable_destination ensures that the full destination (project, dataset, and table) is used as the key, preventing incorrect grouping and potential data corruption.

Suggested change
finished_temp_tables_load_job_ids_pc | beam.MapTuple(
lambda destination, job_reference: (
bigquery_tools.get_hashable_destination(destination),
bigquery_tools.parse_table_reference(destination).tableId,
(destination, job_reference)))
| beam.GroupByKey()
| beam.MapTuple(lambda dest, batch: list(batch)))
| beam.MapTuple(lambda tableId, batch: list(batch)))
finished_temp_tables_load_job_ids_pc | beam.MapTuple(
lambda destination, job_reference: (
bigquery_tools.get_hashable_destination(destination),
(destination, job_reference)))
| beam.GroupByKey()
| beam.MapTuple(lambda dest, batch: list(batch)))

else:
# Loads can happen in parallel.
finished_temp_tables_load_job_ids_list_pc = (
Expand Down
Loading
Loading