Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,32 @@ catalog:
| snapshot-loading-mode | refs | The snapshots to return in the body of the metadata. Setting the value to `all` would return the full set of snapshots currently valid for the table. Setting the value to `refs` would load all snapshots referenced by branches or tags. |
| `header.X-Iceberg-Access-Delegation` | `vended-credentials` | Signal to the server that the client supports delegated access via a comma-separated list of access mechanisms. The server may choose to supply access via any or none of the requested mechanisms. When using `vended-credentials`, the server provides temporary credentials to the client. When using `remote-signing`, the server signs requests on behalf of the client. (default: `vended-credentials`) |

#### Retry and timeout

The REST Catalog uses `requests` with no retries and no timeout by default, so transient
5xx / network failures bubble up immediately and slow servers can hang the client indefinitely.
Set a `connection:` block on the catalog to opt in to a per-request timeout and a retry policy.
Every key is optional; when none are set, the default `requests` behavior is preserved.

```yaml
catalog:
  default:
    uri: http://rest-catalog/ws/
    connection:
      timeout: 60             # seconds, applied to every HTTP call
      retries: 5              # number of retry attempts on transient failures
      backoff-factor: 1.0     # exponential backoff between retries
```

| Key | Example | Description |
| ---------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------ |
| connection.timeout | 60 | Per-request timeout in seconds. Must be a positive number. |
| connection.retries | 5 | Number of retry attempts for transient failures. Must be non-negative. |
| connection.backoff-factor | 1.0 | Backoff factor between retry attempts. Must be non-negative. See [`urllib3` Retry docs](https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry) for the formula. |

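Status-based retries are applied to idempotent methods only (`GET`, `HEAD`, `OPTIONS`) and to the
transient HTTP status codes `429`, `500`, `502`, `503`, `504`. Other HTTP status codes are not retried.
Connection-level errors count against the same retry budget and are handled according to `urllib3`'s `Retry` rules.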

#### Headers in REST Catalog

To configure custom headers in REST Catalog, include them in the catalog properties with `header.<Header-Name>`. This
Expand Down
104 changes: 101 additions & 3 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

from collections import deque
from collections.abc import Mapping
from enum import Enum
from typing import (
TYPE_CHECKING,
Expand All @@ -25,9 +26,11 @@
from urllib.parse import quote, unquote

from pydantic import ConfigDict, Field, TypeAdapter, field_validator
from requests import HTTPError, Session
from requests import HTTPError, PreparedRequest, Response, Session
from requests.adapters import HTTPAdapter
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
from typing_extensions import override
from urllib3.util.retry import Retry

from pyiceberg import __version__
from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
Expand Down Expand Up @@ -255,6 +258,14 @@ class ScanPlanningMode(Enum):
SIGV4_SERVICE = "rest.signing-name"
SIGV4_MAX_RETRIES = "rest.sigv4.max-retries"
SIGV4_MAX_RETRIES_DEFAULT = 10
# Catalog property that holds the optional connection settings. Its value is a
# mapping whose keys are the CONNECTION_* names below.
CONNECTION = "connection"
# Key for the per-request timeout; parsed as a float and required to be positive.
CONNECTION_TIMEOUT = "timeout"
# Key for the number of retry attempts; parsed as an int and required to be non-negative.
CONNECTION_RETRIES = "retries"
# Key for the retry backoff factor; parsed as a float and required to be non-negative.
CONNECTION_BACKOFF_FACTOR = "backoff-factor"
# Hard-coded internally so users cannot misconfigure the retry policy
# (e.g. setting raise_on_status=False would swallow 4xx errors silently).
# HTTP status codes that trigger a retry.
_CONNECTION_RETRY_STATUS_FORCELIST = (429, 500, 502, 503, 504)
# HTTP methods that are eligible for a retry.
_CONNECTION_RETRY_ALLOWED_METHODS = frozenset({"GET", "HEAD", "OPTIONS"})
EMPTY_BODY_SHA256: str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
OAUTH2_SERVER_URI = "oauth2-server-uri"
SNAPSHOT_LOADING_MODE = "snapshot-loading-mode"
Expand Down Expand Up @@ -392,6 +403,89 @@ class ListViewsResponse(IcebergBaseModel):
_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)


class _RetryTimeoutHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that applies a default per-request timeout.

    requests does not provide a way to set a default timeout on a Session;
    without this adapter, every call would have to thread `timeout=` through.
    The adapter applies `self._timeout` whenever a per-call timeout is not set.

    The default timeout is part of the adapter's pickled state, so an adapter
    restored from a pickle keeps applying it.
    """

    # HTTPAdapter pickles only the attributes named in `__attrs__`. Without this
    # extension a restored adapter would have no `_timeout` and `send` would fail
    # with AttributeError.
    __attrs__ = [*HTTPAdapter.__attrs__, "_timeout"]

    def __init__(self, timeout: float | None = None, max_retries: Retry | int | None = None) -> None:
        """Create the adapter.

        Args:
            timeout: Default timeout applied when a request does not carry its own;
                None leaves requests without a timeout.
            max_retries: Retry policy forwarded to HTTPAdapter; None keeps the
                HTTPAdapter default.
        """
        self._timeout = timeout
        if max_retries is not None:
            super().__init__(max_retries=max_retries)
        else:
            # Omit the argument entirely so HTTPAdapter applies its own default.
            super().__init__()

    def send(
        self,
        request: PreparedRequest,
        stream: bool = False,
        timeout: None | float | tuple[float, float] | tuple[float, None] = None,
        verify: bool | str = True,
        cert: None | bytes | str | tuple[bytes | str, bytes | str] = None,
        proxies: Mapping[str, str] | None = None,
    ) -> Response:
        """Send the request, substituting the adapter's default timeout when none is given.

        An explicit per-call `timeout` always takes precedence over the default.
        All other arguments are forwarded to `HTTPAdapter.send` unchanged.
        """
        if timeout is None:
            timeout = self._timeout
        return super().send(request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies)


def _create_connection_adapter(properties: Properties) -> _RetryTimeoutHTTPAdapter | None:
    """Build a connection adapter from the optional `connection.*` properties.

    Args:
        properties: Catalog properties; only the `connection` entry is read.

    Returns:
        An adapter carrying the configured timeout and/or retry policy, or None when
        the `connection` entry is missing or empty, or sets none of the supported
        keys, leaving the default Session behavior unchanged.

    Raises:
        ValueError: If `connection` is not a mapping, or if a supplied value cannot be
            parsed as the expected kind of number or lies outside its allowed range.
    """
    import math

    connection_config = properties.get(CONNECTION)
    if not connection_config:
        return None
    # Accept any mapping type, matching the wording of the error message.
    if not isinstance(connection_config, Mapping):
        raise ValueError(f"`{CONNECTION}` must be a mapping, got: {type(connection_config).__name__}")

    timeout: float | None = None
    if (raw_timeout := connection_config.get(CONNECTION_TIMEOUT)) is not None:
        try:
            timeout = float(raw_timeout)
        except (TypeError, ValueError, OverflowError) as e:
            raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a number, got: {raw_timeout!r}") from e
        # NaN compares False against `<= 0` and would slip through; infinity is not a
        # usable timeout either, so both are rejected here instead of at request time.
        if not math.isfinite(timeout) or timeout <= 0:
            raise ValueError(f"`{CONNECTION}.{CONNECTION_TIMEOUT}` must be a positive number, got: {timeout}")

    retries: int | None = None
    if (raw_retries := connection_config.get(CONNECTION_RETRIES)) is not None:
        # int() would silently truncate 2.9 to 2; only whole-valued floats are accepted.
        # This also rejects NaN and infinity, for which is_integer() is False.
        if isinstance(raw_retries, float) and not raw_retries.is_integer():
            raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be an integer, got: {raw_retries!r}")
        try:
            retries = int(raw_retries)
        except (TypeError, ValueError, OverflowError) as e:
            raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be an integer, got: {raw_retries!r}") from e
        if retries < 0:
            raise ValueError(f"`{CONNECTION}.{CONNECTION_RETRIES}` must be non-negative, got: {retries}")

    backoff_factor: float | None = None
    if (raw_backoff := connection_config.get(CONNECTION_BACKOFF_FACTOR)) is not None:
        try:
            backoff_factor = float(raw_backoff)
        except (TypeError, ValueError, OverflowError) as e:
            raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must be a number, got: {raw_backoff!r}") from e
        # NaN compares False against `< 0`, so it has to be rejected explicitly.
        if math.isnan(backoff_factor) or backoff_factor < 0:
            raise ValueError(f"`{CONNECTION}.{CONNECTION_BACKOFF_FACTOR}` must be non-negative, got: {backoff_factor}")

    # A Retry policy is built as soon as either retry-related key is present; the
    # missing one falls back to 0.
    max_retries: Retry | None = None
    if retries is not None or backoff_factor is not None:
        max_retries = Retry(
            total=retries if retries is not None else 0,
            backoff_factor=backoff_factor if backoff_factor is not None else 0,
            status_forcelist=list(_CONNECTION_RETRY_STATUS_FORCELIST),
            allowed_methods=_CONNECTION_RETRY_ALLOWED_METHODS,
        )

    # The mapping was non-empty but contained none of the supported keys.
    if timeout is None and max_retries is None:
        return None

    return _RetryTimeoutHTTPAdapter(timeout=timeout, max_retries=max_retries)


class RestCatalog(Catalog):
uri: str
_session: Session
Expand All @@ -418,6 +512,12 @@ def _create_session(self) -> Session:
"""Create a request session with provided catalog configuration."""
session = Session()

# Mount the retry/timeout adapter when `connection.*` properties are set.
# SigV4's adapter mounted below at `self.uri` is a longer prefix and still wins for that host.
if (connection_adapter := _create_connection_adapter(self.properties)) is not None:
session.mount("http://", connection_adapter)
session.mount("https://", connection_adapter)

# Set HTTP headers
self._config_headers(session)

Expand Down Expand Up @@ -763,8 +863,6 @@ def _init_sigv4(self, session: Session) -> None:
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from requests import PreparedRequest
from requests.adapters import HTTPAdapter

class SigV4Adapter(HTTPAdapter):
def __init__(self, **properties: str):
Expand Down
130 changes: 130 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import pyiceberg
from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog
from pyiceberg.catalog.rest import (
CONNECTION,
CONNECTION_BACKOFF_FACTOR,
CONNECTION_RETRIES,
CONNECTION_TIMEOUT,
DEFAULT_ENDPOINTS,
EMPTY_BODY_SHA256,
OAUTH2_SERVER_URI,
Expand All @@ -43,6 +47,7 @@
HttpMethod,
RestCatalog,
ScanPlanningMode,
_RetryTimeoutHTTPAdapter,
)
from pyiceberg.exceptions import (
AuthorizationExpiredError,
Expand Down Expand Up @@ -2019,6 +2024,131 @@ def test_request_session_with_ssl_client_cert() -> None:
assert "Could not find the TLS certificate file, invalid path: path_to_client_cert" in str(e.value)


def test_session_without_connection_config_uses_default_adapter(rest_mock: Mocker) -> None:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get a test where we set the retry logic and then see the retries occur? We should be able to simulate this with mock HTTP calls and then see that X number of HTTP calls were made afterwards.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test_session_retries_on_transient_5xx_then_succeeds in 6fb87ff. requests_mock actually replaces the HTTPAdapter on the session, which bypasses our retry logic, so the test instead stands up a real http.server on a loopback port. The handler returns three 503s followed by a 200, and the test asserts both that list_namespaces succeeds and that the handler saw 4 requests.

catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
for adapter in catalog._session.adapters.values():
assert not isinstance(adapter, _RetryTimeoutHTTPAdapter)


def test_session_with_connection_timeout_and_retries(rest_mock: Mocker) -> None:
    """A full `connection` block mounts one shared adapter carrying the timeout and retry policy."""
    connection_block = {
        CONNECTION_TIMEOUT: 60,
        CONNECTION_RETRIES: 5,
        CONNECTION_BACKOFF_FACTOR: 1.0,
    }
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{CONNECTION: connection_block})  # type: ignore

    mounted_adapters = catalog._session.adapters
    secure_adapter = mounted_adapters["https://"]
    plain_adapter = mounted_adapters["http://"]

    # Both schemes share the very same adapter instance.
    assert isinstance(secure_adapter, _RetryTimeoutHTTPAdapter)
    assert secure_adapter is plain_adapter
    assert secure_adapter._timeout == 60.0

    retry_policy = secure_adapter.max_retries
    assert retry_policy.total == 5
    assert retry_policy.backoff_factor == 1.0
    # Internal retry policy: transient codes and idempotent methods only.
    assert retry_policy.status_forcelist == [429, 500, 502, 503, 504]
    assert set(retry_policy.allowed_methods or frozenset()) == {"GET", "HEAD", "OPTIONS"}


def test_session_with_connection_timeout_only(rest_mock: Mocker) -> None:
    """A string timeout is coerced to a float; without retry keys the retry budget is zero."""
    catalog = RestCatalog(
        "rest",
        uri=TEST_URI,
        token=TEST_TOKEN,
        **{CONNECTION: {CONNECTION_TIMEOUT: "30"}},  # type: ignore
    )
    mounted = catalog._session.adapters["https://"]
    assert isinstance(mounted, _RetryTimeoutHTTPAdapter)
    assert mounted._timeout == 30.0
    # No retry keys were supplied, so the adapter's retry budget stays at zero.
    assert mounted.max_retries.total == 0


def test_session_retries_on_transient_5xx_then_succeeds() -> None:
    """Serve three 503s and then a 200 from a real local server; expect four namespace requests.

    `requests_mock` would replace our HTTPAdapter, bypassing the retry logic we want to exercise,
    so this test stands up an actual `http.server` on a loopback port instead.
    """
    import json
    import threading
    from http.server import BaseHTTPRequestHandler, HTTPServer

    failures_before_success = 3
    # One entry per namespace request seen by the handler.
    namespace_hits: list[str] = []
    config_payload = json.dumps(
        {"defaults": {}, "overrides": {}, "endpoints": [str(endpoint) for endpoint in TEST_SUPPORTED_ENDPOINTS]}
    ).encode()

    class _FlakyCatalogHandler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:
            if self.path.endswith("/v1/config"):
                self._reply(200, config_payload)
                return
            if not self.path.endswith("/v1/namespaces"):
                self._reply(404, b"")
                return
            namespace_hits.append(self.path)
            if len(namespace_hits) > failures_before_success:
                self._reply(200, json.dumps({"namespaces": [["foo"]]}).encode())
            else:
                self._reply(503, b"")

        def _reply(self, status: int, payload: bytes) -> None:
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(payload)))
            self.end_headers()
            if payload:
                self.wfile.write(payload)

        def log_message(self, format: str, *args: Any) -> None:  # silence default access logs
            pass

    # Port 0 lets the OS pick a free loopback port.
    server = HTTPServer(("127.0.0.1", 0), _FlakyCatalogHandler)
    port = server.server_address[1]
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        # backoff-factor=0 keeps the test fast; retries=3 covers three 503s + the eventual 200.
        retry_settings = {CONNECTION_RETRIES: 3, CONNECTION_BACKOFF_FACTOR: 0}
        catalog = RestCatalog(
            "rest",
            uri=f"http://127.0.0.1:{port}/",
            token=TEST_TOKEN,
            **{CONNECTION: retry_settings},  # type: ignore
        )
        assert catalog.list_namespaces() == [("foo",)]
        assert len(namespace_hits) == 4
    finally:
        server.shutdown()
        server.server_close()


def test_session_with_invalid_connection_timeout_raises(rest_mock: Mocker) -> None:
    """A negative `connection.timeout` is rejected while the catalog is being constructed."""
    expected_message = "`connection.timeout` must be a positive number"
    with pytest.raises(ValueError, match=expected_message):
        RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{CONNECTION: {CONNECTION_TIMEOUT: -1}})  # type: ignore


def test_session_with_invalid_connection_retries_raises(rest_mock: Mocker) -> None:
    """A negative `connection.retries` is rejected while the catalog is being constructed."""
    expected_message = "`connection.retries` must be non-negative"
    with pytest.raises(ValueError, match=expected_message):
        RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{CONNECTION: {CONNECTION_RETRIES: -1}})  # type: ignore


def test_rest_catalog_with_basic_auth_type(rest_mock: Mocker) -> None:
# Given
rest_mock.get(
Expand Down