Fix flaky PubSub integration test by deduplicating messages#39134
Fix flaky PubSub integration test by deduplicating messages#39134shunping wants to merge 2 commits into
Conversation
PubSub guarantees at-least-once delivery, which can result in duplicate messages being pulled during the test. Deduplicate the received messages by message ID to prevent assertion failures on message count, while still acknowledging all received delivery attempts.
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses flakiness in the PubSub integration tests caused by duplicate message deliveries. By introducing a deduplication mechanism based on message IDs, the test now correctly validates the expected number of unique messages received, ensuring stable execution in environments where Pub/Sub may deliver messages more than once. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request updates the Pub/Sub integration test to deduplicate received messages by message_id and handle potential duplicate deliveries. The reviewer suggests acknowledging messages immediately inside the pull loop to prevent redeliveries caused by the 10-second acknowledgment deadline, which also allows for the removal of the redundant post-loop acknowledgment block.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| ack_ids = [] | ||
| deadline = time.time() + 60 # wait up to 60 seconds | ||
| while time.time() < deadline: | ||
| response = self.sub_client.pull( | ||
| request={ | ||
| 'subscription': ordering_sub.name, | ||
| 'max_messages': 10, | ||
| }) | ||
| received_messages.extend(response.received_messages) | ||
| for msg in response.received_messages: | ||
| ack_ids.append(msg.ack_id) | ||
| # Pub/Sub guarantees at-least-once delivery, so we must deduplicate | ||
| # messages by message_id to handle potential duplicate deliveries. | ||
| if msg.message.message_id not in received_message_ids: | ||
| received_message_ids.add(msg.message.message_id) | ||
| received_messages.append(msg) | ||
| if len(received_messages) >= len(test_messages): | ||
| break | ||
| time.sleep(5) |
There was a problem hiding this comment.
Acknowledging received messages immediately inside the pull loop is highly recommended. Since the retry loop can run for up to 60 seconds, which exceeds the default Pub/Sub acknowledgment deadline of 10 seconds, any unacknowledged messages will be redelivered by Pub/Sub. This leads to unnecessary duplicate deliveries and potential test flakiness.
Acknowledging immediately inside the loop avoids these redeliveries and simplifies the code by removing the need to accumulate ack_ids and perform a bulk acknowledgment at the end.
deadline = time.time() + 60 # wait up to 60 seconds
while time.time() < deadline:
response = self.sub_client.pull(
request={
'subscription': ordering_sub.name,
'max_messages': 10,
})
if response.received_messages:
self.sub_client.acknowledge(
request={
'subscription': ordering_sub.name,
'ack_ids': [msg.ack_id for msg in response.received_messages],
})
for msg in response.received_messages:
# Pub/Sub guarantees at-least-once delivery, so we must deduplicate
# messages by message_id to handle potential duplicate deliveries.
if msg.message.message_id not in received_message_ids:
received_message_ids.add(msg.message.message_id)
received_messages.append(msg)
if len(received_messages) >= len(test_messages):
break
time.sleep(5)| if ack_ids: | ||
| self.sub_client.acknowledge( | ||
| request={ | ||
| 'subscription': ordering_sub.name, | ||
| 'ack_ids': ack_ids, | ||
| }) |
|
Assigning reviewers: R: @tvalentyn for label python. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
PubSub guarantees at-least-once delivery, which can result in duplicate messages being pulled during the test. Deduplicate the received messages by message ID to prevent assertion failures on message count, while still acknowledging all received delivery attempts.
Failed test example:
https://github.com/apache/beam/actions/runs/28270530203/job/83766777493?pr=39130
Traceback: