Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/actor/typed_message.hpp>
18 : : #include <hpactor/hpactor_config.hpp>
19 : : #include <hpactor/mailbox/mpsc_actor_mailbox.hpp>
20 : : #include <hpactor/sched/coroutine_task.hpp>
21 : : #include <hpactor/sched/scheduler.hpp>
22 : :
23 : : #include <atomic>
24 : : #include <cstdint>
25 : :
26 : : #if HPACTOR_SUPPORT_COROUTINES
27 : :
28 : : # include <coroutine>
29 : :
30 : : namespace hpactor::sched {
31 : :
32 : : // MailboxAwaiter: awaitable for co_await actor.receive()
33 : : // Suspends when mailbox is empty, resumes when message arrives
34 : : // T is the message type (e.g., Message<MessageVariant>)
35 : : template <typename T> class MailboxAwaiter {
36 : : public:
37 : 22 : explicit MailboxAwaiter(CoroutinePromise& promise,
38 : : mailbox::MPSCActorMailbox<T>* mailbox) noexcept
39 : 22 : : promise_(promise), mailbox_(mailbox) {}
40 : :
41 : : // Return true if message already available (don't suspend)
42 : 20 : bool await_ready() const noexcept {
43 : : // Check if message arrived between last suspension and now
44 : 20 : return !mailbox_->was_empty();
45 : : }
46 : :
47 : : // Called when suspending
48 : 10 : bool await_suspend(std::coroutine_handle<> continuation) noexcept {
49 : : // Check emptiness at this moment — a message may have arrived since
50 : : // await_ready(). If a message arrived while we were deciding, the
51 : : // sender already claimed the wakeup via CAS(true, false) on was_empty —
52 : : // don't suspend.
53 : 10 : bool was_empty = mailbox_->was_empty();
54 : 10 : if (!was_empty) {
55 : 1 : return false;
56 : : }
57 : :
58 : : // Mailbox is still empty — safely reset edge-trigger so the next
59 : : // enqueue (after we suspend) can claim the wakeup.
60 : 9 : mailbox_->set_was_empty(true);
61 : :
62 : : // Transition: Running → Idle
63 : 9 : uint32_t expected = ActorState::kRunning;
64 : 9 : if (promise_.actor_state->cas(expected, ActorState::kIdle)) {
65 : 8 : promise_.continuation = continuation;
66 : 8 : return true; // successfully suspended
67 : : }
68 : : // State was not Running — actor may have already terminated
69 : 1 : return false; // don't suspend
70 : : }
71 : :
72 : : // Called when resuming (message arrived)
73 : 12 : T await_resume() noexcept {
74 : : // Dequeue and return the message
75 : 12 : auto* msg = mailbox_->dequeue();
76 : 12 : if (msg) {
77 : : // Return by moving the Message out
78 : 12 : return std::move(*msg);
79 : : }
80 : : // Return empty message if dequeue failed
81 : 0 : return T{};
82 : : }
83 : :
84 : : private:
85 : : CoroutinePromise& promise_;
86 : : mailbox::MPSCActorMailbox<T>* mailbox_;
87 : : };
88 : :
89 : : // TimerAwaiter: awaitable for co_await scheduler.schedule_after(delay)
90 : : // Wires to HybridScheduler::schedule_timer() for real timer integration
91 : : class TimerAwaiter {
92 : : public:
93 : : TimerAwaiter(int64_t delay_ns, HybridScheduler& scheduler, ActorId actor_id,
94 : : uint8_t priority = 0) noexcept
95 : : : scheduler_(scheduler), actor_id_(actor_id), delay_ns_(delay_ns),
96 : : priority_(priority) {}
97 : :
98 : : bool await_ready() const noexcept {
99 : : return false;
100 : : }
101 : :
102 : : bool await_suspend(std::coroutine_handle<> continuation) noexcept {
103 : : continuation_ = continuation;
104 : :
105 : : // Set promise to IOWaiting
106 : : auto& promise = std::coroutine_handle<CoroutinePromise>::from_address(
107 : : continuation.address())
108 : : .promise();
109 : : promise.set_io_waiting();
110 : :
111 : : // Schedule timer — on expiry, actor is re-woken via notify_ready
112 : : timer_id_ = scheduler_.schedule_timer(delay_ns_, [this] {
113 : : scheduler_.notify_ready(actor_id_, priority_, INT64_MAX);
114 : : });
115 : :
116 : : return true;
117 : : }
118 : :
119 : : void await_resume() noexcept {
120 : : // Timer fired; actor has been re-woken
121 : : }
122 : :
123 : : void await_cancel() noexcept {
124 : : scheduler_.cancel_timer(TimerHandle{timer_id_});
125 : : }
126 : :
127 : : private:
128 : : HybridScheduler& scheduler_;
129 : : ActorId actor_id_;
130 : : int64_t delay_ns_;
131 : : uint8_t priority_;
132 : : uint64_t timer_id_{0};
133 : : std::coroutine_handle<> continuation_;
134 : : };
135 : :
136 : : // BlockingMailboxAwaiter: for blocking receive with stackful coroutines
137 : : // T is the message type (e.g., Message<MessageVariant>)
138 : : template <typename T> class BlockingMailboxAwaiter {
139 : : public:
140 : : BlockingMailboxAwaiter(CoroutinePromise& promise,
141 : : mailbox::MPSCActorMailbox<T>* mailbox,
142 : : std::coroutine_handle<> continuation) noexcept
143 : : : promise_(promise), mailbox_(mailbox), continuation_(continuation) {}
144 : :
145 : : bool await_ready() const noexcept {
146 : : return !mailbox_->was_empty();
147 : : }
148 : :
149 : : bool await_suspend(std::coroutine_handle<> continuation) noexcept {
150 : : // Check emptiness at this moment — a message may have arrived since
151 : : // await_ready()
152 : : bool was_empty = mailbox_->was_empty();
153 : : if (!was_empty)
154 : : return false; // message arrived between await_ready() and here
155 : :
156 : : // Only reset edge-trigger if mailbox was empty at entry.
157 : : // If a message arrived while we were deciding, the sender already
158 : : // claimed the wakeup via CAS(true, false) on was_empty.
159 : : if (was_empty) {
160 : : mailbox_->set_was_empty(true);
161 : : }
162 : :
163 : : promise_.continuation = continuation;
164 : : promise_.set_idle();
165 : : return true;
166 : : }
167 : :
168 : : void await_resume() noexcept {
169 : : // Returns the message
170 : : }
171 : :
172 : : private:
173 : : CoroutinePromise& promise_;
174 : : mailbox::MPSCActorMailbox<T>* mailbox_;
175 : : std::coroutine_handle<> continuation_;
176 : : };
177 : :
178 : : } // namespace hpactor::sched
179 : :
180 : : #endif // HPACTOR_SUPPORT_COROUTINES
|