Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/actor/actor_context.hpp>
16 : : #include <hpactor/actor/event_based_actor.hpp>
17 : : #include <hpactor/actor/lifecycle_actor.hpp>
18 : : #include <hpactor/cli_messages.pb.h>
19 : : #include <hpactor/core/actor_system.hpp>
20 : : #include <hpactor/hpactor_config.hpp>
21 : : #include <hpactor/log/log_field.hpp>
22 : : #include <hpactor/log/logger.hpp>
23 : : #include <hpactor/mailbox/dead_letter_queue.hpp>
24 : : #include <hpactor/messages.pb.h>
25 : : #include <hpactor/metrics/metrics_event.hpp>
26 : : #include <hpactor/tracing/trace_manager.hpp>
27 : :
28 : : #include <chrono>
29 : : #include <cstring>
30 : : #include <memory>
31 : :
32 : : namespace hpactor {
33 : :
34 : : namespace {
35 : :
36 : : class ReceiveSpanGuard {
37 : : public:
38 : 350 : ReceiveSpanGuard(tracing::TraceManager* manager, tracing::SpanHandle* handle) noexcept
39 : 350 : : manager_(manager), handle_(handle) {}
40 : :
41 : 350 : ~ReceiveSpanGuard() {
42 : 350 : if (manager_ != nullptr && handle_ != nullptr) {
43 : 1 : manager_->finish_span(*handle_, status_);
44 : : }
45 : 350 : }
46 : :
47 : : void set_status(tracing::SpanStatus status) noexcept {
48 : : status_ = status;
49 : : }
50 : :
51 : : private:
52 : : tracing::TraceManager* manager_{nullptr};
53 : : tracing::SpanHandle* handle_{nullptr};
54 : : tracing::SpanStatus status_{tracing::SpanStatus::kOk};
55 : : };
56 : :
57 : : } // namespace
58 : :
59 : 118 : EventBasedActor::EventBasedActor(ActorContext* ctx, ActorSystem& sys)
60 : 118 : : LocalActor(ctx, sys) {}
61 : :
62 : 114 : void EventBasedActor::on_activate() {}
63 : :
64 : 350 : void EventBasedActor::receive(TypedMessage& msg) {
65 : 350 : auto* ctx = context();
66 : 350 : auto* trace_manager = system().trace_manager();
67 : 350 : tracing::SpanHandle receive_span;
68 : 350 : std::unique_ptr<ActorContext::TraceScope> trace_scope;
69 : :
70 : 351 : if (trace_manager != nullptr && trace_manager->enabled() &&
71 : 1 : trace_manager->config().record_actor_receive_spans) {
72 : 1 : tracing::SpanStart start;
73 : 1 : start.name = "hpactor.actor.receive";
74 : 1 : start.kind = tracing::SpanKind::kConsumer;
75 : 1 : start.has_parent = msg.has_trace_context();
76 : 1 : if (msg.has_trace_context()) {
77 : 1 : start.parent = msg.trace_context();
78 : : }
79 : 1 : start.actor_id = id();
80 : 1 : start.sender_actor_id = msg.sender_address().id;
81 : 1 : start.type_tag = msg.type_id();
82 : 1 : start.payload_size = static_cast<uint32_t>(msg.payload().size());
83 : 1 : receive_span = trace_manager->start_span(start);
84 : 1 : if (ctx != nullptr && receive_span.context.valid()) {
85 : 2 : trace_scope = std::make_unique<ActorContext::TraceScope>(
86 : 1 : ctx, receive_span.context);
87 : : }
88 : : }
89 : :
90 : 350 : ReceiveSpanGuard span_guard(trace_manager, &receive_span);
91 : : // -- Drain gate: apply drain policy to every message during kDraining --
92 : 350 : if (auto* lc = as_lifecycle()) {
93 : 16 : if (lc->state() == LifecycleState::kDraining) {
94 : 16 : if (!drain_one(msg)) {
95 : : // Drain-completion check: if mailbox is now empty, finish drain
96 : 4 : if (mailbox_is_empty()) {
97 : 0 : cancel_drain_timer();
98 : 0 : lc->transition(LifecycleState::kStopping);
99 : 0 : lc->transition(LifecycleState::kStopped);
100 : 0 : on_exit();
101 : : }
102 : 4 : return; // message was dead-lettered by the drain policy
103 : : }
104 : : }
105 : : }
106 : : // -- End drain gate --
107 : :
108 : : // -- System message interception (link / monitor / death) --
109 : : {
110 : 346 : bool handled = false;
111 : 346 : switch (msg.type_id()) {
112 : 4 : case TypeTag::LinkMsg:
113 : 4 : handled = handle_link_msg(msg);
114 : 4 : break;
115 : 1 : case TypeTag::UnlinkMsg:
116 : 1 : handled = handle_unlink_msg(msg);
117 : 1 : break;
118 : 2 : case TypeTag::MonitorMsg:
119 : 2 : handled = handle_monitor_msg(msg);
120 : 2 : break;
121 : 1 : case TypeTag::DemonitorMsg:
122 : 1 : handled = handle_demonitor_msg(msg);
123 : 1 : break;
124 : 2 : case TypeTag::DownMsg:
125 : 2 : handle_down_msg(msg);
126 : 2 : break;
127 : 336 : default:
128 : 336 : break;
129 : : }
130 : 346 : if (handled)
131 : 8 : return;
132 : : }
133 : : // -- End system message interception --
134 : :
135 : : // -- Lifecycle message gate --
136 : : // User messages (TypeTag >= 0x1000) are only accepted in ACTIVE state
137 : : // or while draining (the drain gate handles drain policy decisions).
138 : : // System messages (TypeTag < 0x1000) always pass through.
139 : 338 : if (static_cast<uint32_t>(msg.type_id()) >= 0x1000) {
140 : 332 : if (auto* lc = as_lifecycle()) {
141 : 16 : if (!lc->accepts_user_msgs() &&
142 : 8 : lc->state() != LifecycleState::kDraining) {
143 : 0 : return;
144 : : }
145 : : }
146 : : }
147 : : // -- End lifecycle message gate --
148 : :
149 : 338 : if (!handlers_initialized_) {
150 : 25 : initialize_proto_handlers();
151 : : }
152 : :
153 : : // Capture sender for reply() tracking
154 : 338 : ctx = context();
155 : 338 : if (ctx != nullptr) {
156 : 338 : ctx->set_current_sender(msg.sender_address());
157 : : }
158 : :
159 : : // -- CLI introspection dispatch --
160 : : {
161 : : // InspectStateRequest: gather metadata + optional
162 : : // state/mailbox/children
163 : 338 : if (msg.type_id() == TypeTag::InspectStateRequestTag) {
164 : 0 : cli::InspectStateRequest req;
165 : 0 : if (!req.ParseFromArray(msg.payload().data(),
166 : 0 : static_cast<int>(msg.payload().size()))) {
167 : 0 : return;
168 : : }
169 : :
170 : 0 : cli::InspectStateReply reply;
171 : 0 : auto meta = to_metadata();
172 : 0 : auto* pb_meta = reply.mutable_metadata();
173 : 0 : pb_meta->set_actor_id(meta.actor_id);
174 : : pb_meta->set_actor_type(meta.actor_type);
175 : : pb_meta->set_state(meta.state);
176 : 0 : pb_meta->set_incarnation(meta.incarnation);
177 : :
178 : 0 : if (req.include_mailbox()) {
179 : 0 : auto ms = mailbox_snapshot();
180 : 0 : auto* pb_mbox = reply.mutable_mailbox();
181 : 0 : pb_mbox->set_depth(ms.depth);
182 : 0 : }
183 : :
184 : 0 : if (req.include_state()) {
185 : 0 : auto blob = serialize_state();
186 : 0 : reply.set_state_blob(std::string(
187 : 0 : reinterpret_cast<const char*>(blob.data()), blob.size()));
188 : 0 : }
189 : :
190 : 0 : std::string reply_data = reply.SerializeAsString();
191 : 0 : StreamBuffer payload(reply_data.begin(), reply_data.end());
192 : 0 : ctx->reply(TypedMessage(TypeTag::InspectStateResponseTag,
193 : 0 : std::move(payload)));
194 : 0 : return;
195 : 0 : }
196 : :
197 : : // KillRequest: terminate this actor
198 : 338 : if (msg.type_id() == TypeTag::KillRequestTag) {
199 : 0 : cli::KillRequest req;
200 : 0 : if (!req.ParseFromArray(msg.payload().data(),
201 : 0 : static_cast<int>(msg.payload().size()))) {
202 : 0 : return;
203 : : }
204 : :
205 : 0 : cli::KillReply reply;
206 : 0 : reply.set_success(true);
207 : 0 : reply.set_error_code(0);
208 : :
209 : 0 : std::string reply_data = reply.SerializeAsString();
210 : 0 : StreamBuffer payload(reply_data.begin(), reply_data.end());
211 : 0 : ctx->reply(TypedMessage(TypeTag::KillResponseTag, std::move(payload)));
212 : :
213 : : // Drive lifecycle state machine for graceful stop
214 : 0 : if (auto* lc = as_lifecycle()) {
215 : 0 : lc->transition(LifecycleState::kStopping);
216 : 0 : lc->transition(LifecycleState::kStopped);
217 : : }
218 : 0 : set_exit_reason(0);
219 : 0 : return;
220 : 0 : }
221 : : }
222 : : // -- End CLI dispatch --
223 : :
224 : 338 : auto t0 = metrics_ring_buffer_ ? std::chrono::steady_clock::now()
225 : 1 : : std::chrono::steady_clock::time_point{};
226 : :
227 : : // Try proto handler dispatch by TypeTag first
228 : 338 : auto it = proto_handlers_.find(msg.type_id());
229 : 338 : if (it != proto_handlers_.end()) {
230 : 0 : auto deserialized = it->second.deserialize(msg.payload());
231 : 0 : if (deserialized) {
232 : 0 : StreamBuffer response = it->second.invoke(std::move(deserialized));
233 : 0 : if (!response.empty() && ctx != nullptr) {
234 : 0 : TypedMessage reply_msg(it->first, response);
235 : 0 : ctx->reply(std::move(reply_msg));
236 : 0 : }
237 : 0 : }
238 : 0 : if (metrics_ring_buffer_) [[unlikely]] {
239 : 0 : auto t1 = std::chrono::steady_clock::now();
240 : : auto ns =
241 : 0 : std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0).count();
242 : 0 : metrics::MetricEvent evt{};
243 : 0 : evt.actor_id = id();
244 : 0 : evt.event_type = metrics::MetricEventType::kMessageProcessed;
245 : 0 : evt.value_hi = static_cast<uint32_t>(ns > UINT32_MAX ? UINT32_MAX : ns);
246 : 0 : metrics_ring_buffer_->try_push(evt);
247 : : }
248 : 0 : return;
249 : 0 : }
250 : :
251 : : // Fall through to Behavior-based handling
252 : 338 : if (behavior_) {
253 : 331 : behavior_(msg);
254 : : }
255 : :
256 : 338 : if (metrics_ring_buffer_) [[unlikely]] {
257 : 337 : auto t1 = std::chrono::steady_clock::now();
258 : : auto ns =
259 : 337 : std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0).count();
260 : 337 : metrics::MetricEvent evt{};
261 : 337 : evt.actor_id = id();
262 : 337 : evt.event_type = metrics::MetricEventType::kMessageProcessed;
263 : 337 : evt.value_hi = static_cast<uint32_t>(ns > UINT32_MAX ? UINT32_MAX : ns);
264 : 337 : metrics_ring_buffer_->try_push(evt);
265 : : }
266 : :
267 : : // -- Drain-completion check after processing a message --
268 : 338 : if (auto* lc = as_lifecycle()) {
269 : 12 : if (lc->state() == LifecycleState::kDraining && mailbox_is_empty()) {
270 : 6 : cancel_drain_timer();
271 : 6 : lc->transition(LifecycleState::kStopping);
272 : 6 : lc->transition(LifecycleState::kStopped);
273 : 6 : on_exit();
274 : : }
275 : : }
276 : : // -- End drain-completion check --
277 : 362 : }
278 : :
279 : 4 : bool EventBasedActor::handle_link_msg(const TypedMessage& msg) {
280 : 4 : auto* ctx = context();
281 : 4 : if (ctx == nullptr)
282 : 0 : return true;
283 : 4 : const auto& sender = msg.sender_address();
284 : 4 : bool already_linked = false;
285 : 4 : for (const auto& linked : ctx->linked_actors()) {
286 : 0 : if (linked == sender) {
287 : 0 : already_linked = true;
288 : 0 : break;
289 : : }
290 : 4 : }
291 : 4 : if (!already_linked) {
292 : 4 : ctx->add_linked(sender);
293 : : }
294 : 4 : return true;
295 : : }
296 : :
297 : 1 : bool EventBasedActor::handle_unlink_msg(const TypedMessage& msg) {
298 : 1 : auto* ctx = context();
299 : 1 : if (ctx != nullptr) {
300 : 1 : ctx->remove_linked(msg.sender_address());
301 : : }
302 : 1 : return true;
303 : : }
304 : :
305 : 2 : bool EventBasedActor::handle_monitor_msg(const TypedMessage& msg) {
306 : 2 : auto* ctx = context();
307 : 2 : if (ctx == nullptr)
308 : 0 : return true;
309 : 2 : const auto& sender = msg.sender_address();
310 : 2 : bool already = false;
311 : 2 : for (const auto& m : ctx->monitored_actors()) {
312 : 0 : if (m == sender) {
313 : 0 : already = true;
314 : 0 : break;
315 : : }
316 : : }
317 : 2 : if (!already) {
318 : 2 : ctx->add_monitored(sender);
319 : : }
320 : 2 : return true;
321 : : }
322 : :
323 : 1 : bool EventBasedActor::handle_demonitor_msg(const TypedMessage& msg) {
324 : 1 : auto* ctx = context();
325 : 1 : if (ctx != nullptr) {
326 : 1 : ctx->remove_monitored(msg.sender_address());
327 : : }
328 : 1 : return true;
329 : : }
330 : :
331 : 2 : void EventBasedActor::handle_down_msg(const TypedMessage& msg) {
332 : 2 : auto* ctx = context();
333 : 2 : if (ctx != nullptr) {
334 : 2 : ctx->remove_linked(msg.sender_address());
335 : 2 : ctx->remove_monitored(msg.sender_address());
336 : : }
337 : 2 : }
338 : :
339 : 53 : void EventBasedActor::become(Behavior bh) {
340 : 53 : behavior_ = std::move(bh);
341 : 53 : }
342 : :
343 : 0 : void EventBasedActor::become_empty() {
344 : 0 : behavior_ = Behavior{};
345 : 0 : }
346 : :
347 : 25 : void EventBasedActor::initialize_proto_handlers() {
348 : 25 : if (handlers_initialized_)
349 : 0 : return;
350 : 25 : register_handlers();
351 : 25 : handlers_initialized_ = true;
352 : : }
353 : :
354 : 0 : void EventBasedActor::on_proto_message(TypeTag tag, const StreamBuffer& payload) {
355 : 0 : if (!handlers_initialized_) {
356 : 0 : initialize_proto_handlers();
357 : : }
358 : :
359 : 0 : auto it = proto_handlers_.find(tag);
360 : 0 : if (it == proto_handlers_.end()) {
361 : 0 : return;
362 : : }
363 : :
364 : 0 : ProtoHandler& handler = it->second;
365 : 0 : auto msg = handler.deserialize(payload);
366 : 0 : if (!msg)
367 : 0 : return;
368 : :
369 : 0 : StreamBuffer response = handler.invoke(std::move(msg));
370 : 0 : auto* ctx = context();
371 : 0 : if (!response.empty() && ctx != nullptr) {
372 : 0 : TypedMessage reply_msg(tag, response);
373 : 0 : ctx->reply(std::move(reply_msg));
374 : 0 : }
375 : 0 : }
376 : :
377 : 1 : void EventBasedActor::on_deactivate() {
378 : : #if HPACTOR_SUPPORT_COROUTINES
379 : 1 : if (actor_coroutine_ && !actor_coroutine_.done()) {
380 : 0 : actor_coroutine_.task().handle().destroy();
381 : 0 : actor_coroutine_ = sched::ActorCoroutine{};
382 : : }
383 : : #endif
384 : 1 : }
385 : :
386 : 21 : void EventBasedActor::on_exit() {
387 : 21 : auto* ctx = context();
388 : 21 : if (ctx == nullptr) {
389 : 0 : return;
390 : : }
391 : :
392 : 21 : HPACTOR_LOG_INFO(log::LogCategory::kActor, id(),
393 : : static_cast<uint32_t>(log::LogEventId::kActorTerminated),
394 : : "actor terminated");
395 : :
396 : 21 : if (metrics_ring_buffer_) [[unlikely]] {
397 : 21 : metrics::MetricEvent evt{};
398 : 21 : evt.actor_id = id();
399 : 21 : evt.event_type = metrics::MetricEventType::kActorTerminated;
400 : 21 : evt.value_hi = static_cast<uint32_t>(exit_reason_);
401 : 21 : metrics_ring_buffer_->try_push(evt);
402 : : }
403 : :
404 : : // Build DownMessage
405 : 21 : hpactor::DownMessage pb;
406 : 21 : pb.set_actor_id(id().value());
407 : 21 : pb.set_reason_code(exit_reason_);
408 : :
409 : 21 : StreamBuffer payload(pb.ByteSizeLong());
410 : 21 : (void)pb.SerializeToArray(payload.data(), static_cast<int>(payload.size()));
411 : :
412 : : // Send DownMsg to all linked actors
413 : 22 : for (const auto& addr : ctx->linked_actors()) {
414 : 1 : TypedMessage down_msg(TypeTag::DownMsg, StreamBuffer(payload));
415 : 1 : ctx->send(addr, std::move(down_msg));
416 : 22 : }
417 : :
418 : : // Send DownMsg to all monitored actors
419 : 23 : for (const auto& addr : ctx->monitored_actors()) {
420 : 2 : TypedMessage down_msg(TypeTag::DownMsg, StreamBuffer(payload));
421 : 2 : ctx->send(addr, std::move(down_msg));
422 : 2 : }
423 : : // linked_ and monitored_ vectors will be destroyed with the context
424 : 21 : }
425 : :
426 : 0 : cli::MboxSnapshot EventBasedActor::mailbox_snapshot() const {
427 : 0 : if (mailbox_)
428 : 0 : return mailbox_->snapshot();
429 : 0 : return {};
430 : : }
431 : :
432 : : // ── Drain helpers
433 : : // ─────────────────────────────────────────────────────────────
434 : :
435 : 16 : bool EventBasedActor::drain_one(TypedMessage& msg) {
436 : 16 : auto* lc = as_lifecycle();
437 : 16 : if (!lc)
438 : 0 : return true; // no lifecycle = process normally
439 : :
440 : 16 : auto policy = lc->drain_config().policy;
441 : 16 : bool is_system = static_cast<uint32_t>(msg.type_id()) < 0x1000;
442 : :
443 : 16 : switch (policy) {
444 : 8 : case DrainPolicy::Drain:
445 : 8 : return true; // process normally
446 : :
447 : 7 : case DrainPolicy::DropUserMessages:
448 : 7 : if (!is_system) {
449 : 4 : mailbox::DeadLetterRecord record;
450 : 4 : record.reason = mailbox::DeadLetterReason::DrainPolicyDrop;
451 : 4 : record.source = mailbox::DeadLetterSource::MailboxAdmission;
452 : 4 : record.sender = msg.sender_address();
453 : 4 : record.target = address();
454 : 4 : record.type_tag = msg.type_id();
455 : 4 : auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
456 : 4 : system().clock().now().time_since_epoch())
457 : 4 : .count();
458 : 4 : record.timestamp_ns = static_cast<uint64_t>(ns);
459 : 4 : system().dead_letter(std::move(record));
460 : 4 : return false;
461 : 4 : }
462 : 3 : return true;
463 : :
464 : 0 : case DrainPolicy::ImmediateStop:
465 : 0 : return false; // handled at drain trigger, not per-message
466 : :
467 : 1 : case DrainPolicy::SnapshotAndStop:
468 : : case DrainPolicy::TransferShard: {
469 : : // Deferred — fall back to Drain (log warning)
470 : 1 : HPACTOR_LOG_WARNING(
471 : : log::LogCategory::kActor, id(),
472 : : static_cast<uint32_t>(log::LogEventId::kActorTerminated),
473 : : "deferred drain policy — falling back to Drain");
474 : 1 : lc->set_drain_config(
475 : 1 : DrainConfig{DrainPolicy::Drain, lc->drain_config().timeout});
476 : 1 : return true;
477 : : }
478 : : }
479 : 0 : return true;
480 : : }
481 : :
482 : 11 : void EventBasedActor::drain_all_immediate() {
483 : 11 : TypedMessage msg;
484 : 39 : while (mailbox_ && mailbox_->try_pop(msg)) {
485 : 28 : mailbox::DeadLetterRecord record;
486 : 28 : record.reason = mailbox::DeadLetterReason::MailboxClosed;
487 : 28 : record.source = mailbox::DeadLetterSource::MailboxAdmission;
488 : 28 : record.sender = msg.sender_address();
489 : 28 : record.target = address();
490 : 28 : record.type_tag = msg.type_id();
491 : 28 : auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
492 : 28 : system().clock().now().time_since_epoch())
493 : 28 : .count();
494 : 28 : record.timestamp_ns = static_cast<uint64_t>(ns);
495 : 28 : system().dead_letter(std::move(record));
496 : 28 : }
497 : 11 : }
498 : :
499 : 7 : void EventBasedActor::start_drain_timer() {
500 : 7 : auto* lc = as_lifecycle();
501 : 7 : if (!lc || !scheduler_)
502 : 0 : return;
503 : :
504 : 7 : auto timeout = lc->drain_config().timeout;
505 : : auto delay_ns =
506 : 7 : std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count();
507 : 7 : std::weak_ptr<AbstractActor> weak_self = shared_from_this();
508 : 7 : drain_timer_handle_ = scheduler_->schedule_after(
509 : 14 : [weak_self]() {
510 : 1 : if (auto self = weak_self.lock()) {
511 : 1 : auto* actor_ptr = static_cast<EventBasedActor*>(self.get());
512 : 1 : if (auto* lc2 = actor_ptr->as_lifecycle()) {
513 : 1 : if (lc2->state() == LifecycleState::kDraining) {
514 : 1 : lc2->on_drain_timeout();
515 : : // Dead-letter remaining mailbox messages
516 : 1 : actor_ptr->drain_all_immediate();
517 : 1 : lc2->transition(LifecycleState::kStopping);
518 : 1 : lc2->transition(LifecycleState::kStopped);
519 : 1 : actor_ptr->on_exit();
520 : : }
521 : : }
522 : 1 : }
523 : 1 : },
524 : : delay_ns);
525 : 7 : }
526 : :
527 : 6 : void EventBasedActor::cancel_drain_timer() {
528 : 6 : if (drain_timer_handle_.valid() && scheduler_) {
529 : 1 : scheduler_->cancel_timer(drain_timer_handle_);
530 : 1 : drain_timer_handle_ = sched::TimerHandle{};
531 : : }
532 : 6 : }
533 : :
534 : : } // namespace hpactor
|