diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 2e255c8f3cf6..2bb052d5f715 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", "pr": "37345", - "modification": 52 + "modification": 53 } diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 7cc68fd1e4a2..179525de2cb0 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -35,6 +35,7 @@ from typing import Optional from typing import Union +from google.api_core.exceptions import Conflict from google.api_core.exceptions import RetryError from google.cloud import storage from google.cloud.exceptions import NotFound @@ -143,7 +144,17 @@ def get_or_create_default_gcs_bucket(options): 'Creating default GCS bucket for project %s: gs://%s', project, bucket_name) - return gcs.create_bucket(bucket_name, project, location=region) + try: + return gcs.create_bucket(bucket_name, project, location=region) + except Conflict: + bucket = gcs.get_bucket(bucket_name) + if bucket: + _validate_bucket_project( + bucket, + project, + credentials=getattr(gcs.client, '_credentials', None)) + return bucket + raise def create_storage_client(pipeline_options, use_credentials=True): diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py index f5da9b60dbd6..a68b1179faf0 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py @@ -33,6 +33,7 @@ import unittest import uuid import zlib +from hashlib import blake2b import mock import pytest @@ -207,19 +208,17 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name): # requires this option unset. google_cloud_options.dataflow_kms_key = None - import random - from hashlib import blake2b - - # Add a random number to avoid collision if multiple test instances - # are run at the same time. To avoid too many dangling buckets if bucket - # removal fails, we limit the max number of possible bucket names in this - # test to 1000. - overridden_bucket_name = 'gcsio-it-%d-%s-%s-%d' % ( - random.randint(0, 999), + # Add a unique uuid and the parameterized test options to the bucket name + # to avoid collisions when multiple parameterized instances run in parallel + # or concurrent CI jobs run at the same time. + overridden_bucket_name = 'gcsio-it-%s-%s-%s-%d-%s-%s' % ( + uuid.uuid4().hex[:6], google_cloud_options.region, blake2b(google_cloud_options.project.encode('utf8'), - digest_size=4).hexdigest(), - int(time.time())) + digest_size=2).hexdigest(), + int(time.time()), + '1' if self.no_gcsio_throttling_counter else '0', + '1' if self.enable_gcsio_blob_generation else '0') mock_default_gcs_bucket_name.return_value = overridden_bucket_name diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index 9c4414175e48..ddb94f9c17fa 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -487,6 +487,46 @@ def test_get_or_create_default_gcs_bucket_ownership_mock_project_number( self.assertEqual(bucket, mock_bucket) mock_crm_class.assert_not_called() + @mock.patch('google.cloud.resourcemanager_v3.ProjectsClient') + @mock.patch('apache_beam.io.gcp.gcsio.GcsIO') + def test_get_or_create_default_gcs_bucket_conflict( + self, mock_gcsio_class, mock_crm_class): + mock_gcsio = mock_gcsio_class.return_value + mock_bucket = mock.Mock() + mock_bucket.project_number = 123456789 + mock_gcsio.get_bucket.side_effect = [None, mock_bucket] + + from google.api_core.exceptions import Conflict + mock_gcsio.create_bucket.side_effect = Conflict("Already owned by you") + + mock_crm_client = mock_crm_class.return_value + mock_project_info = mock.Mock() + mock_project_info.name = 'projects/123456789' + mock_crm_client.get_project.return_value = mock_project_info + + options = SampleOptions(DEFAULT_GCP_PROJECT, 'us-central1') + bucket = gcsio.get_or_create_default_gcs_bucket(options) + + self.assertEqual(bucket, mock_bucket) + self.assertEqual(mock_gcsio.get_bucket.call_count, 2) + mock_gcsio.create_bucket.assert_called_once() + + @mock.patch('google.cloud.resourcemanager_v3.ProjectsClient') + @mock.patch('apache_beam.io.gcp.gcsio.GcsIO') + def test_get_or_create_default_gcs_bucket_conflict_reraise( + self, mock_gcsio_class, mock_crm_class): + mock_gcsio = mock_gcsio_class.return_value + mock_gcsio.get_bucket.side_effect = [None, None] + + from google.api_core.exceptions import Conflict + mock_gcsio.create_bucket.side_effect = Conflict("Bucket name unavailable") + + options = SampleOptions(DEFAULT_GCP_PROJECT, 'us-central1') + with self.assertRaises(Conflict): + gcsio.get_or_create_default_gcs_bucket(options) + + self.assertEqual(mock_gcsio.get_bucket.call_count, 2) + def test_exists(self): file_name = 'gs://gcsio-test/dummy_file' file_size = 1234