Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/actor/actor_state.hpp>
18 : : #include <hpactor/actor/drain_config.hpp>
19 : : #include <hpactor/actor/local_actor.hpp>
20 : : #include <hpactor/actor/typed_message.hpp>
21 : : #include <hpactor/behavior.hpp>
22 : : #include <hpactor/core/actor_system.hpp>
23 : : #include <hpactor/hpactor_config.hpp>
24 : : #include <hpactor/mailbox/mpsc_actor_mailbox.hpp>
25 : : #include <hpactor/metrics/metrics_ring_buffer.hpp>
26 : : #include <hpactor/sched/scheduler.hpp>
27 : :
28 : : #include <hpactor/mem/std_allocator.hpp>
29 : :
30 : : #include <functional>
31 : : #include <memory>
32 : : #include <string>
33 : : #include <unordered_map>
34 : :
35 : : #if HPACTOR_SUPPORT_COROUTINES
36 : : # include <hpactor/sched/actor_coroutine.hpp>
37 : : # include <hpactor/sched/coroutine_awaiters.hpp>
38 : : # include <hpactor/sched/coroutine_task.hpp>
39 : : #endif
40 : :
41 : : namespace hpactor {
42 : :
43 : : // Forward declarations
44 : : class ProtoTypeRegistry;
45 : : namespace log {
46 : : class Logger;
47 : : } // namespace log
48 : :
49 : : // Internal handler storage — type-erased to avoid template bloat in the map
50 : : struct ProtoHandler {
51 : : std::string type_name;
52 : :
53 : 0 : ProtoHandler() = default;
54 : : ProtoHandler(ProtoHandler&&) = default;
55 : 0 : ProtoHandler& operator=(ProtoHandler&&) = default;
56 : : ProtoHandler(const ProtoHandler&) = delete;
57 : : ProtoHandler& operator=(const ProtoHandler&) = delete;
58 : :
59 : : // Deserialize bytes into a shared_ptr<void> holding the concrete protobuf
60 : : // type
61 : : std::function<std::shared_ptr<void>(const StreamBuffer&)> deserialize;
62 : :
63 : : // Invoke the handler with a deserialized message.
64 : : // Returns serialized response bytes (empty for fire-and-forget).
65 : : std::function<StreamBuffer(std::shared_ptr<void>)> invoke;
66 : : };
67 : :
68 : : // -----------------------------------------------------------------------------
69 : : // EventBasedActor - cooperatively scheduled actor with behavior-based
70 : : // handling, proto handler dispatch, and optional coroutine support (C++20)
71 : : // -----------------------------------------------------------------------------
72 : : class EventBasedActor : public LocalActor {
73 : : public:
74 : : void become(Behavior bh);
75 : : void become_empty();
76 : :
77 : : void receive(TypedMessage& msg) override;
78 : :
79 : : // Type query for safe downcasting without RTTI
80 : 526 : bool is_event_based_actor() const override {
81 : 526 : return true;
82 : : }
83 : :
84 : : // Proto handler registration (absorbed from ProtoActor)
85 : : // Users override register_handlers() and call these in the override.
86 : :
87 : : // Register a fire-and-forget handler for a protobuf message type
88 : : template <typename ProtoMsgT>
89 : : void on(std::function<void(const ProtoMsgT&)> handler) {
90 : : TypeTag tag = type_tag_for<ProtoMsgT>();
91 : : auto handler_ptr =
92 : : mem::allocate_shared<std::function<void(const ProtoMsgT&)>>(
93 : : id_ptr(), mem::RegionType::kActor, std::move(handler));
94 : :
95 : : ProtoHandler entry;
96 : : entry.type_name = ProtoMsgT().GetTypeName();
97 : : entry.deserialize = [](const StreamBuffer& data) -> std::shared_ptr<void> {
98 : : auto msg = mem::allocate_shared<ProtoMsgT>(mem::current_actor_id(),
99 : : mem::RegionType::kMessage);
100 : : if (!msg->ParseFromArray(data.data(), static_cast<int>(data.size()))) {
101 : : return nullptr;
102 : : }
103 : : return msg;
104 : : };
105 : : entry.invoke = [handler_ptr](std::shared_ptr<void> raw) -> StreamBuffer {
106 : : auto& msg = *static_cast<ProtoMsgT*>(raw.get());
107 : : (*handler_ptr)(msg);
108 : : return {};
109 : : };
110 : :
111 : : proto_handlers_[tag] = std::move(entry);
112 : : }
113 : :
114 : : // Register a request-response handler for protobuf types
115 : : template <typename ReqT, typename ResT>
116 : 0 : void on_request(std::function<ResT(const ReqT&)> handler) {
117 : 0 : TypeTag tag = type_tag_for<ReqT>();
118 : 0 : auto handler_ptr = mem::allocate_shared<std::function<ResT(const ReqT&)>>(
119 : 0 : id_ptr(), mem::RegionType::kActor, std::move(handler));
120 : :
121 : 0 : ProtoHandler entry;
122 : 0 : entry.type_name = ReqT().GetTypeName();
123 : 0 : entry.deserialize = [](const StreamBuffer& data) -> std::shared_ptr<void> {
124 : 0 : auto msg = mem::allocate_shared<ReqT>(mem::current_actor_id(),
125 : : mem::RegionType::kMessage);
126 : 0 : if (!msg->ParseFromArray(data.data(), static_cast<int>(data.size()))) {
127 : 0 : return nullptr;
128 : : }
129 : 0 : return msg;
130 : 0 : };
131 : 0 : entry.invoke = [handler_ptr](std::shared_ptr<void> raw) -> StreamBuffer {
132 : 0 : auto& req = *static_cast<ReqT*>(raw.get());
133 : 0 : ResT res = (*handler_ptr)(req);
134 : 0 : StreamBuffer result(res.ByteSizeLong());
135 : 0 : (void)res.SerializeToArray(result.data(),
136 : 0 : static_cast<int>(result.size()));
137 : 0 : return result;
138 : 0 : };
139 : :
140 : 0 : proto_handlers_[tag] = std::move(entry);
141 : 0 : }
142 : :
143 : : // Dispatch an incoming protobuf message by TypeTag
144 : : void on_proto_message(TypeTag tag, const StreamBuffer& payload);
145 : :
146 : : // Check if this actor can handle a given TypeTag
147 : : [[nodiscard]] bool handles(TypeTag tag) const {
148 : : return proto_handlers_.find(tag) != proto_handlers_.end();
149 : : }
150 : :
151 : : #if HPACTOR_SUPPORT_COROUTINES
152 : : // Coroutine support (C++20 only)
153 : 0 : virtual sched::CoroutineTask act() {
154 : : co_return;
155 : 0 : }
156 : :
157 : 11 : sched::ActorCoroutine& get_actor_coroutine() {
158 : 11 : return actor_coroutine_;
159 : : }
160 : : const sched::ActorCoroutine& get_actor_coroutine() const {
161 : : return actor_coroutine_;
162 : : }
163 : : void set_actor_coroutine(sched::ActorCoroutine&& coroutine) {
164 : : actor_coroutine_ = std::move(coroutine);
165 : : }
166 : :
167 : : std::coroutine_handle<sched::CoroutinePromise> get_coro_handle() {
168 : : return coro_handle_;
169 : : }
170 : : void set_coro_handle(std::coroutine_handle<sched::CoroutinePromise> h) {
171 : : coro_handle_ = h;
172 : : }
173 : :
174 : 17 : sched::MailboxAwaiter<TypedMessage> make_mailbox_awaiter() {
175 : 17 : return sched::MailboxAwaiter<TypedMessage>{coro_handle_.promise(), mailbox_};
176 : : }
177 : :
178 : 11 : void ensure_coroutine_started() {
179 : 11 : if (!actor_coroutine_) {
180 : 9 : auto task = act();
181 : 9 : if (task) {
182 : 9 : coro_handle_ = task.handle();
183 : 9 : coro_handle_.promise().actor_state = &actor_state_;
184 : 9 : actor_coroutine_ = sched::ActorCoroutine{std::move(task), id()};
185 : :
186 : 9 : if (mailbox_) {
187 : : // Continuation callback disabled: direct resume races with
188 : : // the scheduler's execute_actor state machine, causing
189 : : // await_suspend CAS(kRunning→kIdle) to fail and the
190 : : // coroutine to busy-loop. Wakeups go through notify_ready()
191 : : // which transitions state correctly.
192 : : //
193 : : // auto* coro_ptr = &actor_coroutine_;
194 : : // mailbox_->set_continuation_callback([coro_ptr]() {
195 : : // if (!coro_ptr->done()) {
196 : : // coro_ptr->promise().notify_mailbox_nonempty();
197 : : // }
198 : : // });
199 : : }
200 : : }
201 : 9 : }
202 : 11 : }
203 : :
204 : : #else // !HPACTOR_SUPPORT_COROUTINES
205 : : void ensure_coroutine_started() {}
206 : : #endif // HPACTOR_SUPPORT_COROUTINES
207 : :
208 : : sched::IScheduler* get_scheduler() {
209 : : return scheduler_;
210 : : }
211 : :
212 : 51 : mailbox::MPSCActorMailbox<TypedMessage>* get_mailbox() {
213 : 51 : return mailbox_;
214 : : }
215 : :
216 : 492 : ActorState& actor_state() {
217 : 492 : return actor_state_;
218 : : }
219 : : const ActorState& actor_state() const {
220 : : return actor_state_;
221 : : }
222 : :
223 : : bool mailbox_has_messages() const {
224 : : return mailbox_ && !mailbox_->empty();
225 : : }
226 : 18 : bool mailbox_is_empty() const {
227 : 18 : return !mailbox_ || mailbox_->empty();
228 : : }
229 : :
230 : : // Drain helpers
231 : : // Returns true if the message should be processed normally.
232 : : // Returns false if the message was dead-lettered by the drain policy.
233 : : bool drain_one(TypedMessage& msg);
234 : :
235 : : // Dead-letter all messages currently in the mailbox (ImmediateStop).
236 : : void drain_all_immediate();
237 : :
238 : : // System message handlers invoked from receive().
239 : : bool handle_link_msg(const TypedMessage& msg);
240 : : bool handle_unlink_msg(const TypedMessage& msg);
241 : : bool handle_monitor_msg(const TypedMessage& msg);
242 : : bool handle_demonitor_msg(const TypedMessage& msg);
243 : : void handle_down_msg(const TypedMessage& msg);
244 : :
245 : : // Drain timer management (stubs — implemented in Task 7).
246 : : void start_drain_timer();
247 : : void cancel_drain_timer();
248 : :
249 : 116 : void set_scheduler(sched::IScheduler* scheduler) override {
250 : 116 : scheduler_ = scheduler;
251 : 116 : }
252 : 114 : void set_mailbox(mailbox::MPSCActorMailbox<TypedMessage>* mailbox) override {
253 : 114 : mailbox_ = mailbox;
254 : 114 : }
255 : :
256 : 104 : void set_metrics_ring_buffer(void* buf) noexcept override {
257 : 104 : metrics_ring_buffer_ =
258 : : static_cast<metrics::MpscRingBuffer<metrics::MetricEvent>*>(buf);
259 : 104 : }
260 : :
261 : 104 : void set_logger(void* logger) noexcept override {
262 : 104 : logger_ = static_cast<log::Logger*>(logger);
263 : 104 : }
264 : :
265 : : cli::MboxSnapshot mailbox_snapshot() const override;
266 : :
267 : : protected:
268 : : metrics::MpscRingBuffer<metrics::MetricEvent>* metrics_ring_buffer_{nullptr};
269 : : log::Logger* logger_{nullptr};
270 : :
271 : 5 : virtual Behavior make_behavior() {
272 : 5 : return {};
273 : : }
274 : :
275 : : // Users override this to call on<T>() / on_request<ReqT,ResT>()
276 : 25 : virtual void register_handlers() {}
277 : :
278 : : // Called by the framework after construction to set up handlers
279 : : void initialize_proto_handlers();
280 : :
281 : : public:
282 : : void on_activate() override;
283 : : void on_deactivate() override;
284 : :
285 : : // Get TypeTag for a protobuf type from MessageTraits (compile-time
286 : : // dispatch)
287 : 0 : template <typename ProtoMsgT> TypeTag type_tag_for() const {
288 : 0 : return MessageTraits<ProtoMsgT>::tag();
289 : : }
290 : :
291 : : public:
292 : : virtual void on_exit();
293 : :
294 : 4 : void set_exit_reason(uint32_t code) {
295 : 4 : exit_reason_ = code;
296 : 4 : }
297 : : uint32_t exit_reason() const {
298 : : return exit_reason_;
299 : : }
300 : :
301 : : EventBasedActor(ActorContext* ctx, ActorSystem& sys);
302 : :
303 : : private:
304 : : #if HPACTOR_SUPPORT_COROUTINES
305 : : sched::ActorCoroutine actor_coroutine_;
306 : : std::coroutine_handle<sched::CoroutinePromise> coro_handle_;
307 : : #endif
308 : : Behavior behavior_;
309 : : ActorState actor_state_;
310 : : uint32_t exit_reason_ = 0;
311 : : mailbox::MPSCActorMailbox<TypedMessage>* mailbox_ = nullptr;
312 : : sched::IScheduler* scheduler_ = nullptr;
313 : : sched::TimerHandle drain_timer_handle_{};
314 : :
315 : : bool handlers_initialized_ = false;
316 : :
317 : : using ProtoHandlerMap =
318 : : std::unordered_map<TypeTag, ProtoHandler, std::hash<TypeTag>, std::equal_to<>,
319 : : mem::MemStdAllocator<std::pair<const TypeTag, ProtoHandler>>>;
320 : : ProtoHandlerMap proto_handlers_{
321 : : mem::MemStdAllocator<std::pair<const TypeTag, ProtoHandler>>(
322 : : id_ptr(), mem::RegionType::kActor)};
323 : : };
324 : :
325 : : } // namespace hpactor
|