2424#include " trampoline_scheduler.hpp"
2525#include " sequence.hpp"
2626
27- #include " ../stdexec/__detail/__atomic.hpp"
2827#include < exception>
2928#include < type_traits>
3029
@@ -82,7 +81,7 @@ namespace exec {
8281 using __child_op_t = stdexec::connect_result_t <__child_on_sched_sender_t , __receiver_t >;
8382
8483 __child_t __child_;
85- __std::atomic_flag __started_{} ;
84+ bool __has_child_op_ = false ;
8685 stdexec::__manual_lifetime<__child_op_t > __child_op_;
8786 trampoline_scheduler __sched_;
8887
@@ -93,11 +92,7 @@ namespace exec {
9392 }
9493
9594 ~__repeat_effect_state () {
96- if (!__started_.test (__std::memory_order_acquire)) {
97- __std::atomic_thread_fence (__std::memory_order_release);
98- // TSan does not support __std::atomic_thread_fence, so we
99- // need to use the TSan-specific __tsan_release instead:
100- STDEXEC_WHEN (STDEXEC_TSAN (), __tsan_release (&__started_));
95+ if (__has_child_op_) {
10196 __child_op_.__destroy ();
10297 }
10398 }
@@ -107,30 +102,42 @@ namespace exec {
107102 return stdexec::connect (
108103 exec::sequence (stdexec::schedule (__sched_), __child_), __receiver_t {this });
109104 });
105+ __has_child_op_ = true ;
106+ }
107+
108+ void __destroy () noexcept {
109+ __child_op_.__destroy ();
110+ __has_child_op_ = false ;
110111 }
111112
112113 void __start () noexcept {
113- const bool __already_started [[maybe_unused]]
114- = __started_.test_and_set (__std::memory_order_relaxed);
115- STDEXEC_ASSERT (!__already_started);
114+ STDEXEC_ASSERT (__has_child_op_);
116115 stdexec::start (__child_op_.__get ());
117116 }
118117
119118 template <class _Tag , class ... _Args>
120- void __complete (_Tag, _Args... __args) noexcept { // Intentionally by value...
121- __child_op_.__destroy (); // ... because this could potentially invalidate them.
119+ void __complete (_Tag, _Args &&...__args) noexcept {
122120 if constexpr (same_as<_Tag, set_value_t >) {
123121 // If the sender completed with true, we're done
124122 STDEXEC_TRY {
125- const bool __done = (static_cast <bool >(__args) && ...);
123+ const bool __done = (static_cast <bool >(static_cast <_Args &&>( __args) ) && ...);
126124 if (__done) {
127125 stdexec::set_value (static_cast <_Receiver &&>(this ->__receiver ()));
128- } else {
126+ return ;
127+ }
128+ __destroy ();
129+ STDEXEC_TRY {
129130 __connect ();
130- stdexec::start (__child_op_.__get ());
131131 }
132+ STDEXEC_CATCH_ALL {
133+ stdexec::set_error (
134+ static_cast <_Receiver &&>(this ->__receiver ()), std::current_exception ());
135+ return ;
136+ }
137+ stdexec::start (__child_op_.__get ());
132138 }
133139 STDEXEC_CATCH_ALL {
140+ __destroy ();
134141 stdexec::set_error (
135142 static_cast <_Receiver &&>(this ->__receiver ()), std::current_exception ());
136143 }
@@ -160,20 +167,14 @@ namespace exec {
160167 __mexception<_INVALID_ARGUMENT_TO_REPEAT_EFFECT_UNTIL_<>, _WITH_SENDER_<_Sender>>
161168 >;
162169
163- template <class _Error >
164- using __error_t = completion_signatures<set_error_t (__decay_t <_Error>)>;
165-
166170 template <class _Sender , class ... _Env>
167171 using __completions_t = stdexec::transform_completion_signatures<
168172 __completion_signatures_of_t <__decay_t <_Sender> &, _Env...>,
169173 stdexec::transform_completion_signatures<
170174 __completion_signatures_of_t <stdexec::schedule_result_t <exec::trampoline_scheduler>, _Env...>,
171- __eptr_completion,
172- __sigs::__default_set_value,
173- __error_t
175+ __eptr_completion
174176 >,
175- __mbind_front_q<__values_t , _Sender>::template __f,
176- __error_t
177+ __mbind_front_q<__values_t , _Sender>::template __f
177178 >;
178179
179180 struct __repeat_effect_tag { };
0 commit comments