-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(bigtable): add client side metric instrumentation to basic rpcs #16712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8a3af45
cf58c57
00618f8
ead3bf1
af0beba
d0c03d0
01f1739
d40fa0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -63,7 +63,11 @@ | |
| _validate_timeouts, | ||
| _WarmedInstanceKey, | ||
| ) | ||
| from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController | ||
| from google.cloud.bigtable.data._metrics import ( | ||
| BigtableClientSideMetricsController, | ||
| OperationType, | ||
| tracked_retry, | ||
| ) | ||
| from google.cloud.bigtable.data.exceptions import ( | ||
| FailedQueryShardError, | ||
| ShardedReadRowsExceptionGroup, | ||
|
|
@@ -1431,26 +1435,28 @@ async def sample_row_keys( | |
| retryable_excs = _get_retryable_errors(retryable_errors, self) | ||
| predicate = retries.if_exception_type(*retryable_excs) | ||
|
|
||
| sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) | ||
|
|
||
| @CrossSync.convert | ||
| async def execute_rpc(): | ||
| results = await self.client._gapic_client.sample_row_keys( | ||
| request=SampleRowKeysRequest( | ||
| app_profile_id=self.app_profile_id, **self._request_path | ||
| ), | ||
| timeout=next(attempt_timeout_gen), | ||
| retry=None, | ||
| with self._metrics.create_operation( | ||
| OperationType.SAMPLE_ROW_KEYS | ||
| ) as operation_metric: | ||
|
|
||
| @CrossSync.convert | ||
| async def execute_rpc(): | ||
| results = await self.client._gapic_client.sample_row_keys( | ||
| request=SampleRowKeysRequest( | ||
| app_profile_id=self.app_profile_id, **self._request_path | ||
| ), | ||
| timeout=next(attempt_timeout_gen), | ||
| retry=None, | ||
| ) | ||
| return [(s.row_key, s.offset_bytes) async for s in results] | ||
|
|
||
| return await tracked_retry( | ||
| retry_fn=CrossSync.retry_target, | ||
| operation=operation_metric, | ||
| target=execute_rpc, | ||
| predicate=predicate, | ||
| timeout=operation_timeout, | ||
| ) | ||
|
daniel-sanche marked this conversation as resolved.
|
||
| return [(s.row_key, s.offset_bytes) async for s in results] | ||
|
|
||
| return await CrossSync.retry_target( | ||
| execute_rpc, | ||
| predicate, | ||
| sleep_generator, | ||
| operation_timeout, | ||
| exception_factory=_retry_exception_factory, | ||
| ) | ||
|
|
||
| @CrossSync.convert(replace_symbols={"MutationsBatcherAsync": "MutationsBatcher"}) | ||
| def mutations_batcher( | ||
|
|
@@ -1561,28 +1567,29 @@ async def mutate_row( | |
| # mutations should not be retried | ||
| predicate = retries.if_exception_type() | ||
|
|
||
| sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60) | ||
|
|
||
| target = partial( | ||
| self.client._gapic_client.mutate_row, | ||
| request=MutateRowRequest( | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| mutations=[mutation._to_pb() for mutation in mutations_list], | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=attempt_timeout, | ||
| retry=None, | ||
| ) | ||
| return await CrossSync.retry_target( | ||
| target, | ||
| predicate, | ||
| sleep_generator, | ||
| operation_timeout, | ||
| exception_factory=_retry_exception_factory, | ||
| ) | ||
| with self._metrics.create_operation( | ||
| OperationType.MUTATE_ROW | ||
| ) as operation_metric: | ||
| target = partial( | ||
| self.client._gapic_client.mutate_row, | ||
| request=MutateRowRequest( | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| mutations=[mutation._to_pb() for mutation in mutations_list], | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=attempt_timeout, | ||
| retry=None, | ||
| ) | ||
| return await tracked_retry( | ||
| retry_fn=CrossSync.retry_target, | ||
| operation=operation_metric, | ||
| target=target, | ||
| predicate=predicate, | ||
| timeout=operation_timeout, | ||
| ) | ||
|
daniel-sanche marked this conversation as resolved.
|
||
|
|
||
| @CrossSync.convert | ||
| async def bulk_mutate_rows( | ||
|
|
@@ -1693,21 +1700,25 @@ async def check_and_mutate_row( | |
| ): | ||
| false_case_mutations = [false_case_mutations] | ||
| false_case_list = [m._to_pb() for m in false_case_mutations or []] | ||
| result = await self.client._gapic_client.check_and_mutate_row( | ||
| request=CheckAndMutateRowRequest( | ||
| true_mutations=true_case_list, | ||
| false_mutations=false_case_list, | ||
| predicate_filter=predicate._to_pb() if predicate is not None else None, | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=operation_timeout, | ||
| retry=None, | ||
| ) | ||
| return result.predicate_matched | ||
|
|
||
| with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE): | ||
| result = await self.client._gapic_client.check_and_mutate_row( | ||
| request=CheckAndMutateRowRequest( | ||
| true_mutations=true_case_list, | ||
| false_mutations=false_case_list, | ||
| predicate_filter=predicate._to_pb() | ||
| if predicate is not None | ||
| else None, | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=operation_timeout, | ||
| retry=None, | ||
| ) | ||
| return result.predicate_matched | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is not wrapped in tracked_retry. Will the attempt-level metrics (attempt latencies, server latencies, and connectivity error count) still be recorded?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. check_and_mutate shouldn't have retries, right? But yes, a single attempt will be recorded when this completes. The duration/gfe_latency data is captured, and will be exported as those metrics in the follow-up PR |
||
|
|
||
| @CrossSync.convert | ||
| async def read_modify_write_row( | ||
|
|
@@ -1747,20 +1758,22 @@ async def read_modify_write_row( | |
| rules = [rules] | ||
| if not rules: | ||
| raise ValueError("rules must contain at least one item") | ||
| result = await self.client._gapic_client.read_modify_write_row( | ||
| request=ReadModifyWriteRowRequest( | ||
| rules=[rule._to_pb() for rule in rules], | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=operation_timeout, | ||
| retry=None, | ||
| ) | ||
| # construct Row from result | ||
| return Row._from_pb(result.row) | ||
|
|
||
| with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE): | ||
| result = await self.client._gapic_client.read_modify_write_row( | ||
| request=ReadModifyWriteRowRequest( | ||
| rules=[rule._to_pb() for rule in rules], | ||
| row_key=row_key.encode("utf-8") | ||
| if isinstance(row_key, str) | ||
| else row_key, | ||
| app_profile_id=self.app_profile_id, | ||
| **self._request_path, | ||
| ), | ||
| timeout=operation_timeout, | ||
| retry=None, | ||
| ) | ||
| # construct Row from result | ||
| return Row._from_pb(result.row) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same question as check and mutate.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, those are also captured here |
||
|
|
||
| @CrossSync.convert | ||
| async def close(self): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -105,6 +105,7 @@ def _retry_exception_factory( | |
| tuple[Exception, Exception|None]: | ||
| tuple of the exception to raise, and a cause exception if applicable | ||
| """ | ||
| exc_list = exc_list.copy() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we need to copy the exception list now?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is cleaner; we wouldn't expect a factory method like this to modify its input arguments. Copying creates an isolated reference. IIRC, this only came up in test code, though. |
||
| if reason == RetryFailureReason.TIMEOUT: | ||
| timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else "" | ||
| # if failed due to timeout, raise deadline exceeded as primary exception | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where did sleep_generator and exception_factory go?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tracked_retry contains them both. We needed a custom version of sleep_generator to report backoff, and a custom exception_factory to report terminal errors to the metrics module. See go/bigtable-csm-python