Skip to content

Fix flaky PubSub integration test by deduplicating messages#39134

Open
shunping wants to merge 2 commits into
apache:masterfrom
shunping:fix-pubsub-flaky-test
Open

Fix flaky PubSub integration test by deduplicating messages#39134
shunping wants to merge 2 commits into
apache:masterfrom
shunping:fix-pubsub-flaky-test

Conversation

@shunping

Copy link
Copy Markdown
Collaborator

PubSub guarantees at-least-once delivery, which can result in duplicate messages being pulled during the test. Deduplicate the received messages by message ID to prevent assertion failures on message count, while still acknowledging all received delivery attempts.

Failed test example:
https://github.com/apache/beam/actions/runs/28270530203/job/83766777493?pr=39130

Traceback:

___________ PubSubIntegrationTest.test_batch_write_with_ordering_key ___________
[gw1] linux -- Python 3.12.13 /runner/_work/beam/beam/build/gradleenv/417525525/bin/python3

self = <apache_beam.io.gcp.pubsub_integration_test.PubSubIntegrationTest testMethod=test_batch_write_with_ordering_key>

    @pytest.mark.it_postcommit
    def test_batch_write_with_ordering_key(self):
      """Test WriteToPubSub in batch mode with ordering keys.
    
      Dataflow's Native Pub/Sub Sink does not support ordering_key
      (see https://github.com/apache/beam/issues/36201), so this test
      only applies to runners using Beam's Python WriteToPubSub Sink.
      Dataflow users should use the XLang WriteToPubSub path instead
      (apache_beam.io.external.gcp.pubsub.WriteToPubSub with
      publish_with_ordering_key=True).
      """
      if self.runner_name == 'TestDataflowRunner':
        self.skipTest(
            'Dataflow Native PubSub Sink does not support ordering_key '
            '(see https://github.com/apache/beam/issues/36201).')
      from google.pubsub_v1.types import Subscription
    
      from apache_beam.options.pipeline_options import PipelineOptions
      from apache_beam.options.pipeline_options import StandardOptions
      from apache_beam.transforms import Create
    
      ordering_topic = self.pub_client.create_topic(
          name=self.pub_client.topic_path(
              self.project, 'psit_topic_ordering' + self.uuid))
      ordering_sub = self.sub_client.create_subscription(
          request=Subscription(
              name=self.sub_client.subscription_path(
                  self.project, 'psit_sub_ordering' + self.uuid),
              topic=ordering_topic.name,
              enable_message_ordering=True,
          ))
      time.sleep(10)
    
      try:
        test_messages = [
            PubsubMessage(
                b'order_data001', {'attr': 'value1'}, ordering_key='key1'),
            PubsubMessage(
                b'order_data002', {'attr': 'value2'}, ordering_key='key1'),
            PubsubMessage(
                b'order_data003', {'attr': 'value3'}, ordering_key='key2'),
        ]
    
        pipeline_options = PipelineOptions()
        pipeline_options.view_as(StandardOptions).streaming = False
    
        with TestPipeline(options=pipeline_options) as p:
          messages = p | 'CreateMessages' >> Create(test_messages)
          _ = messages | 'WriteToPubSub' >> WriteToPubSub(
              ordering_topic.name,
              with_attributes=True,
              publish_with_ordering_key=True)
    
        time.sleep(10)
    
        # Retry pulling to handle PubSub delivery delays
        received_messages = []
        deadline = time.time() + 60  # wait up to 60 seconds
        while time.time() < deadline:
          response = self.sub_client.pull(
              request={
                  'subscription': ordering_sub.name,
                  'max_messages': 10,
              })
          received_messages.extend(response.received_messages)
          if len(received_messages) >= len(test_messages):
            break
          time.sleep(5)
    
>       self.assertEqual(len(received_messages), len(test_messages))
E       AssertionError: 4 != 3

apache_beam/io/gcp/pubsub_integration_test.py:377: AssertionError

PubSub guarantees at-least-once delivery, which can result in duplicate
messages being pulled during the test. Deduplicate the received messages
by message ID to prevent assertion failures on message count, while still
acknowledging all received delivery attempts.
@shunping shunping marked this pull request as ready for review June 27, 2026 09:57
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses flakiness in the PubSub integration tests caused by duplicate message deliveries. By introducing a deduplication mechanism based on message IDs, the test now correctly validates the expected number of unique messages received, ensuring stable execution in environments where Pub/Sub may deliver messages more than once.

Highlights

  • Message Deduplication: Implemented message deduplication using message IDs to account for Pub/Sub's at-least-once delivery guarantee, preventing test failures due to duplicate message counts.
  • Robust Acknowledgement: Updated the acknowledgement logic to collect all received message IDs and conditionally perform the acknowledgement only if messages were retrieved.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request updates the Pub/Sub integration test to deduplicate received messages by message_id and handle potential duplicate deliveries. The reviewer suggests acknowledging messages immediately inside the pull loop to prevent redeliveries caused by the 10-second acknowledgment deadline, which also allows for the removal of the redundant post-loop acknowledgment block.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +366 to 383
ack_ids = []
deadline = time.time() + 60 # wait up to 60 seconds
while time.time() < deadline:
response = self.sub_client.pull(
request={
'subscription': ordering_sub.name,
'max_messages': 10,
})
received_messages.extend(response.received_messages)
for msg in response.received_messages:
ack_ids.append(msg.ack_id)
# Pub/Sub guarantees at-least-once delivery, so we must deduplicate
# messages by message_id to handle potential duplicate deliveries.
if msg.message.message_id not in received_message_ids:
received_message_ids.add(msg.message.message_id)
received_messages.append(msg)
if len(received_messages) >= len(test_messages):
break
time.sleep(5)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Acknowledging received messages immediately inside the pull loop is highly recommended. Since the retry loop can run for up to 60 seconds, which exceeds the default Pub/Sub acknowledgment deadline of 10 seconds, any unacknowledged messages will be redelivered by Pub/Sub. This leads to unnecessary duplicate deliveries and potential test flakiness.

Acknowledging immediately inside the loop avoids these redeliveries and simplifies the code by removing the need to accumulate ack_ids and perform a bulk acknowledgment at the end.

      deadline = time.time() + 60  # wait up to 60 seconds
      while time.time() < deadline:
        response = self.sub_client.pull(
            request={
                'subscription': ordering_sub.name,
                'max_messages': 10,
            })
        if response.received_messages:
          self.sub_client.acknowledge(
              request={
                  'subscription': ordering_sub.name,
                  'ack_ids': [msg.ack_id for msg in response.received_messages],
              })
          for msg in response.received_messages:
            # Pub/Sub guarantees at-least-once delivery, so we must deduplicate
            # messages by message_id to handle potential duplicate deliveries.
            if msg.message.message_id not in received_message_ids:
              received_message_ids.add(msg.message.message_id)
              received_messages.append(msg)
        if len(received_messages) >= len(test_messages):
          break
        time.sleep(5)

Comment on lines +395 to +400
if ack_ids:
self.sub_client.acknowledge(
request={
'subscription': ordering_sub.name,
'ack_ids': ack_ids,
})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since messages are now acknowledged immediately inside the pull loop, this post-loop acknowledgment block is redundant and can be safely removed.

@github-actions

Copy link
Copy Markdown
Contributor

Assigning reviewers:

R: @tvalentyn for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@shunping shunping changed the title Deduplicate messages in PubSub ordering key integration test Fix flaky PubSub integration test by deduplicating messages in Jun 27, 2026
@shunping shunping changed the title Fix flaky PubSub integration test by deduplicating messages in Fix flaky PubSub integration test by deduplicating messages Jun 27, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant