diff --git a/doc/strand-rationale.md b/doc/strand-rationale.md new file mode 100644 index 000000000..37ebf082a --- /dev/null +++ b/doc/strand-rationale.md @@ -0,0 +1,146 @@ +# Strand: Why Per-Strand Implementation + +A strand has two reasonable internal designs. The simpler one pools +serialization state across strands; the correct one allocates state +per-strand. Capy uses the per-strand design. This document explains why +the simpler design is wrong and what the per-strand design costs. + +## The previous design + +Capy's original strand service held a fixed array of `strand_impl` +objects, 211 slots, allocated inline in the service and never freed +individually. When a user constructed a new strand, the service +incremented a counter and returned a pointer to `impls_[counter % 211]`. + +```cpp +strand_impl impls_[211]; +std::size_t salt_; + +strand_impl* get_implementation() +{ + std::lock_guard lock(mutex_); + return &impls_[salt_++ % 211]; +} +``` + +This is pure round-robin: the 1st strand gets slot 0, the 212th strand +gets slot 0 again. Two strands that map to the same slot share the same +`strand_impl` object. + +Each `strand_impl` holds: + +- a mutex (`mutex_`) +- a pending operation queue (`pending_`) +- a locked flag (`locked_`) +- the executor identity used by whichever invoker is currently + dispatching + +Two strands that share a slot share all of this. + +## What sharing actually shares + +Sharing a mutex is not inherently a problem. Two strands that hold the +same mutex contend on push and pop operations, which are brief. They +still proceed independently afterward. + +Sharing a queue and a locked flag is a different matter. Those are the +state machine that determines which work runs, in what order, and +through which executor. When two logically independent strands share +this state, the following become possible: + +**Cross-strand blocking.** Strand A is mid-dispatch, so `locked_` is +true. Strand B posts a new operation. 
B's post sees `locked_` already
+set and adds its work to the shared queue without posting a new
+invoker. B's work now waits behind A's entire dispatch cycle, even
+though A and B are supposed to be independent.
+
+**Wrong executor dispatch.** The invoker that won the unlocked-to-locked
+transition captures the executor of the strand that triggered it. Call
+this strand A. If strand B later enqueues work into the shared state,
+that work runs through A's executor, not B's. For strands that wrap
+the same underlying thread pool, this is invisible. For strands that
+wrap different executor layers (a metrics wrapper, a type-erased
+`any_executor`, a test shim), operations execute through the wrong
+executor, violating the invariants the user associated with B's
+executor.
+
+**False equality.** `operator==` on two distinct strands returns true
+when they map to the same slot, because equality is defined as pointer
+identity of the impl.
+
+## Why per-strand is the right choice
+
+The correctness argument is simple: strand isolation is part of the
+contract. The word "strand" implies a serialization domain that is
+independent of all other strands. A user who writes code against two
+strands is justified in expecting that progress on one does not depend
+on progress on the other, and that work posted to one runs through
+that strand's executor, not a neighbor's.
+
+The pooled design cannot provide this guarantee for more than 211
+strands from the same context.
+
+One possible response is randomization: instead of pure round-robin,
+use a hash of the strand's address mixed with a salt counter. This
+spreads collisions across time so that (1, 212), (2, 213) are no longer
+the deterministic collision pairs. It does not remove collisions. With
+1000 strands from one context, collisions are guaranteed by pigeonhole:
+at least 789 strands share a slot, and the expected number of colliding
+pairs, n(n-1)/2m, is over two thousand. Even 50 strands yield roughly
+six expected collision pairs. The bug surface is narrower and harder
+to trigger reproducibly, but the class of bug is identical.
+ +Randomization fixed a performance symptom (deterministic starvation) +without fixing the correctness problem (shared state between independent +strands). Treating these as the same fix is a category error. + +The per-strand design removes the impl pool entirely. Each strand +allocates its own `strand_impl` via `make_shared`. Two strands never +share a queue, a locked flag, or an invoker. Isolation is unconditional. + +The mutex pool stays. 193 mutexes for any number of strands is a real +saving over allocating a mutex per strand. Unlike the impl pool, mutex +sharing has no semantic consequence: the critical sections guarded by +the mutex cover only push/pop and the locked flag check. Two strands +that briefly contend on a shared mutex wait for each other's push/pop +then proceed independently. No state crosses the boundary. + +The key insight is that isolation and contention are not the same +problem. The impl pool conflated them. Removing the impl pool eliminates +the isolation problem; keeping the mutex pool manages the contention +cost without reintroducing the isolation problem. + +## What the per-strand design costs + +**One allocation per strand.** `make_shared` allocates +roughly 80-96 bytes on typical allocators with per-thread arenas +(glibc, jemalloc, tcmalloc). For any strand that posts at least one +operation, this is negligible against the work being dispatched. + +**One pointer of additional size per strand handle.** The strand object +holds a `shared_ptr` rather than a raw pointer. A +`shared_ptr` is two pointers wide; a raw pointer is one. Strand objects +grow by one pointer (typically 8 bytes). + +**Two atomic refcount operations per invoker creation/destruction.** The +invoker coroutine frame holds a copy of the `shared_ptr`, so the +reference count increments when the invoker starts and decrements when +it finishes. 
These are not on the hot post path; they happen at the +unlocked-to-locked transition (once per dispatch batch), not on every +enqueue. + +The mutex pool bounds memory growth at 193 mutexes regardless of how +many strands exist. A program that creates 10,000 strands does not get +10,000 mutexes; it gets at most 193. + +## Tradeoffs we did not take + +**Per-strand mutex.** Allocating a mutex per strand would eliminate the +mutex pool entirely and remove all cross-strand contention. The cost is +roughly 40 extra bytes per strand. The benefit is marginal: the +critical sections that use the pool mutex are brief, and contention +between unrelated strands is unlikely in practice. This option remains +open if benchmarks show real contention under specific workloads. + +The chosen design (per-strand impl, shared mutex pool) matches the +strategy used by current executor-aware strand implementations in the +C++ library space, which provides confidence that the tradeoffs are +well understood. diff --git a/doc/strand-spec.md b/doc/strand-spec.md new file mode 100644 index 000000000..6b48d532e --- /dev/null +++ b/doc/strand-spec.md @@ -0,0 +1,303 @@ +# Strand + +Each strand allocates a private serialization state via +`shared_ptr`. Strands sharing an execution context borrow +mutexes from a 193-entry pool but never share their queues, `locked_` +flag, or invoker. This document is the design contract; see +`strand-rationale.md` for the motivation. + +## Goals + +- Strand isolation is absolute, not probabilistic. Two distinct strands + never share a queue, a `locked_` flag, or a dispatcher executor. +- Public API of `strand` is unchanged: same operations, same + equality semantics, same `running_in_this_thread`. +- Construction cost: one `std::make_shared` per strand. +- Capy's existing performance optimizations are preserved: the + `strand_queue` per-post wrapper recycler stays per-impl; the invoker + coroutine frame cache moves to the service. 
+
+## Non-goals
+
+- Per-strand mutex. The shared mutex pool stays. Revisit if benchmarks
+  show contention from shared mutexes.
+- Performance tuning of the mutex pool size or salt function.
+- Lock-free hot path. The per-impl mutex is taken under
+  `post`/`dispatch` for queue mutation and the `locked_` flag check.
+- Allocator plumbing for `allocate_shared`. Default-construct now; can
+  add an allocator parameter later without changing this design.
+- Changes to `strand_queue`. Its free-list stays; lifetime is bounded by
+  the impl that owns it.
+
+## Design
+
+### Data structures
+
+```cpp
+struct strand_impl
+    : intrusive_list<strand_impl>::node
+{
+    std::mutex* mutex_ = nullptr; // borrowed from service pool
+    strand_queue pending_;
+    bool locked_ = false;
+    std::atomic<std::thread::id> dispatch_thread_{};
+
+    std::atomic<strand_service_impl*> service_{nullptr};
+
+    ~strand_impl();
+};
+
+class strand_service_impl : public strand_service
+{
+    static constexpr std::size_t num_mutexes = 193;
+
+    std::mutex mutex_;
+    std::size_t salt_ = 0;
+    std::shared_ptr<std::mutex> mutexes_[num_mutexes];
+    intrusive_list<strand_impl> impl_list_;
+    std::atomic<void*> invoker_frame_cache_{nullptr};
+};
+
+template<class Ex>
+class strand
+{
+    std::shared_ptr<strand_impl> impl_;
+    Ex ex_;
+};
+```
+
+Key design choices:
+
+- Each strand owns its `strand_impl` via `shared_ptr` (no pooling of
+  impls).
+- `strand` holds a `shared_ptr<strand_impl>` rather than a raw pointer
+  (size grows by one pointer).
+- The invoker frame cache lives on the service, not the impl. The cache
+  slot always points at a structure that lives for the execution
+  context's lifetime, removing the lifetime hazard that would otherwise
+  affect per-strand impls.
+- `strand_impl` holds a borrowed `mutex_` pointer, an intrusive list
+  base class (via `intrusive_list<strand_impl>::node`), and a
+  back-pointer to the service.
+- The service holds a 193-entry mutex pool, the head of the live-impl
+  linked list, and the invoker frame cache slot.
+- 193 is a prime large enough that hash collisions are rare in practice
+  while keeping the static mutex array small.
+
+### Public detail-header surface
+
+```cpp
+class BOOST_CAPY_DECL strand_service
+    : public execution_context::service
+{
+public:
+    virtual ~strand_service();
+
+    // Returns shared_ptr instead of raw pointer.
+    virtual std::shared_ptr<strand_impl>
+    create_implementation() = 0;
+
+    static bool
+    running_in_this_thread(strand_impl& impl) noexcept;
+
+    // Takes shared_ptr by const-ref so post_invoker can capture
+    // lifetime on the unlocked-to-locked transition without paying an
+    // atomic refcount on every post when the invoker is already running.
+    static std::coroutine_handle<>
+    dispatch(
+        std::shared_ptr<strand_impl> const& impl,
+        executor_ref ex,
+        std::coroutine_handle<> h);
+
+    static void
+    post(
+        std::shared_ptr<strand_impl> const& impl,
+        executor_ref ex,
+        std::coroutine_handle<> h);
+};
+```
+
+The strand constructor calls `create_implementation()` and stores the
+returned `shared_ptr`. The public API surface of `strand` does not
+change. The `running_in_this_thread` query is non-mutating and does
+not extend lifetime, so it stays as `strand_impl&`.
+
+### Construction
+
+```cpp
+std::shared_ptr<strand_impl>
+strand_service_impl::create_implementation()
+{
+    auto new_impl = std::make_shared<strand_impl>();
+
+    std::lock_guard lock(mutex_);
+
+    std::size_t s = salt_++;
+    std::size_t idx = reinterpret_cast<std::size_t>(new_impl.get());
+    idx += idx >> 3;
+    idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2);
+    idx %= num_mutexes;
+    if(!mutexes_[idx])
+        mutexes_[idx] = std::make_shared<std::mutex>();
+    new_impl->mutex_ = mutexes_[idx].get();
+
+    impl_list_.push_back(new_impl.get());
+    new_impl->service_ = this;
+
+    return new_impl;
+}
+```
+
+The hash mixes the impl's address with a monotonic salt and the golden
+ratio constant. The salt prevents deterministic collision sequences
+when the allocator returns predictable addresses; the address bits
+spread otherwise-correlated allocations. Mutex slots are allocated
+lazily: a program that creates few strands never instantiates all 193
+mutexes. The impl is appended to `impl_list_` via `push_back`; order
+does not matter since shutdown drains the entire list.
+
+### Dispatch / post
+
+The state machine is unchanged from the previous design. The key
+differences:
+
+- `enqueue`, `dispatch_pending`, `try_unlock` operate on a strand's own
+  `pending_` and `locked_` (no cross-strand sharing).
+- The mutex they take is `*impl.mutex_`, which may be shared with other
+  impls that hashed to the same pool slot. Critical sections cover only
+  brief queue push/pop and the `locked_` flag check.
+- The static `post`/`dispatch` entry points take
+  `shared_ptr<strand_impl> const&`. When the unlocked-to-locked
+  transition wins, they copy the shared_ptr into `post_invoker`, which
+  passes it as the coroutine parameter held in the coroutine frame.
+  That keeps the impl alive for the duration of the dispatch cycle,
+  even if the user drops their last strand handle. When the transition
+  does not win (work is enqueued onto an already-running invoker), no
+  shared_ptr copy is made. The existing invoker's frame already holds
+  a reference. The hot path adds zero atomic refcount operations versus
+  the previous raw-pointer code.
+
+### Invoker frame allocation
+
+```cpp
+void* operator new(std::size_t n, strand_impl& impl)
+{
+    auto* svc = impl.service_.load(std::memory_order_acquire);
+    constexpr auto A = alignof(strand_service_impl*);
+    std::size_t padded = (n + A - 1) & ~(A - 1);
+    std::size_t total = padded + sizeof(strand_service_impl*);
+
+    void* p = svc->invoker_frame_cache_.exchange(
+        nullptr, std::memory_order_acquire);
+    if(!p || p == kCacheClosed)
+        p = ::operator new(total);
+
+    *reinterpret_cast<strand_service_impl**>(
+        static_cast<char*>(p) + padded) = svc;
+    return p;
+}
+
+void operator delete(void* p, std::size_t n) noexcept
+{
+    constexpr auto A = alignof(strand_service_impl*);
+    std::size_t padded = (n + A - 1) & ~(A - 1);
+    auto* svc = *reinterpret_cast<strand_service_impl**>(
+        static_cast<char*>(p) + padded);
+
+    void* expected = nullptr;
+    if(!svc->invoker_frame_cache_.compare_exchange_strong(
+        expected, p, std::memory_order_release))
+        ::operator delete(p);
+}
+```
+
+The trailer holds a service pointer (lifetime: execution context),
+not an impl pointer (lifetime: per-strand). The invoker's `make_invoker`
+parameter is a shared_ptr stored in the coroutine frame; that one copy
+keeps the impl alive past any user-side strand drop. `operator delete`
+reads only the trailer (service-scoped), so the impl may be dead at
+delete time without consequence.
+
+### Destruction
+
+```cpp
+strand_impl::~strand_impl()
+{
+    auto* svc = service_.load(std::memory_order_acquire);
+    if(!svc) return;
+    std::lock_guard lock(svc->mutex_);
+    svc->impl_list_.remove(this);
+}
+```
+
+`~strand_queue` (already implemented) destroys any pending wrappers
+without resuming them. That covers the case where work was queued but
+the inner executor never invoked the invoker before context teardown.
+ +### Shutdown + +```cpp +void strand_service_impl::shutdown() override +{ + std::lock_guard lock(mutex_); + while(auto* p = impl_list_.pop_front()) + { + std::lock_guard impl_lock(*p->mutex_); + p->locked_ = true; + p->service_.store(nullptr, std::memory_order_release); + } + + void* fp = invoker_frame_cache_.exchange( + kCacheClosed, std::memory_order_acq_rel); + if(fp) ::operator delete(fp); +} +``` + +After shutdown, user-held strands still own their impls via +`shared_ptr`. When they drop, `~strand_impl` sees `service_ == nullptr` +and short-circuits without touching service state, which may have been +freed. + +### Lifetime cases + +1. **User drops strand, no work in flight.** Last `shared_ptr` drops; + `~strand_impl` unlinks; impl freed. `~strand_queue` discards any + wrappers (edge case only; `enqueue` posts the invoker on the + unlocked-to-locked transition, so wrappers are normally drained + before the strand becomes inactive). + +2. **User drops strand, invoker still running.** The invoker promise + holds the last `shared_ptr`; impl stays alive; invoker drains and + exits at `final_suspend`. Frame deletion order: promise destructor + (releases shared_ptr, runs `~strand_impl`), then `operator delete` + (recycles frame to service cache; service is still alive). Safe. + +3. **Service shutdown while user holds strand.** Shutdown unlinks the + impl from the list, marks it locked, and nulls its `service_` + back-pointer. When the user later drops the strand, `~strand_impl` + sees `service_ == nullptr` and short-circuits without touching + service state, which may have been freed. + +4. **Service shutdown with invoker queued but never invoked.** The + inner executor's destructor drops the queued continuation; the + coroutine handle is never destroyed; the promise's `shared_ptr` + never releases; impl and frame leak. Pre-existing behavior, not + introduced by this design. + +5. 
**Service shutdown with invoker mid-execution.** The invoker accesses + the service only via the trailer in `operator delete` (cache-slot + recycle). Shutdown sets the cache to `kCacheClosed`; concurrent + invokers see the sentinel and call `::operator delete` instead. The + service object itself must outlive any in-flight invoker. Capy's + `execution_context` teardown is responsible for stopping the inner + executor (which drains queued continuations) before destroying + services. This matches the contract the previous implementation + relied on. + +### Move semantics + +The documented contract is unchanged: "a moved-from strand is only safe +to destroy or reassign." The moved-from `shared_ptr` is nullptr; calls +on it dereference nullptr, which enforces the contract rather than +merely documenting it. The previous design left the moved-from strand +silently pointing at the same impl as the moved-to strand. diff --git a/doc/unlisted/execution-strand.adoc b/doc/unlisted/execution-strand.adoc index 74f081445..453da73c3 100644 --- a/doc/unlisted/execution-strand.adoc +++ b/doc/unlisted/execution-strand.adoc @@ -220,15 +220,14 @@ void callback() == Implementation Notes -Capy's strand uses a fixed pool of 211 implementation objects. New strands -hash to select an impl from the pool. Strands that hash to the same index -share serialization: - -* This is harmless — just extra serialization -* Rare with 211 buckets -* No allocation for strand creation - -This design trades minimal extra serialization for zero per-strand allocation. +Each strand owns a private serialization state. Strands sharing an +execution context draw from a small pool of mutexes (193 entries) for +their internal critical sections; mutex sharing causes only brief +contention on push/pop, never cross-strand state sharing. 
+ +* Construction cost: one `std::make_shared` per strand +* Two distinct strands always serialize independently +* The mutex pool keeps memory bounded as strand count grows == When NOT to Use Strands diff --git a/include/boost/capy/ex/detail/strand_service.hpp b/include/boost/capy/ex/detail/strand_service.hpp index 7e1960a29..6f60c2c2f 100644 --- a/include/boost/capy/ex/detail/strand_service.hpp +++ b/include/boost/capy/ex/detail/strand_service.hpp @@ -15,7 +15,7 @@ #include #include -#include +#include namespace boost { namespace capy { @@ -32,13 +32,11 @@ struct is_strand : std::false_type {}; template struct is_strand> : std::true_type {}; -//---------------------------------------------------------- +/** Service that manages strand implementations. -/** Service that manages pooled strand implementations. - - This service maintains a fixed pool of strand_impl objects. - When a strand is constructed, it obtains a pointer to one - of these pooled implementations based on a hash. + Allocates one `strand_impl` per strand. Maintains a shared pool of + mutexes that strand_impls borrow, sized to keep memory bounded as + strand count grows. @par Thread Safety The service operations are thread-safe. @@ -51,16 +49,16 @@ class BOOST_CAPY_DECL strand_service */ virtual ~strand_service(); - /** Return a pointer to a pooled implementation. + /** Allocate a new strand implementation. - Uses a hash to select an implementation from the pool. - The salt is incremented after each call to distribute - strands across the pool. + Each call returns a fresh `strand_impl` owned by the returned + `shared_ptr`. The implementation borrows a mutex from the + service's shared pool. - @return Pointer to a strand_impl from the pool. + @return shared_ptr to the new strand_impl. */ - virtual strand_impl* - get_implementation() = 0; + virtual std::shared_ptr + create_implementation() = 0; /** Check if THIS thread is currently executing in the strand. 
*/ static bool @@ -68,11 +66,17 @@ class BOOST_CAPY_DECL strand_service /** Dispatch through strand; returns handle for symmetric transfer. */ static std::coroutine_handle<> - dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h); + dispatch( + std::shared_ptr const& impl, + executor_ref ex, + std::coroutine_handle<> h); /** Post to strand queue. */ static void - post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h); + post( + std::shared_ptr const& impl, + executor_ref ex, + std::coroutine_handle<> h); protected: strand_service(); diff --git a/include/boost/capy/ex/strand.hpp b/include/boost/capy/ex/strand.hpp index 1f7753a44..f639db8c2 100644 --- a/include/boost/capy/ex/strand.hpp +++ b/include/boost/capy/ex/strand.hpp @@ -36,11 +36,12 @@ namespace capy { Coroutines resumed through a strand shall not run concurrently. @par Implementation - The strand uses a service-based architecture with a fixed pool - of 211 implementation objects. New strands hash to select an - impl from the pool. Strands that hash to the same index share - serialization, which is harmless (just extra serialization) - and rare with 211 buckets. + Each strand allocates a private serialization state. Strands + constructed from the same execution context share a small pool + of mutexes (193 entries) selected by hash; mutex sharing causes + only brief contention on the push/pop critical section, never + cross-strand state sharing. Construction cost: one + `std::make_shared` per strand. @par Executor Concept This class satisfies the `Executor` concept, providing: @@ -73,9 +74,11 @@ namespace capy { template class strand { - detail::strand_impl* impl_; + std::shared_ptr impl_; Ex ex_; + friend struct strand_test; + public: /** The type of the underlying executor. */ @@ -83,9 +86,8 @@ class strand /** Construct a strand for the specified executor. - Obtains a strand implementation from the service associated - with the executor's context. 
The implementation is selected - from a fixed pool using a hash function. + Allocates a fresh strand implementation from the service + associated with the executor's context. @param ex The inner executor to wrap. Coroutines will ultimately be dispatched through this executor. @@ -101,7 +103,7 @@ class strand explicit strand(Ex1&& ex) : impl_(detail::get_strand_service(ex.context()) - .get_implementation()) + .create_implementation()) , ex_(std::forward(ex)) { } @@ -198,7 +200,7 @@ class strand bool operator==(strand const& other) const noexcept { - return impl_ == other.impl_; + return impl_.get() == other.impl_.get(); } /** Post a continuation to the strand. @@ -218,7 +220,7 @@ class strand void post(continuation& c) const { - detail::strand_service::post(*impl_, executor_ref(ex_), c.h); + detail::strand_service::post(impl_, executor_ref(ex_), c.h); } /** Dispatch a continuation through the strand. @@ -241,7 +243,7 @@ class strand std::coroutine_handle<> dispatch(continuation& c) const { - return detail::strand_service::dispatch(*impl_, executor_ref(ex_), c.h); + return detail::strand_service::dispatch(impl_, executor_ref(ex_), c.h); } }; diff --git a/src/ex/detail/strand_impl.hpp b/src/ex/detail/strand_impl.hpp new file mode 100644 index 000000000..fa976b763 --- /dev/null +++ b/src/ex/detail/strand_impl.hpp @@ -0,0 +1,48 @@ +// +// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_IMPL_HPP +#define BOOST_CAPY_SRC_EX_DETAIL_STRAND_IMPL_HPP + +#include "src/ex/detail/strand_queue.hpp" +#include +#include +#include +#include + +namespace boost { +namespace capy { +namespace detail { + +class strand_service_impl; + +/** Implementation state for a single strand. 
+ + Each strand owns one of these via shared_ptr. The mutex is borrowed + from the service's shared pool. The intrusive_list base links this + impl into the service's list of live impls for shutdown traversal. +*/ +struct strand_impl + : intrusive_list::node +{ + std::mutex* mutex_ = nullptr; + strand_queue pending_; + bool locked_ = false; + std::atomic dispatch_thread_{}; + + std::atomic service_{nullptr}; + + ~strand_impl(); +}; + +} // namespace detail +} // namespace capy +} // namespace boost + +#endif diff --git a/src/ex/detail/strand_service.cpp b/src/ex/detail/strand_service.cpp index 829329251..788696e60 100644 --- a/src/ex/detail/strand_service.cpp +++ b/src/ex/detail/strand_service.cpp @@ -7,148 +7,73 @@ // Official repository: https://github.com/cppalliance/capy // -#include "src/ex/detail/strand_queue.hpp" +#include "src/ex/detail/strand_impl.hpp" #include #include -#include #include -#include -#include +#include #include namespace boost { namespace capy { namespace detail { -//---------------------------------------------------------- - -/** Implementation state for a strand. - - Each strand_impl provides serialization for coroutines - dispatched through strands that share it. -*/ -// Sentinel stored in cached_frame_ after shutdown to prevent +// Sentinel stored in invoker_frame_cache_ after shutdown to prevent // in-flight invokers from repopulating a freed cache slot. inline void* const kCacheClosed = reinterpret_cast(1); -struct strand_impl -{ - std::mutex mutex_; - strand_queue pending_; - bool locked_ = false; - std::atomic dispatch_thread_{}; - std::atomic cached_frame_{nullptr}; -}; - -//---------------------------------------------------------- - -/** Invoker coroutine for strand dispatch. - - Uses custom allocator to recycle frame - one allocation - per strand_impl lifetime, stored in trailer for recovery. -*/ -struct strand_invoker -{ - struct promise_type - { - // Used to post the invoker through the inner executor. 
- // Lives in the coroutine frame (heap-allocated), so has - // a stable address for the duration of the queue residency. - continuation self_; - - void* operator new(std::size_t n, strand_impl& impl) - { - constexpr auto A = alignof(strand_impl*); - std::size_t padded = (n + A - 1) & ~(A - 1); - std::size_t total = padded + sizeof(strand_impl*); - - void* p = impl.cached_frame_.exchange( - nullptr, std::memory_order_acquire); - if(!p || p == kCacheClosed) - p = ::operator new(total); - - // Trailer lets delete recover impl - *reinterpret_cast( - static_cast(p) + padded) = &impl; - return p; - } - - void operator delete(void* p, std::size_t n) noexcept - { - constexpr auto A = alignof(strand_impl*); - std::size_t padded = (n + A - 1) & ~(A - 1); - - auto* impl = *reinterpret_cast( - static_cast(p) + padded); - - void* expected = nullptr; - if(!impl->cached_frame_.compare_exchange_strong( - expected, p, std::memory_order_release)) - ::operator delete(p); - } - - strand_invoker get_return_object() noexcept - { return {std::coroutine_handle::from_promise(*this)}; } - - std::suspend_always initial_suspend() noexcept { return {}; } - std::suspend_never final_suspend() noexcept { return {}; } - void return_void() noexcept {} - void unhandled_exception() { std::terminate(); } - }; - - std::coroutine_handle h_; -}; - -//---------------------------------------------------------- +/** Concrete strand_service. -/** Concrete implementation of strand_service. + Holds a shared mutex pool (193 entries), a linked list of live + impls (for shutdown traversal), and a single-slot invoker + coroutine frame cache shared across all strands of this service. - Holds the fixed pool of strand_impl objects. + The dispatch helpers (`enqueue`, `dispatch_pending`, etc.) are + public so the namespace-scope `make_invoker` coroutine and the + `strand_service` static methods can call them without friendship. 
*/ class strand_service_impl : public strand_service { - static constexpr std::size_t num_impls = 211; +public: + static constexpr std::size_t num_mutexes = 193; - strand_impl impls_[num_impls]; - std::size_t salt_ = 0; std::mutex mutex_; + std::size_t salt_ = 0; + std::shared_ptr mutexes_[num_mutexes]; + intrusive_list impl_list_; + std::atomic invoker_frame_cache_{nullptr}; -public: explicit strand_service_impl(execution_context&) { } - strand_impl* - get_implementation() override + std::shared_ptr + create_implementation() override { + auto new_impl = std::make_shared(); + std::lock_guard lock(mutex_); - std::size_t index = salt_++; - index = index % num_impls; - return &impls_[index]; - } -protected: - void - shutdown() override - { - for(std::size_t i = 0; i < num_impls; ++i) - { - std::lock_guard lock(impls_[i].mutex_); - impls_[i].locked_ = true; + std::size_t s = salt_++; + std::size_t idx = reinterpret_cast(new_impl.get()); + idx += idx >> 3; + idx ^= s + 0x9e3779b9 + (idx << 6) + (idx >> 2); + idx %= num_mutexes; + if(!mutexes_[idx]) + mutexes_[idx] = std::make_shared(); + new_impl->mutex_ = mutexes_[idx].get(); - void* p = impls_[i].cached_frame_.exchange( - kCacheClosed, std::memory_order_acquire); - if(p) - ::operator delete(p); - } + impl_list_.push_back(new_impl.get()); + new_impl->service_.store(this, std::memory_order_release); + + return new_impl; } -private: static bool enqueue(strand_impl& impl, std::coroutine_handle<> h) { - std::lock_guard lock(impl.mutex_); + std::lock_guard lock(*impl.mutex_); impl.pending_.push(h); if(!impl.locked_) { @@ -163,7 +88,7 @@ class strand_service_impl : public strand_service { strand_queue::taken_batch batch; { - std::lock_guard lock(impl.mutex_); + std::lock_guard lock(*impl.mutex_); batch = impl.pending_.take_all(); } impl.pending_.dispatch_batch(batch); @@ -172,7 +97,7 @@ class strand_service_impl : public strand_service static bool try_unlock(strand_impl& impl) { - std::lock_guard lock(impl.mutex_); + 
std::lock_guard lock(*impl.mutex_); if(impl.pending_.empty()) { impl.locked_ = false; @@ -193,37 +118,135 @@ class strand_service_impl : public strand_service impl.dispatch_thread_.store(std::thread::id{}); } - // Loops until queue empty (aggressive). Alternative: per-batch fairness - // (repost after each batch to let other work run) - explore if starvation observed. - static strand_invoker - make_invoker(strand_impl& impl) + // Defined below; needs strand_invoker complete. + static void + post_invoker(std::shared_ptr impl, executor_ref ex); + +protected: + void + shutdown() override { - strand_impl* p = &impl; - for(;;) + std::lock_guard lock(mutex_); + while(auto* p = impl_list_.pop_front()) { - set_dispatch_thread(*p); - dispatch_pending(*p); - if(try_unlock(*p)) - { - clear_dispatch_thread(*p); - co_return; - } + std::lock_guard impl_lock(*p->mutex_); + p->locked_ = true; + p->service_.store(nullptr, std::memory_order_release); } + + void* fp = invoker_frame_cache_.exchange( + kCacheClosed, std::memory_order_acq_rel); + if(fp) ::operator delete(fp); } +}; - static void - post_invoker(strand_impl& impl, executor_ref ex) +/** Invoker coroutine that drains a strand's pending queue. + + Runs once the strand transitions from unlocked to locked. Holds + the impl alive via the coroutine parameter (a shared_ptr in the + coroutine frame), so user code may drop its strand handle while + the invoker is mid-flight. + + The frame's allocator recycles a single per-service slot. The + trailer points at the service (lifetime: execution_context), + NOT the impl (lifetime: per-strand), so operator delete is + safe even after the impl has been destroyed. +*/ +struct strand_invoker +{ + struct promise_type { - auto invoker = make_invoker(impl); - auto& self = invoker.h_.promise().self_; - self.h = invoker.h_; - ex.post(self); - } + // Stored in the coroutine frame so its address is stable for + // posting to the inner executor. 
+        continuation self_;
+
+        void*
+        operator new(
+            std::size_t n,
+            std::shared_ptr<strand_impl> const& impl)
+        {
+            auto* svc = impl->service_.load(std::memory_order_acquire);
+            constexpr auto A = alignof(strand_service_impl*);
+            std::size_t padded = (n + A - 1) & ~(A - 1);
+            std::size_t total = padded + sizeof(strand_service_impl*);
+
+            void* p = svc->invoker_frame_cache_.exchange(
+                nullptr, std::memory_order_acquire);
+            if(!p || p == kCacheClosed)
+                p = ::operator new(total);
 
-    friend class strand_service;
+            *reinterpret_cast<strand_service_impl**>(
+                static_cast<char*>(p) + padded) = svc;
+            return p;
+        }
+
+        void
+        operator delete(void* p, std::size_t n) noexcept
+        {
+            constexpr auto A = alignof(strand_service_impl*);
+            std::size_t padded = (n + A - 1) & ~(A - 1);
+            auto* svc = *reinterpret_cast<strand_service_impl**>(
+                static_cast<char*>(p) + padded);
+
+            void* expected = nullptr;
+            if(!svc->invoker_frame_cache_.compare_exchange_strong(
+                expected, p, std::memory_order_release))
+                ::operator delete(p);
+        }
+
+        strand_invoker
+        get_return_object() noexcept
+        {
+            return {std::coroutine_handle<promise_type>::from_promise(*this)};
+        }
+
+        std::suspend_always initial_suspend() noexcept { return {}; }
+        std::suspend_never final_suspend() noexcept { return {}; }
+        void return_void() noexcept {}
+        void unhandled_exception() { std::terminate(); }
+    };
+
+    std::coroutine_handle<promise_type> h_;
};
 
-//----------------------------------------------------------
+// The by-value parameter lives in the coroutine frame for the
+// invoker's lifetime, keeping the impl alive past any user-side
+// strand drop.
+static
+strand_invoker
+make_invoker(std::shared_ptr<strand_impl> impl)
+{
+    auto* p = impl.get();
+    for(;;)
+    {
+        strand_service_impl::set_dispatch_thread(*p);
+        strand_service_impl::dispatch_pending(*p);
+        if(strand_service_impl::try_unlock(*p))
+        {
+            strand_service_impl::clear_dispatch_thread(*p);
+            co_return;
+        }
+    }
+}
+
+void
+strand_service_impl::post_invoker(
+    std::shared_ptr<strand_impl> impl,
+    executor_ref ex)
+{
+    auto invoker = make_invoker(std::move(impl));
+    auto& self = invoker.h_.promise().self_;
+    self.h = invoker.h_;
+    ex.post(self);
+}
+
+strand_impl::~strand_impl()
+{
+    auto* svc = service_.load(std::memory_order_acquire);
+    if(!svc) return;
+    std::lock_guard lock(svc->mutex_);
+    svc->impl_list_.remove(this);
+}
 
strand_service::
strand_service()
@@ -243,21 +266,27 @@ running_in_this_thread(strand_impl& impl) noexcept
 
std::coroutine_handle<>
strand_service::
-dispatch(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
+dispatch(
+    std::shared_ptr<strand_impl> const& impl,
+    executor_ref ex,
+    std::coroutine_handle<> h)
{
-    if(running_in_this_thread(impl))
+    if(running_in_this_thread(*impl))
        return h;
-    if(strand_service_impl::enqueue(impl, h))
+    if(strand_service_impl::enqueue(*impl, h))
        strand_service_impl::post_invoker(impl, ex);
    return std::noop_coroutine();
}
 
void
strand_service::
-post(strand_impl& impl, executor_ref ex, std::coroutine_handle<> h)
+post(
+    std::shared_ptr<strand_impl> const& impl,
+    executor_ref ex,
+    std::coroutine_handle<> h)
{
-    if(strand_service_impl::enqueue(impl, h))
+    if(strand_service_impl::enqueue(*impl, h))
        strand_service_impl::post_invoker(impl, ex);
}
 
diff --git a/test/unit/ex/strand.cpp b/test/unit/ex/strand.cpp
index d3795ab89..fc92e4ab5 100644
--- a/test/unit/ex/strand.cpp
+++ b/test/unit/ex/strand.cpp
@@ -10,6 +10,9 @@
// Test that header file is self-contained.
#include
+// Full strand_impl definition for white-box collision tests.
+#include "src/ex/detail/strand_impl.hpp"
+
#include
#include
#include
@@ -231,6 +234,51 @@ make_order_coro(std::vector<int>& log, std::mutex& log_mutex, int id)
    }(&log, &log_mutex, id);
}
 
+struct lifetime_coro
+{
+    struct promise_type
+    {
+        lifetime_coro
+        get_return_object() noexcept
+        {
+            return lifetime_coro{
+                std::coroutine_handle<promise_type>::from_promise(*this)};
+        }
+        std::suspend_always initial_suspend() noexcept { return {}; }
+        std::suspend_never final_suspend() noexcept { return {}; }
+        void return_void() noexcept {}
+        void unhandled_exception() { std::terminate(); }
+    };
+
+    std::coroutine_handle<promise_type> h_;
+
+    ~lifetime_coro() { if(h_) h_.destroy(); }
+    lifetime_coro(lifetime_coro&& other) noexcept : h_(other.h_) { other.h_ = nullptr; }
+    lifetime_coro& operator=(lifetime_coro&& other) noexcept
+    {
+        if(h_) h_.destroy();
+        h_ = other.h_;
+        other.h_ = nullptr;
+        return *this;
+    }
+
+    std::coroutine_handle<promise_type> handle() const noexcept { return h_; }
+    void release() noexcept { h_ = nullptr; }
+
+private:
+    explicit lifetime_coro(std::coroutine_handle<promise_type> h) : h_(h) {}
+    friend lifetime_coro make_lifetime_coro(std::atomic<bool>&);
+};
+
+inline lifetime_coro
+make_lifetime_coro(std::atomic<bool>& flag)
+{
+    return [](std::atomic<bool>* f) -> lifetime_coro {
+        f->store(true);
+        co_return;
+    }(&flag);
+}
+
} // namespace
 
struct strand_test
@@ -328,13 +376,244 @@ struct strand_test
        auto s1 = strand(pool.get_executor());
        auto s2 = s1;
 
-        // Copies are equal
+        // Copies share the same impl.
        BOOST_TEST(s1 == s2);
 
-        // Different strands from same pool may or may not be equal
-        // depending on internal hash collision
+        // Distinct strands have distinct impls.
        auto s3 = strand(pool.get_executor());
-        (void)s3;
+        BOOST_TEST(!(s1 == s3));
+    }
+
+    void
+    testNoEqualityCollisions()
+    {
+        thread_pool pool(1);
+        constexpr int N = 1000;
+
+        std::vector<strand<thread_pool::executor_type>> strands;
+        strands.reserve(N);
+        for(int i = 0; i < N; ++i)
+            strands.push_back(strand(pool.get_executor()));
+
+        int collisions = 0;
+        for(int i = 0; i < N; ++i)
+            for(int j = i + 1; j < N; ++j)
+                if(strands[i] == strands[j])
+                    ++collisions;
+
+        BOOST_TEST_EQ(collisions, 0);
+    }
+
+    void
+    testStrandsAreIndependent()
+    {
+        // Two threads so two strands can run concurrently. Construct
+        // enough strands that the first and last would have shared an
+        // impl under the previous 211-slot pooled design; verify the
+        // new per-strand design lets them run in parallel.
+        thread_pool pool(2);
+
+        constexpr int N = 212; // > 211 forces a pooled-slot collision pre-refactor
+        std::vector<strand<thread_pool::executor_type>> strands;
+        strands.reserve(N);
+        for(int i = 0; i < N; ++i)
+            strands.push_back(strand(pool.get_executor()));
+
+        auto& sA = strands.front();
+        auto& sB = strands.back();
+
+        std::atomic<bool> a_started{false};
+        std::atomic<bool> a_done{false};
+        std::atomic<bool> b_done{false};
+
+        struct latched_coro
+        {
+            struct promise_type
+            {
+                latched_coro
+                get_return_object() noexcept
+                {
+                    return latched_coro{
+                        std::coroutine_handle<promise_type>::from_promise(*this)};
+                }
+                std::suspend_always initial_suspend() noexcept { return {}; }
+                std::suspend_never final_suspend() noexcept { return {}; }
+                void return_void() noexcept {}
+                void unhandled_exception() { std::terminate(); }
+            };
+            std::coroutine_handle<promise_type> h_;
+        };
+
+        auto make_latched =
+            [](std::atomic<bool>* started,
+               std::atomic<bool>& done,
+               std::chrono::milliseconds delay) -> latched_coro
+        {
+            if(started) started->store(true);
+            std::this_thread::sleep_for(delay);
+            done.store(true);
+            co_return;
+        };
+
+        auto coro_a = make_latched(
+            &a_started, a_done, std::chrono::milliseconds(200));
+        continuation ca{coro_a.h_};
+        sA.post(ca);
+        coro_a.h_ = nullptr;
+
+        // Wait until A is actively sleeping
+        BOOST_TEST(wait_for([&]{ return a_started.load(); }));
+
+        auto coro_b = make_latched(
+            nullptr, b_done, std::chrono::milliseconds(0));
+        continuation cb{coro_b.h_};
+        sB.post(cb);
+        coro_b.h_ = nullptr;
+
+        // B should complete while A is still sleeping
+        BOOST_TEST(wait_for(
+            [&]{ return b_done.load(); },
+            std::chrono::milliseconds(150)));
+        BOOST_TEST(!a_done.load());
+
+        // Let A finish so the test cleans up
+        BOOST_TEST(wait_for([&]{ return a_done.load(); }));
+    }
+
+    void
+    testTransientStrandLifetime()
+    {
+        thread_pool pool(1);
+        std::atomic<bool> done{false};
+        std::weak_ptr<strand_impl> impl_weak;
+
+        {
+            auto s = strand(pool.get_executor());
+            impl_weak = s.impl_;
+            auto coro = make_lifetime_coro(done);
+            continuation c{coro.handle()};
+            s.post(c);
+            coro.release();
+        } // strand handle dropped here
+
+        BOOST_TEST(wait_for([&]{ return done.load(); }));
+        // After the invoker drains and exits, the impl shared_ptr in
+        // its coroutine frame releases. The weak_ptr should expire.
+        BOOST_TEST(wait_for([&]{ return impl_weak.expired(); }));
+    }
+
+    void
+    testManyStrandsStress()
+    {
+        thread_pool pool(4);
+        constexpr int num_strands = 10000;
+        constexpr int posts_per_strand = 3;
+
+        std::atomic<int> total{0};
+
+        std::vector<strand<thread_pool::executor_type>> strands;
+        strands.reserve(num_strands);
+        for(int i = 0; i < num_strands; ++i)
+            strands.push_back(strand(pool.get_executor()));
+
+        std::vector<counter_coro> coros;
+        coros.reserve(num_strands * posts_per_strand);
+
+        for(int i = 0; i < num_strands; ++i)
+        {
+            for(int j = 0; j < posts_per_strand; ++j)
+            {
+                coros.push_back(make_counter_coro(total));
+                continuation c{coros.back().handle()};
+                strands[i].post(c);
+                coros.back().release();
+            }
+        }
+
+        BOOST_TEST(wait_for(
+            [&]{ return total.load() >= num_strands * posts_per_strand; },
+            std::chrono::milliseconds(30000)));
+        BOOST_TEST_EQ(total.load(), num_strands * posts_per_strand);
+    }
+
+    void
+    testMutexPoolCollisionIsolation()
+    {
+        // 193 mutexes in the service pool. With > 193 strands, at least
+        // two must share a mutex. Scan to find a colliding pair, then
+        // verify they run concurrently when posted to in parallel.
+        thread_pool pool(2);
+
+        constexpr int N = 200;
+        std::vector<strand<thread_pool::executor_type>> strands;
+        strands.reserve(N);
+        for(int i = 0; i < N; ++i)
+            strands.push_back(strand(pool.get_executor()));
+
+        // Find a colliding pair via the borrowed mutex pointer.
+        int idx_a = -1, idx_b = -1;
+        for(int i = 0; i < N && idx_b < 0; ++i)
+        {
+            for(int j = i + 1; j < N; ++j)
+            {
+                if(strands[i].impl_->mutex_ == strands[j].impl_->mutex_)
+                {
+                    idx_a = i;
+                    idx_b = j;
+                    break;
+                }
+            }
+        }
+        BOOST_TEST(idx_a >= 0); // pigeonhole guarantees a hit
+        if(idx_a < 0)
+            return;
+
+        auto& sA = strands[idx_a];
+        auto& sB = strands[idx_b];
+
+        std::atomic<int> max_active{0};
+        std::atomic<int> active{0};
+        std::atomic<int> done{0};
+
+        // Each coroutine increments active, then waits at a rendezvous
+        // until both have arrived (or timeout). If colliding strands run
+        // in parallel, both observe active==2; if they serialize, the
+        // first waits the full timeout and max_active never reaches 2.
+        auto make_busy = [&]() -> counter_coro {
+            return [](std::atomic<int>* a,
+                      std::atomic<int>* m,
+                      std::atomic<int>* d) -> counter_coro
+            {
+                int cur = ++(*a);
+                int prev = m->load();
+                while(cur > prev && !m->compare_exchange_weak(prev, cur)) {}
+                auto deadline = std::chrono::steady_clock::now() +
+                    std::chrono::seconds(2);
+                while(a->load() < 2 &&
+                    std::chrono::steady_clock::now() < deadline)
+                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+                int cur2 = a->load();
+                int prev2 = m->load();
+                while(cur2 > prev2 && !m->compare_exchange_weak(prev2, cur2)) {}
+                --(*a);
+                ++(*d);
+                co_return;
+            }(&active, &max_active, &done);
+        };
+
+        auto coroA = make_busy();
+        auto coroB = make_busy();
+        continuation cA{coroA.handle()};
+        continuation cB{coroB.handle()};
+        sA.post(cA);
+        sB.post(cB);
+        coroA.release();
+        coroB.release();
+
+        BOOST_TEST(wait_for(
+            [&]{ return done.load() >= 2; },
+            std::chrono::seconds(10)));
+        BOOST_TEST_EQ(max_active.load(), 2);
    }
 
    void
@@ -658,6 +937,11 @@ struct strand_test
        testContext();
        testWorkTracking();
        testEquality();
+        testNoEqualityCollisions();
+        testStrandsAreIndependent();
+        testTransientStrandLifetime();
+        testManyStrandsStress();
+        testMutexPoolCollisionIsolation();
        testRunningInThisThread();
        testPost();
        testDispatch();