PYTHON-5668 - Merge backpressure branch into mainline #2729
Conversation
- PYTHON-5505 Prototype system overload retry loop for all operations (#2497): all commands that fail with the "Retryable" error label will be retried up to 3 times. When the error includes the "SystemOverloaded" error label we apply exponential backoff with jitter before attempting a retry.
- PYTHON-5506 Prototype adaptive token bucket retry (#2501): add an adaptive token bucket based retry policy. Successfully completed commands deposit 0.1 token; failed retry attempts consume 1 token; a retry is only permitted if there is an available token. The token bucket starts full with the maximum 1000 tokens.
- PYTHON-5505 Use proper RetryableError and SystemOverloadedError labels
- …tion rate limiter triggers (#2509). Co-authored-by: Iris <58442094+sleepyStick@users.noreply.github.com>, Noah Stapp <noah.stapp@mongodb.com>, Shane Harvey <shnhrv@gmail.com>, dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
- PYTHON-5629 Increase max overload retries from 3 to 5 and initial delay from 50ms to 100ms (#2599)
- PYTHON-5517 Simplify pool backpressure behavior (#2611): synchro update, network_layer update, pool shared update, run-tests update
Co-authored-by: Shane Harvey <shnhrv@gmail.com> Co-authored-by: Steven Silvester <steven.silvester@ieee.org> Co-authored-by: Noah Stapp <noah.stapp@mongodb.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
- …loop for server overloaded errors (#2635). Co-authored-by: Kevin Albertson <kevin.albertson@mongodb.com>, Casey Clements <caseyclements@users.noreply.github.com>
- …ple retries occur (#2707). Co-authored-by: Sergey Zelenov <mail@zelenov.su>
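The adaptive token bucket described in PYTHON-5506 above can be sketched as follows. This is a minimal, single-threaded sketch of the stated rules (successes deposit 0.1 token, failed retries consume 1, the bucket starts full at 1000); the class and constant names are illustrative, not PyMongo's internals:

```python
class TokenBucket:
    """Adaptive retry token bucket, per the PYTHON-5506 description."""

    MAX_TOKENS = 1000.0  # bucket starts full at the maximum
    DEPOSIT = 0.1        # credited by each successfully completed command
    WITHDRAWAL = 1.0     # debited by each failed retry attempt

    def __init__(self) -> None:
        self.tokens = self.MAX_TOKENS

    def record_success(self) -> None:
        # Successful commands slowly refill the bucket, capped at the max.
        self.tokens = min(self.MAX_TOKENS, self.tokens + self.DEPOSIT)

    def try_retry(self) -> bool:
        # A retry is only permitted if a whole token is available.
        if self.tokens >= self.WITHDRAWAL:
            self.tokens -= self.WITHDRAWAL
            return True
        return False
```

A sustained failure rate drains the bucket roughly ten times faster than successes refill it, which is what throttles retries under persistent overload.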
Pull request overview
Merges the backpressure work into mainline by introducing adaptive retry behavior (token bucket + exponential backoff) for server overload conditions, updating handshake metadata, and adding/adjusting spec + integration tests for these behaviors.
Changes:
- Add adaptive retry policy (token bucket + backoff) and apply it to retry loops when `SystemOverloadedError` is encountered.
- Include `backpressure: true` in handshake commands and adjust SDAM/pool behavior to avoid clearing pools / changing server descriptions on overload-labeled connection failures.
- Add new unified/spec tests and prose/integration tests covering backpressure retries across reads, writes, transactions, and getMore.
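The exponential backoff with jitter applied before overload retries can be sketched from the numbers in the commit history (5 max retries, 100 ms initial delay); the cap and the full-jitter strategy are assumptions for illustration, not necessarily the driver's exact formula:

```python
import random

# From the PR's commit history: retries raised to 5, initial delay to 100 ms.
MAX_RETRIES = 5
INITIAL_DELAY = 0.1   # seconds (100 ms)
BACKOFF_MAX = 10.0    # assumed upper cap in seconds, not taken from the PR

def overload_backoff(attempt: int) -> float:
    """Full-jitter backoff: uniform delay in [0, min(cap, initial * 2**attempt)]."""
    return random.uniform(0.0, min(BACKOFF_MAX, INITIAL_DELAY * (2 ** attempt)))
```

Randomizing the full interval (rather than adding a small jitter term) spreads retries from many overloaded clients evenly, avoiding synchronized retry storms.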
Reviewed changes
Copilot reviewed 50 out of 52 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| tools/synchro.py | Ensures new backpressure test file is included in synchro conversions; improves async sleep translation regex. |
| test/uri_options/client-backpressure-options.json | Adds URI parsing coverage for adaptiveRetries. |
| test/transactions/unified/backpressure-retryable-writes.json | Adds unified transaction tests validating overload-triggered retry behavior for writes. |
| test/transactions/unified/backpressure-retryable-reads.json | Adds unified transaction tests validating overload-triggered retry behavior for reads. |
| test/transactions/unified/backpressure-retryable-commit.json | Adds unified transaction tests validating overload-triggered retry behavior for commit. |
| test/transactions/unified/backpressure-retryable-abort.json | Adds unified transaction tests validating overload-triggered retry behavior for abort. |
| test/test_transactions.py | Updates/extends transaction convenient API tests for timeout/backoff-related behavior. |
| test/test_retryable_writes.py | Adds tests for error propagation across multiple retryable write errors / labels. |
| test/test_pooling.py | Adds integration test asserting pool backpressure does not disrupt existing connections. |
| test/test_discovery_and_monitoring.py | Adds pool backpressure test asserting checkout failures occur without pool-cleared events; adjusts overload labeling in a specific handshake failure path. |
| test/test_client_metadata.py | Asserts handshake documents include backpressure: true. |
| test/test_client_backpressure.py | Adds backpressure prose + unified-format tests for retries/backoff/token-bucket behavior. |
| test/test_client.py | Adds basic client option parsing tests for adaptive_retries / adaptiveRetries. |
| test/discovery_and_monitoring/unified/backpressure-server-description-unchanged-on-min-pool-size-population-error.json | Adds unified SDAM test around minPoolSize population error behavior under backpressure scenarios. |
| test/discovery_and_monitoring/unified/backpressure-network-timeout-fail-single.json | Adds unified SDAM test applying backpressure semantics on connection-establishment network timeouts (single). |
| test/discovery_and_monitoring/unified/backpressure-network-timeout-fail-replicaset.json | Adds unified SDAM test applying backpressure semantics on connection-establishment network timeouts (replicaset). |
| test/discovery_and_monitoring/unified/backpressure-network-error-fail-single.json | Adds unified SDAM test applying backpressure semantics on connection-establishment network errors (single). |
| test/discovery_and_monitoring/unified/backpressure-network-error-fail-replicaset.json | Adds unified SDAM test applying backpressure semantics on connection-establishment network errors (replicaset). |
| test/discovery_and_monitoring/errors/error_handling_handshake.json | Updates expected SDAM outcome for handshake error handling (topology/server type/generation changes). |
| test/connection_monitoring/pool-create-min-size-error.json | Adjusts failpoint mode to alwaysOn for minPoolSize population error test. |
| test/client-backpressure/getMore-retried.json | Adds unified-format tests ensuring getMore retries under overload errors. |
| test/client-backpressure/backpressure-connection-checkin.json | Adds unified-format test ensuring connections are checked back in across overload retry attempts. |
| test/asynchronous/test_transactions.py | Async equivalents of updated/extended convenient transaction API timeout/backoff tests. |
| test/asynchronous/test_retryable_writes.py | Async equivalents for multi-error propagation retryable writes tests. |
| test/asynchronous/test_pooling.py | Async equivalent of pool backpressure “preserve existing connections” test. |
| test/asynchronous/test_discovery_and_monitoring.py | Async equivalent of pool backpressure test and overload label adjustment. |
| test/asynchronous/test_client_metadata.py | Async equivalent of handshake backpressure: true assertion. |
| test/asynchronous/test_client_backpressure.py | Async equivalents of backpressure prose + unified-format tests. |
| test/asynchronous/test_client.py | Async equivalent of adaptive_retries option parsing test. |
| pymongo/synchronous/topology.py | Avoids SDAM server description/pool reset behavior for overload-labeled connection failures. |
| pymongo/synchronous/pool.py | Adds handshake backpressure: true and synthesizes overload labels for certain connection establishment failures. |
| pymongo/synchronous/mongo_client.py | Introduces retry policy usage in retry loop; extends retry internals with runCommand/aggregate-write flags; updates command/drop paths to reuse retry machinery. |
| pymongo/synchronous/helpers.py | Adds token bucket + retry policy helpers for adaptive retries and backoff calculations. |
| pymongo/synchronous/database.py | Routes Database.command and cursor commands through retryable read machinery (incl. runCommand flag). |
| pymongo/synchronous/collection.py | Routes various write helper methods through retryable write machinery; marks aggregate-writes for retry gating. |
| pymongo/synchronous/client_session.py | Adds transaction retry backoff and CSOT-aware timeout error conversion in convenient API. |
| pymongo/synchronous/client_bulk.py | Treats overload-labeled errors as retryable in client bulk write execution. |
| pymongo/monitoring.py | Adds a new connection closed reason for “pool backoff”. |
| pymongo/logger.py | Adds logging message/reason mapping for pool backoff. |
| pymongo/common.py | Introduces ADAPTIVE_RETRIES default and registers adaptiveRetries / adaptive_retries options. |
| pymongo/client_options.py | Plumbs adaptive_retries option into ClientOptions with a new property. |
| pymongo/asynchronous/topology.py | Async equivalent of overload-labeled connection failure SDAM behavior change. |
| pymongo/asynchronous/pool.py | Async equivalent of handshake backpressure: true and overload label synthesis in connection establishment. |
| pymongo/asynchronous/mongo_client.py | Async equivalent of retry policy integration and runCommand/aggregate-write gating. |
| pymongo/asynchronous/helpers.py | Async equivalent of token bucket + retry policy helpers. |
| pymongo/asynchronous/database.py | Async equivalent of routing Database.command/cursor commands through retryable read machinery. |
| pymongo/asynchronous/collection.py | Async equivalent of routing various write helper methods through retryable write machinery; aggregate-write marking. |
| pymongo/asynchronous/client_session.py | Async equivalent of convenient transaction API backoff + timeout conversion. |
| pymongo/asynchronous/client_bulk.py | Async equivalent of treating overload-labeled errors as retryable in client bulk write execution. |
| .evergreen/resync-specs.sh | Adds spec resync support for the client-backpressure spec tests. |
```python
# Mock a session establishment overload.
mock_connection_fail = {
    "configureFailPoint": "failCommand",
    "mode": {"times": 1},
    "data": {
        "closeConnection": True,
    },
}

async with self.fail_point(mock_connection_fail):
    await coll.find_one({})
```
This failCommand configuration only sets closeConnection: True but omits failCommands (and does not scope by appName), which is inconsistent with other failCommand usages and may make configureFailPoint fail or apply too broadly. Include an explicit failCommands list (e.g., find) and scope it to the client via appName so the test is deterministic.
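A version of the fail point incorporating the comment's suggestion might look like the following. The `failCommands` and `appName` fields are standard `failCommand` options; the `"backpressureTest"` value is a hypothetical app name that the client under test would also need to set:

```python
# Fail point scoped to a single command and a single client, so it cannot
# affect other commands or other connections on the test server.
mock_connection_fail = {
    "configureFailPoint": "failCommand",
    "mode": {"times": 1},
    "data": {
        "failCommands": ["find"],       # only fail the command under test
        "closeConnection": True,
        "appName": "backpressureTest",  # hypothetical; must match the client
    },
}
```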
```python
def adaptive_retries(self) -> bool:
    """The configured adaptiveRetries option.

    .. versionadded:: 4.XX
```
The adaptive_retries option docstring uses a placeholder version (".. versionadded:: 4.XX"). This should be replaced with the actual version number used elsewhere in the PR (e.g., the MongoClient docstring mentions 4.17) to avoid publishing incomplete docs.
```diff
- .. versionadded:: 4.XX
+ .. versionadded:: 4.17
```
```python
if self._is_run_command and not (
    self._client.options.retry_reads and self._client.options.retry_writes
):
    raise
```
The overload-retry gating for is_run_command currently requires both retry_reads and retry_writes to be enabled. Since Database.command() is executed through the read retry path, disabling retryWrites would unexpectedly disable overload/backpressure retries for read-only commands issued via Database.command(). Consider gating based on the operation type (read vs write), e.g. require only retry_reads here for read-path runCommand operations.
```diff
- if self._is_run_command and not (
-     self._client.options.retry_reads and self._client.options.retry_writes
- ):
-     raise
+ if self._is_run_command:
+     if self._is_read and not self._client.options.retry_reads:
+         raise
+     if not self._is_read and not self._client.options.retry_writes:
+         raise
```
```python
@client_context.require_failCommand_appName
def test_retry_overload_error_insert_one(self):
    self.db.t.insert_one({"x": 1})

    # Ensure command is retried on overload error.
    fail_many = mock_overload_error.copy()
    fail_many["mode"] = {"times": _MAX_RETRIES}
    with self.fail_point(fail_many):
        self.db.t.find_one()

    # Ensure command stops retrying after _MAX_RETRIES.
    fail_too_many = mock_overload_error.copy()
    fail_too_many["mode"] = {"times": _MAX_RETRIES + 1}
    with self.fail_point(fail_too_many):
        with self.assertRaises(PyMongoError) as error:
            self.db.t.find_one()
```
test_retry_overload_error_insert_one is supposed to exercise insert-one retry behavior, but it currently calls find_one() for both the success and failure paths. This makes the test a duplicate of test_retry_overload_error_find and leaves insert_one overload retry behavior untested. Update the test to run insert_one(...) under the fail point (and in the _MAX_RETRIES + 1 case) so it actually validates insert retries.
```python
await self.db.t.find_one()

# Ensure command stops retrying after _MAX_RETRIES.
fail_too_many = mock_overload_error.copy()
fail_too_many["mode"] = {"times": _MAX_RETRIES + 1}
async with self.fail_point(fail_too_many):
    with self.assertRaises(PyMongoError) as error:
        await self.db.t.find_one()
```
test_retry_overload_error_insert_one is intended to validate retries for insert_one, but it currently calls find_one() in both the success and failure paths, duplicating the previous find test and not covering insert retries. Update the test to call insert_one(...) under the fail point (including the _MAX_RETRIES + 1 case).
```diff
- await self.db.t.find_one()
+ await self.db.t.insert_one({"x": 2})

  # Ensure command stops retrying after _MAX_RETRIES.
  fail_too_many = mock_overload_error.copy()
  fail_too_many["mode"] = {"times": _MAX_RETRIES + 1}
  async with self.fail_point(fail_too_many):
      with self.assertRaises(PyMongoError) as error:
-         await self.db.t.find_one()
+         await self.db.t.insert_one({"x": 3})
```
This seems like a legitimate issue.
Yup, good job 🤖 !
```python
# Mock a session establishment overload.
mock_connection_fail = {
    "configureFailPoint": "failCommand",
    "mode": {"times": 1},
    "data": {
        "closeConnection": True,
    },
}

with self.fail_point(mock_connection_fail):
    coll.find_one({})
```
The failCommand configuration used here only sets closeConnection: True but does not specify failCommands (and does not scope by appName), which is inconsistent with other failCommand usages in this repo and may cause the configureFailPoint command to fail or apply too broadly. Include an explicit failCommands list (e.g., find) and scope it to the client via appName to keep the test deterministic.
ShaneHarvey left a comment
@NoahStapp anything you'd like to call out in the merge?
pymongo/asynchronous/client_bulk.py (outdated diff)
```python
and "errorLabels" in error.details
and isinstance(error.details["errorLabels"], list)
and "RetryableError" in error.details["errorLabels"]
and "SystemOverloadedError" in error.details["errorLabels"]
```
Isn't "RetryableError" enough to say it's retryable? "SystemOverloadedError" only indicates that we backoff before the retry.
Good catch: the spec doesn't include this as a requirement, and the tests pass with the "SystemOverloadedError" check removed.
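The separation the thread agrees on can be sketched as two independent predicates over the error labels (helper names are illustrative, and the `details` shape is assumed from the quoted condition):

```python
def is_retryable(details: dict) -> bool:
    # "RetryableError" alone determines retryability, per the review
    # discussion; no overload label is required.
    labels = details.get("errorLabels")
    return isinstance(labels, list) and "RetryableError" in labels

def should_backoff(details: dict) -> bool:
    # "SystemOverloadedError" only selects backoff-before-retry behavior.
    labels = details.get("errorLabels") or []
    return "SystemOverloadedError" in labels
```

Keeping the two checks separate means a retryable-but-not-overloaded error retries immediately, while an overloaded one first waits out a backoff delay.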
The main thing I'd like eyes on is a sanity check that none of the non-test changes appear obviously incorrect. I'm doing a pass today to ensure that all of the Python backpressure work is present in this PR.
```python
# If we need to apply backpressure to the first command,
# we will need to revert back to starting state.
if self._session is not None and self._session.in_transaction:
    self._session._transaction.has_completed_command = True
```
There's a subtle behavior change here. For example:
```python
coll.insert_one({"_id": 1})
with client.start_session() as s, s.start_transaction():
    try:
        coll.insert_one({"_id": 1}, session=s)
    except DuplicateKeyError:
        pass
    # An overload error on this find will incorrectly(?) cause the
    # transaction to be restarted.
    coll.find_one(session=s)
```

Is that intentional?
We only restart the transaction if transaction.has_completed_command is False. The first insert_one within the transaction block should set transaction.has_completed_command to True and make the overload error find_one retry itself only. Unless you're saying there's a bug in that logic?
Wouldn't an exception being raised on the first operation cause the has_completed_command=True line to be skipped?
```python
"""The pool was closed, making the connection no longer valid."""

POOL_BACKOFF = "poolBackoff"
"""The pool is in backoff mode."""
```
This can be removed right?
```python
_VERBOSE_CONNECTION_ERROR_REASONS = {
    ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed",
    ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed",
    ConnectionClosedReason.POOL_BACKOFF: "Connection pool is in backoff",
```
This can be removed right?
```python
    "servermonitoringmode", common.SERVER_MONITORING_MODE
)
self.__adaptive_retries = (
    options.get("adaptive_retries", common.ADAPTIVE_RETRIES)
```
Why do we accept both underscore and camelcase here? The norm for spec uri options is to only accept camelCase, like retryWrites, serverMonitoringMode, etc...
PYTHON-5668
Changes in this PR
Merge backpressure branch into master.