Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/bigtable/data/_async/metrics_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ async def intercept_unary_stream(
- SampleRowKeys
"""
try:
for k,v in operation.routing_cookie.items():
client_call_details.metadata.add(k, v)
Comment on lines +133 to +134
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The client_call_details object is immutable (it's a namedtuple). While metadata might be a mutable object in the async version, you should not modify it in place. The idiomatic way to add metadata in an interceptor is to create a new ClientCallDetails object with the updated metadata.

Furthermore, the add method does not exist on the metadata list in the synchronous version of the client, which will cause an AttributeError. The synchronous metadata is a list of tuples.

To fix this, you should create a copy of the metadata, append the new headers, and then create a new ClientCallDetails object using _replace. This new object will then be used in the continuation call.

Suggested change
for k,v in operation.routing_cookie.items():
client_call_details.metadata.add(k, v)
metadata = list(client_call_details.metadata or []) + list(operation.routing_cookie.items())
client_call_details = client_call_details._replace(metadata=metadata)

return self._streaming_generator_wrapper(
operation, await continuation(client_call_details, request)
)
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigtable/data/_metrics/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class ActiveOperationMetric:
first_response_latency_ns: int | None = None
# time waiting on flow control, in nanoseconds
flow_throttling_time_ns: int = 0
# routing cookie, passed in between retry attempts
routing_cookie: dict[str, str|bytes] = field(default_factory=dict)

_active_operation_context: ClassVar[
contextvars.ContextVar[ActiveOperationMetric]
Expand Down
7 changes: 6 additions & 1 deletion google/cloud/bigtable/data/_metrics/tracked_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ def wrapper(exc: Exception) -> None:
metadata = list(rpc_error.trailing_metadata()) + list(
rpc_error.initial_metadata()
)
operation.add_response_metadata({k: v for k, v in metadata})
metadata_dict = {k: v for k, v in metadata}
operation.add_response_metadata(metadata_dict)
# check for routing cookie:
cookie_headers = {k:v for k,v in metadata_dict.items() if k.startswith("x-goog-cbt-cookie")}
if cookie_headers:
operation.routing_cookie = cookie_headers
Comment on lines +61 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic for processing metadata and extracting the routing cookie is also needed in _track_terminal_error. To avoid code duplication and ensure consistency, consider extracting this block into a helper function that can be called from both _track_retryable_error and _track_terminal_error.

except Exception:
# ignore errors in metadata collection
pass
Expand Down
Loading