From 89dabcfd2698eadf7256329fba4604cd1eea4f75 Mon Sep 17 00:00:00 2001
From: Yaniv Michael Kaul
Date: Sun, 5 Apr 2026 18:31:55 +0300
Subject: [PATCH] perf: lazy-init _callbacks/_errbacks in ResponseFuture
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Defer list allocation for _callbacks and _errbacks from __init__ to first
use in add_callback()/add_errback(). On the synchronous execute path
(session.execute()), no callbacks are registered, so both lists are never
allocated — saving 112 bytes per request.

All access is under _callback_lock; _set_final_result and
_set_final_exception use 'or ()' guard to iterate safely when None.

Benchmark: 2.2x faster init (0.06 -> 0.03 us), 112 bytes saved/request.
---
 benchmarks/bench_lazy_init_callbacks.py |  79 +++++++++++++++
 cassandra/cluster.py                    |  16 +--
 tests/unit/test_lazy_init_callbacks.py  | 127 ++++++++++++++++++++++++
 3 files changed, 216 insertions(+), 6 deletions(-)
 create mode 100644 benchmarks/bench_lazy_init_callbacks.py
 create mode 100644 tests/unit/test_lazy_init_callbacks.py

diff --git a/benchmarks/bench_lazy_init_callbacks.py b/benchmarks/bench_lazy_init_callbacks.py
new file mode 100644
index 0000000000..53bdd6412a
--- /dev/null
+++ b/benchmarks/bench_lazy_init_callbacks.py
@@ -0,0 +1,79 @@
+# Copyright DataStax, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Micro-benchmark: lazy initialization of _callbacks/_errbacks.
+
+Measures the allocation savings from deferring list creation in
+ResponseFuture.__init__() for the common case where no callbacks
+are registered (synchronous execute path).
+
+Run:
+    python benchmarks/bench_lazy_init_callbacks.py
+"""
+import timeit
+import sys
+
+
+def bench_lazy_init():
+    """Compare allocation cost of [] vs None initialization."""
+    n = 1_000_000
+
+    # Simulate the __init__ allocation pattern
+    def init_with_lists():
+        callbacks = []
+        errbacks = []
+        return callbacks, errbacks
+
+    def init_with_none():
+        callbacks = None
+        errbacks = None
+        return callbacks, errbacks
+
+    t_lists = timeit.timeit(init_with_lists, number=n)
+    t_none = timeit.timeit(init_with_none, number=n)
+
+    print(f"Init with [] x2 ({n} iters): {t_lists / n * 1e9:.1f} ns/call")
+    print(f"Init with None x2 ({n} iters): {t_none / n * 1e9:.1f} ns/call")
+    print(f"Speedup: {t_lists / t_none:.1f}x")
+    print(f"Memory per empty list: {sys.getsizeof([])} bytes")
+    print(f"Saved per request (no callbacks): {sys.getsizeof([]) * 2} bytes")
+
+    # Benchmark the happy path: _set_final_result with no callbacks
+    # This is the hot path - iterating None vs empty list
+    def iter_empty_list():
+        callbacks = []
+        for fn, args, kwargs in callbacks:
+            pass
+
+    def iter_none_with_guard():
+        callbacks = None
+        for fn, args, kwargs in callbacks or ():
+            pass
+
+    t_list_iter = timeit.timeit(iter_empty_list, number=n)
+    t_none_iter = timeit.timeit(iter_none_with_guard, number=n)
+
+    print("\nHappy-path iteration (no callbacks):")
+    print(f"  Iterate empty []: {t_list_iter / n * 1e9:.1f} ns/call")
+    print(f"  Guard None or (): {t_none_iter / n * 1e9:.1f} ns/call")
+    print(f"  Speedup: {t_list_iter / t_none_iter:.2f}x")
+
+
+def main():
+    bench_lazy_init()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/cassandra/cluster.py b/cassandra/cluster.py
index 9eace8810d..9c53045f36 100644
--- a/cassandra/cluster.py
+++ b/cassandra/cluster.py
@@ -4463,8 +4463,8 @@ def __init__(self, session, message, query, timeout, metrics=None, prepared_stat
         self._make_query_plan()
         self._event = Event()
         self._errors = {}
-        self._callbacks = []
-        self._errbacks = []
+        self._callbacks = None
+        self._errbacks = None
         self.attempted_hosts = []
         self._start_timer()
         self._continuous_paging_state = continuous_paging_state
@@ -4969,7 +4969,7 @@ def _set_final_result(self, response):
             # registered callback
             to_call = tuple(
                 partial(fn, response, *args, **kwargs)
-                for (fn, args, kwargs) in self._callbacks
+                for (fn, args, kwargs) in self._callbacks or ()
             )
 
         self._event.set()
@@ -4991,7 +4991,7 @@ def _set_final_exception(self, response):
             # registered errback
             to_call = tuple(
                 partial(fn, response, *args, **kwargs)
-                for (fn, args, kwargs) in self._errbacks
+                for (fn, args, kwargs) in self._errbacks or ()
             )
 
         self._event.set()
@@ -5167,6 +5167,8 @@ def add_callback(self, fn, *args, **kwargs):
             # Always add fn to self._callbacks, even when we're about to
             # execute it, to prevent races with functions like
             # start_fetching_next_page that reset _final_result
+            if self._callbacks is None:
+                self._callbacks = []
             self._callbacks.append((fn, args, kwargs))
             if self._final_result is not _NOT_SET:
                 run_now = True
@@ -5185,6 +5187,8 @@ def add_errback(self, fn, *args, **kwargs):
             # Always add fn to self._errbacks, even when we're about to execute
             # it, to prevent races with functions like start_fetching_next_page
             # that reset _final_exception
+            if self._errbacks is None:
+                self._errbacks = []
             self._errbacks.append((fn, args, kwargs))
             if self._final_exception:
                 run_now = True
@@ -5222,8 +5226,8 @@ def add_callbacks(self, callback, errback,
 
     def clear_callbacks(self):
         with self._callback_lock:
-            self._callbacks = []
-            self._errbacks = []
+            self._callbacks = None
+            self._errbacks = None
 
     def __str__(self):
         result = "(no result yet)" if self._final_result is _NOT_SET else self._final_result
diff --git a/tests/unit/test_lazy_init_callbacks.py b/tests/unit/test_lazy_init_callbacks.py
new file mode 100644
index 0000000000..94da07948a
--- /dev/null
+++ b/tests/unit/test_lazy_init_callbacks.py
@@ -0,0 +1,127 @@
+"""
+Unit tests for lazy initialization of _callbacks/_errbacks in ResponseFuture.
+"""
+import unittest
+from unittest.mock import Mock
+
+from cassandra.cluster import ResponseFuture
+from cassandra.query import SimpleStatement
+from cassandra.policies import RetryPolicy
+
+
+def make_response_future():
+    """Create a minimal ResponseFuture for testing."""
+    session = Mock()
+    session.cluster._default_load_balancing_policy = Mock()
+    session.cluster._default_load_balancing_policy.make_query_plan.return_value = iter([])
+    session.row_factory = Mock()
+    session.cluster.connect_timeout = 5
+    session._create_clock.return_value = None
+
+    message = Mock()
+    query = SimpleStatement("SELECT 1")
+    return ResponseFuture(session, message, query, timeout=10.0,
+                          retry_policy=RetryPolicy())
+
+
+class TestLazyInitCallbacks(unittest.TestCase):
+
+    def test_callbacks_initially_none(self):
+        """_callbacks and _errbacks should be None after __init__."""
+        rf = make_response_future()
+        self.assertIsNone(rf._callbacks)
+        self.assertIsNone(rf._errbacks)
+
+    def test_add_callback_lazy_inits(self):
+        """add_callback should create the list on first use."""
+        rf = make_response_future()
+        self.assertIsNone(rf._callbacks)
+        rf.add_callback(lambda result: None)
+        self.assertIsNotNone(rf._callbacks)
+        self.assertEqual(len(rf._callbacks), 1)
+
+    def test_add_errback_lazy_inits(self):
+        """add_errback should create the list on first use."""
+        rf = make_response_future()
+        self.assertIsNone(rf._errbacks)
+        rf.add_errback(lambda exc: None)
+        self.assertIsNotNone(rf._errbacks)
+        self.assertEqual(len(rf._errbacks), 1)
+
+    def test_set_final_result_no_callbacks(self):
+        """_set_final_result should work when _callbacks is None."""
+        rf = make_response_future()
+        self.assertIsNone(rf._callbacks)
+        # Should not raise
+        rf._set_final_result("some result")
+        self.assertEqual(rf._final_result, "some result")
+        self.assertTrue(rf._event.is_set())
+
+    def test_set_final_exception_no_errbacks(self):
+        """_set_final_exception should work when _errbacks is None."""
+        rf = make_response_future()
+        self.assertIsNone(rf._errbacks)
+        exc = Exception("test error")
+        # Should not raise
+        rf._set_final_exception(exc)
+        self.assertIs(rf._final_exception, exc)
+        self.assertTrue(rf._event.is_set())
+
+    def test_set_final_result_with_callbacks(self):
+        """_set_final_result should invoke registered callbacks."""
+        rf = make_response_future()
+        results = []
+        rf.add_callback(lambda result: results.append(result))
+        rf._set_final_result("data")
+        self.assertEqual(results, ["data"])
+
+    def test_set_final_exception_with_errbacks(self):
+        """_set_final_exception should invoke registered errbacks."""
+        rf = make_response_future()
+        errors = []
+        rf.add_errback(lambda exc: errors.append(exc))
+        exc = Exception("fail")
+        rf._set_final_exception(exc)
+        self.assertEqual(errors, [exc])
+
+    def test_multiple_callbacks(self):
+        """Multiple callbacks should all be invoked."""
+        rf = make_response_future()
+        r1, r2 = [], []
+        rf.add_callback(lambda result: r1.append(result))
+        rf.add_callback(lambda result: r2.append(result))
+        rf._set_final_result("ok")
+        self.assertEqual(r1, ["ok"])
+        self.assertEqual(r2, ["ok"])
+
+    def test_clear_callbacks_resets_to_none(self):
+        """clear_callbacks should set both back to None."""
+        rf = make_response_future()
+        rf.add_callback(lambda r: None)
+        rf.add_errback(lambda e: None)
+        self.assertIsNotNone(rf._callbacks)
+        self.assertIsNotNone(rf._errbacks)
+        rf.clear_callbacks()
+        self.assertIsNone(rf._callbacks)
+        self.assertIsNone(rf._errbacks)
+
+    def test_add_callback_after_result(self):
+        """add_callback after _set_final_result should run immediately."""
+        rf = make_response_future()
+        rf._set_final_result("data")
+        results = []
+        rf.add_callback(lambda result: results.append(result))
+        self.assertEqual(results, ["data"])
+
+    def test_add_errback_after_exception(self):
+        """add_errback after _set_final_exception should run immediately."""
+        rf = make_response_future()
+        exc = Exception("fail")
+        rf._set_final_exception(exc)
+        errors = []
+        rf.add_errback(lambda e: errors.append(e))
+        self.assertEqual(errors, [exc])
+
+
+if __name__ == '__main__':
+    unittest.main()