diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index e67c5f2a3708..12b200764578 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -362,6 +362,8 @@ def test_batch_write_with_ordering_key(self): # Retry pulling to handle PubSub delivery delays received_messages = [] + received_message_ids = set() + ack_ids = [] deadline = time.time() + 60 # wait up to 60 seconds while time.time() < deadline: response = self.sub_client.pull( @@ -369,7 +371,13 @@ def test_batch_write_with_ordering_key(self): 'subscription': ordering_sub.name, 'max_messages': 10, }) - received_messages.extend(response.received_messages) + for msg in response.received_messages: + ack_ids.append(msg.ack_id) + # Pub/Sub guarantees at-least-once delivery, so we must deduplicate + # messages by message_id to handle potential duplicate deliveries. + if msg.message.message_id not in received_message_ids: + received_message_ids.add(msg.message.message_id) + received_messages.append(msg) if len(received_messages) >= len(test_messages): break time.sleep(5) @@ -384,12 +392,12 @@ def test_batch_write_with_ordering_key(self): self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1') self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2') - ack_ids = [msg.ack_id for msg in received_messages] - self.sub_client.acknowledge( - request={ - 'subscription': ordering_sub.name, - 'ack_ids': ack_ids, - }) + if ack_ids: + self.sub_client.acknowledge( + request={ + 'subscription': ordering_sub.name, + 'ack_ids': ack_ids, + }) finally: self.sub_client.delete_subscription( request={'subscription': ordering_sub.name})