From 137d2caee4aa5b36ef472b6402305f9be8903084 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 26 Jun 2026 18:28:22 -0400 Subject: [PATCH 1/4] Fix flaky GcsIOIntegrationTest and handle GCS bucket creation race conditions * Modify get_or_create_default_gcs_bucket in gcsio.py to catch Conflict (409) exceptions, resolving to the existing bucket on concurrent creation requests. * Add unit tests for default bucket creation conflict handling in gcsio_test.py. * Modify test_create_default_bucket in gcsio_integration_test.py to generate unique bucket names with a UUID prefix and test parameters to eliminate parallel test thread collisions. --- sdks/python/apache_beam/io/gcp/gcsio.py | 16 +++++++- .../io/gcp/gcsio_integration_test.py | 17 ++++---- sdks/python/apache_beam/io/gcp/gcsio_test.py | 40 +++++++++++++++++++ 3 files changed, 64 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 7cc68fd1e4a2..79bd2b6ddd04 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -35,6 +35,7 @@ from typing import Optional from typing import Union +from google.api_core.exceptions import Conflict from google.api_core.exceptions import RetryError from google.cloud import storage from google.cloud.exceptions import NotFound @@ -143,7 +144,20 @@ def get_or_create_default_gcs_bucket(options): 'Creating default GCS bucket for project %s: gs://%s', project, bucket_name) - return gcs.create_bucket(bucket_name, project, location=region) + try: + return gcs.create_bucket(bucket_name, project, location=region) + except Conflict: + try: + bucket = gcs.get_bucket(bucket_name) + except Exception: + raise + if bucket: + _validate_bucket_project( + bucket, + project, + credentials=getattr(gcs.client, '_credentials', None)) + return bucket + raise def create_storage_client(pipeline_options, use_credentials=True): diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index f5da9b60dbd6..7e499452d30e 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -207,19 +207,20 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): # requires this option unset. google_cloud_options.dataflow_kms_key = None - import random from hashlib import blake2b + import uuid - # Add a random number to avoid collision if multiple test instances - # are run at the same time. To avoid too many dangling buckets if bucket - # removal fails, we limit the max number of possible bucket names in this - # test to 1000. - overridden_bucket_name = 'gcsio-it-%d-%s-%s-%d' % ( - random.randint(0, 999), + # Add a unique uuid and the parameterized test options to the bucket name + # to avoid collisions when multiple parameterized instances run in parallel + # or concurrent CI jobs run at the same time. + overridden_bucket_name = 'gcsio-it-%s-%s-%s-%d-%s-%s' % ( + uuid.uuid4().hex[:8], google_cloud_options.region, blake2b(google_cloud_options.project.encode('utf8'), digest_size=4).hexdigest(), - int(time.time())) + int(time.time()), + str(self.no_gcsio_throttling_counter).lower(), + str(self.enable_gcsio_blob_generation).lower()) mock_default_gcs_bucket_name.return_value = overridden_bucket_name diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 9c4414175e48..ddb94f9c17fa 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -487,6 +487,46 @@ def test_get_or_create_default_gcs_bucket_ownership_mock_project_number( self.assertEqual(bucket, mock_bucket) mock_crm_class.assert_not_called() + @mock.patch('google.cloud.resourcemanager_v3.ProjectsClient') + @mock.patch('apache_beam.io.gcp.gcsio.GcsIO') + def test_get_or_create_default_gcs_bucket_conflict( + self, mock_gcsio_class, mock_crm_class): + mock_gcsio = mock_gcsio_class.return_value + mock_bucket = mock.Mock() + mock_bucket.project_number = 123456789 + mock_gcsio.get_bucket.side_effect = [None, mock_bucket] + + from google.api_core.exceptions import Conflict + mock_gcsio.create_bucket.side_effect = Conflict("Already owned by you") + + mock_crm_client = mock_crm_class.return_value + mock_project_info = mock.Mock() + mock_project_info.name = 'projects/123456789' + mock_crm_client.get_project.return_value = mock_project_info + + options = SampleOptions(DEFAULT_GCP_PROJECT, 'us-central1') + bucket = gcsio.get_or_create_default_gcs_bucket(options) + + self.assertEqual(bucket, mock_bucket) + self.assertEqual(mock_gcsio.get_bucket.call_count, 2) + mock_gcsio.create_bucket.assert_called_once() + + @mock.patch('google.cloud.resourcemanager_v3.ProjectsClient') + @mock.patch('apache_beam.io.gcp.gcsio.GcsIO') + def test_get_or_create_default_gcs_bucket_conflict_reraise( + self, mock_gcsio_class, mock_crm_class): + mock_gcsio = mock_gcsio_class.return_value + mock_gcsio.get_bucket.side_effect = [None, None] + + from google.api_core.exceptions import Conflict + mock_gcsio.create_bucket.side_effect = Conflict("Bucket name unavailable") + + options = SampleOptions(DEFAULT_GCP_PROJECT, 'us-central1') + with self.assertRaises(Conflict): + gcsio.get_or_create_default_gcs_bucket(options) + + self.assertEqual(mock_gcsio.get_bucket.call_count, 2) + def test_exists(self): file_name = 'gs://gcsio-test/dummy_file' file_size = 1234 From 6c9508d87ebcfa81c8f5e4c6cffd4a82615fa46f Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 26 Jun 2026 19:14:28 -0400 Subject: [PATCH 2/4] Fix lints --- sdks/python/apache_beam/io/gcp/gcsio_integration_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index 7e499452d30e..42e1bd31175a 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -33,6 +33,7 @@ import unittest import uuid import zlib +from hashlib import blake2b import mock import pytest @@ -207,9 +208,6 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): # requires this option unset. google_cloud_options.dataflow_kms_key = None - from hashlib import blake2b - import uuid - # Add a unique uuid and the parameterized test options to the bucket name # to avoid collisions when multiple parameterized instances run in parallel # or concurrent CI jobs run at the same time. From a231ee01efdaf4568b7eb56e72229d8cd414a53e Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 26 Jun 2026 19:15:39 -0400 Subject: [PATCH 3/4] Trigger postcommit tests. --- .github/trigger_files/beam_PostCommit_Python.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 2e255c8f3cf6..2bb052d5f715 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", "pr": "37345", - "modification": 52 + "modification": 53 } From 36c7724704bf637040e94f0f95d58c7655ba88ec Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Sat, 27 Jun 2026 08:07:59 -0400 Subject: [PATCH 4/4] Address reviews --- sdks/python/apache_beam/io/gcp/gcsio.py | 5 +---- sdks/python/apache_beam/io/gcp/gcsio_integration_test.py | 8 ++++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 79bd2b6ddd04..179525de2cb0 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -147,10 +147,7 @@ def get_or_create_default_gcs_bucket(options): try: return gcs.create_bucket(bucket_name, project, location=region) except Conflict: - try: - bucket = gcs.get_bucket(bucket_name) - except Exception: - raise + bucket = gcs.get_bucket(bucket_name) if bucket: _validate_bucket_project( bucket, diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index 42e1bd31175a..a68b1179faf0 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -212,13 +212,13 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): # to avoid collisions when multiple parameterized instances run in parallel # or concurrent CI jobs run at the same time. overridden_bucket_name = 'gcsio-it-%s-%s-%s-%d-%s-%s' % ( - uuid.uuid4().hex[:8], + uuid.uuid4().hex[:6], google_cloud_options.region, blake2b(google_cloud_options.project.encode('utf8'), - digest_size=4).hexdigest(), + digest_size=2).hexdigest(), int(time.time()), - str(self.no_gcsio_throttling_counter).lower(), - str(self.enable_gcsio_blob_generation).lower()) + '1' if self.no_gcsio_throttling_counter else '0', + '1' if self.enable_gcsio_blob_generation else '0') mock_default_gcs_bucket_name.return_value = overridden_bucket_name