Environment
- google-cloud-pubsub >= 2.36.0
- Python 3.12
- StreamingPull with await_callbacks_on_shutdown=True
- Source: packages/google-cloud-pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
Description
During stream reconnection (server-initiated UNAVAILABLE), two threads race to execute _shutdown, and both crash:
Thread-RegularStreamShutdown (~line 1053):
  File "streaming_pull_manager.py", line 1053, in _shutdown
    msg.nack()
AttributeError: 'tuple' object has no attribute 'nack'
Items in the internal message queue are raw tuples (not yet wrapped as Message objects), so calling .nack() fails.
Thread-OnRpcTerminated (~line 1026):
  File "streaming_pull_manager.py", line 1026, in _shutdown
    assert self._scheduler is not None
AssertionError
The scheduler was already set to None by the other shutdown thread.
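To make the two failure modes concrete, here is a toy model that triggers both exceptions one after the other. It is not the library's code: ToyManager, _pending, and run_shutdown are hypothetical names, and the order of steps inside the toy _shutdown is an assumption chosen so that both tracebacks reproduce deterministically without real threads.

```python
class ToyManager:
    """Hypothetical stand-in for the streaming pull manager (not library code)."""

    def __init__(self, pending):
        self._scheduler = object()     # placeholder for the real scheduler
        self._pending = list(pending)  # placeholder for the internal queue

    def _shutdown(self):
        # Unguarded assert: fails if another caller already cleared the scheduler.
        assert self._scheduler is not None
        self._scheduler = None
        for msg in self._pending:
            msg.nack()  # fails when msg is a raw tuple rather than a Message


def run_shutdown(manager):
    """Call _shutdown once and return the name of the exception raised, if any."""
    try:
        manager._shutdown()
    except (AttributeError, AssertionError) as exc:
        return type(exc).__name__
    return None


# The queue holds a raw tuple, as described in this issue.
manager = ToyManager(pending=[("callback", "raw message")])
first = run_shutdown(manager)   # stands in for Thread-RegularStreamShutdown
second = run_shutdown(manager)  # stands in for Thread-OnRpcTerminated
print(first, second)  # AttributeError AssertionError
```

In the real subscriber the two calls come from different threads, so which thread hits which error depends on timing.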
Impact
When both shutdown threads crash, pending messages in the internal queue are never nacked. They remain leased until expiry. Each lease expiry burns a delivery attempt against maxDeliveryAttempts, which can cause premature dead-lettering under autoscaling (frequent scale-up/down cycles).
Relation to googleapis/python-pubsub#997 / PR#1244
PR #1244 fixed the same assert self._scheduler is not None race in _on_response, but the identical pattern in _shutdown was not addressed. The tuple/nack issue in _shutdown is a separate bug not covered by that fix.
Suggested fix
- Replace the assert self._scheduler is not None in _shutdown with a None check + early return (same pattern as PR #1244).
- In the message nack loop, guard with hasattr(msg, 'nack') or isinstance before calling msg.nack() to handle raw tuples in the queue.
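The two suggestions above could look roughly like the following sketch. It uses a toy class rather than the real manager: ToyManager, FakeMessage, _pending, and skipped are hypothetical names, and counting skipped tuples is only an illustration of the guard, not a proposal for how the library should treat them.

```python
class FakeMessage:
    """Hypothetical stand-in for a wrapped Message; records whether nack() ran."""

    def __init__(self):
        self.nacked = False

    def nack(self):
        self.nacked = True


class ToyManager:
    """Hypothetical stand-in for the streaming pull manager (not library code)."""

    def __init__(self, pending):
        self._scheduler = object()     # placeholder for the real scheduler
        self._pending = list(pending)  # placeholder for the internal queue
        self.skipped = 0               # raw items that could not be nacked

    def _shutdown(self):
        # None check + early return instead of an assert.
        if self._scheduler is None:
            return  # another shutdown call already ran
        self._scheduler = None

        for item in self._pending:
            # Guard the nack so raw tuples no longer raise AttributeError.
            if hasattr(item, "nack"):
                item.nack()
            else:
                self.skipped += 1
        self._pending.clear()


msg = FakeMessage()
manager = ToyManager(pending=[msg, ("callback", "raw message")])
manager._shutdown()  # nacks msg, skips the tuple
manager._shutdown()  # returns early, no AssertionError
```

Note that a bare check-then-set is not atomic, so this narrows the race window rather than closing it; holding a lock around the check would be one way to make it strict. The guard also only prevents the crash: raw tuples that are skipped are still not nacked, so they would need to be unwrapped or handled separately to avoid the lease-expiry impact described above.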
Reproduction
High-throughput StreamingPull subscriber with flow_control.max_messages=50-100 under frequent stream reconnections. The error appears in Python thread exception logs approximately once every few hours.