LCOV - code coverage report
Current view: top level - src/actor - event_based_actor.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 68.5 % 333 228
Test Date: 2026-05-20 02:24:49 Functions: 86.4 % 22 19
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/actor/actor_context.hpp>
      16                 :             : #include <hpactor/actor/event_based_actor.hpp>
      17                 :             : #include <hpactor/actor/lifecycle_actor.hpp>
      18                 :             : #include <hpactor/cli_messages.pb.h>
      19                 :             : #include <hpactor/core/actor_system.hpp>
      20                 :             : #include <hpactor/hpactor_config.hpp>
      21                 :             : #include <hpactor/log/log_field.hpp>
      22                 :             : #include <hpactor/log/logger.hpp>
      23                 :             : #include <hpactor/mailbox/dead_letter_queue.hpp>
      24                 :             : #include <hpactor/messages.pb.h>
      25                 :             : #include <hpactor/metrics/metrics_event.hpp>
      26                 :             : #include <hpactor/tracing/trace_manager.hpp>
      27                 :             : 
      28                 :             : #include <chrono>
      29                 :             : #include <cstring>
      30                 :             : #include <memory>
      31                 :             : 
      32                 :             : namespace hpactor {
      33                 :             : 
      34                 :             : namespace {
      35                 :             : 
      36                 :             : class ReceiveSpanGuard {
      37                 :             :   public:
      38                 :         350 :     ReceiveSpanGuard(tracing::TraceManager* manager, tracing::SpanHandle* handle) noexcept
      39                 :         350 :         : manager_(manager), handle_(handle) {}
      40                 :             : 
      41                 :         350 :     ~ReceiveSpanGuard() {
      42                 :         350 :         if (manager_ != nullptr && handle_ != nullptr) {
      43                 :           1 :             manager_->finish_span(*handle_, status_);
      44                 :             :         }
      45                 :         350 :     }
      46                 :             : 
      47                 :             :     void set_status(tracing::SpanStatus status) noexcept {
      48                 :             :         status_ = status;
      49                 :             :     }
      50                 :             : 
      51                 :             :   private:
      52                 :             :     tracing::TraceManager* manager_{nullptr};
      53                 :             :     tracing::SpanHandle* handle_{nullptr};
      54                 :             :     tracing::SpanStatus status_{tracing::SpanStatus::kOk};
      55                 :             : };
      56                 :             : 
      57                 :             : } // namespace
      58                 :             : 
      59                 :         118 : EventBasedActor::EventBasedActor(ActorContext* ctx, ActorSystem& sys)
      60                 :         118 :     : LocalActor(ctx, sys) {}
      61                 :             : 
      62                 :         114 : void EventBasedActor::on_activate() {}
      63                 :             : 
      64                 :         350 : void EventBasedActor::receive(TypedMessage& msg) {
      65                 :         350 :     auto* ctx = context();
      66                 :         350 :     auto* trace_manager = system().trace_manager();
      67                 :         350 :     tracing::SpanHandle receive_span;
      68                 :         350 :     std::unique_ptr<ActorContext::TraceScope> trace_scope;
      69                 :             : 
      70                 :         351 :     if (trace_manager != nullptr && trace_manager->enabled() &&
      71                 :           1 :         trace_manager->config().record_actor_receive_spans) {
      72                 :           1 :         tracing::SpanStart start;
      73                 :           1 :         start.name = "hpactor.actor.receive";
      74                 :           1 :         start.kind = tracing::SpanKind::kConsumer;
      75                 :           1 :         start.has_parent = msg.has_trace_context();
      76                 :           1 :         if (msg.has_trace_context()) {
      77                 :           1 :             start.parent = msg.trace_context();
      78                 :             :         }
      79                 :           1 :         start.actor_id = id();
      80                 :           1 :         start.sender_actor_id = msg.sender_address().id;
      81                 :           1 :         start.type_tag = msg.type_id();
      82                 :           1 :         start.payload_size = static_cast<uint32_t>(msg.payload().size());
      83                 :           1 :         receive_span = trace_manager->start_span(start);
      84                 :           1 :         if (ctx != nullptr && receive_span.context.valid()) {
      85                 :           2 :             trace_scope = std::make_unique<ActorContext::TraceScope>(
      86                 :           1 :                 ctx, receive_span.context);
      87                 :             :         }
      88                 :             :     }
      89                 :             : 
      90                 :         350 :     ReceiveSpanGuard span_guard(trace_manager, &receive_span);
      91                 :             :     // -- Drain gate: apply drain policy to every message during kDraining --
      92                 :         350 :     if (auto* lc = as_lifecycle()) {
      93                 :          16 :         if (lc->state() == LifecycleState::kDraining) {
      94                 :          16 :             if (!drain_one(msg)) {
      95                 :             :                 // Drain-completion check: if mailbox is now empty, finish drain
      96                 :           4 :                 if (mailbox_is_empty()) {
      97                 :           0 :                     cancel_drain_timer();
      98                 :           0 :                     lc->transition(LifecycleState::kStopping);
      99                 :           0 :                     lc->transition(LifecycleState::kStopped);
     100                 :           0 :                     on_exit();
     101                 :             :                 }
     102                 :           4 :                 return; // message was dead-lettered by the drain policy
     103                 :             :             }
     104                 :             :         }
     105                 :             :     }
     106                 :             :     // -- End drain gate --
     107                 :             : 
     108                 :             :     // -- System message interception (link / monitor / death) --
     109                 :             :     {
     110                 :         346 :         bool handled = false;
     111                 :         346 :         switch (msg.type_id()) {
     112                 :           4 :             case TypeTag::LinkMsg:
     113                 :           4 :                 handled = handle_link_msg(msg);
     114                 :           4 :                 break;
     115                 :           1 :             case TypeTag::UnlinkMsg:
     116                 :           1 :                 handled = handle_unlink_msg(msg);
     117                 :           1 :                 break;
     118                 :           2 :             case TypeTag::MonitorMsg:
     119                 :           2 :                 handled = handle_monitor_msg(msg);
     120                 :           2 :                 break;
     121                 :           1 :             case TypeTag::DemonitorMsg:
     122                 :           1 :                 handled = handle_demonitor_msg(msg);
     123                 :           1 :                 break;
     124                 :           2 :             case TypeTag::DownMsg:
     125                 :           2 :                 handle_down_msg(msg);
     126                 :           2 :                 break;
     127                 :         336 :             default:
     128                 :         336 :                 break;
     129                 :             :         }
     130                 :         346 :         if (handled)
     131                 :           8 :             return;
     132                 :             :     }
     133                 :             :     // -- End system message interception --
     134                 :             : 
     135                 :             :     // -- Lifecycle message gate --
     136                 :             :     // User messages (TypeTag >= 0x1000) are only accepted in ACTIVE state
     137                 :             :     // or while draining (the drain gate handles drain policy decisions).
     138                 :             :     // System messages (TypeTag < 0x1000) always pass through.
     139                 :         338 :     if (static_cast<uint32_t>(msg.type_id()) >= 0x1000) {
     140                 :         332 :         if (auto* lc = as_lifecycle()) {
     141                 :          16 :             if (!lc->accepts_user_msgs() &&
     142                 :           8 :                 lc->state() != LifecycleState::kDraining) {
     143                 :           0 :                 return;
     144                 :             :             }
     145                 :             :         }
     146                 :             :     }
     147                 :             :     // -- End lifecycle message gate --
     148                 :             : 
     149                 :         338 :     if (!handlers_initialized_) {
     150                 :          25 :         initialize_proto_handlers();
     151                 :             :     }
     152                 :             : 
     153                 :             :     // Capture sender for reply() tracking
     154                 :         338 :     ctx = context();
     155                 :         338 :     if (ctx != nullptr) {
     156                 :         338 :         ctx->set_current_sender(msg.sender_address());
     157                 :             :     }
     158                 :             : 
     159                 :             :     // -- CLI introspection dispatch --
     160                 :             :     {
     161                 :             :         // InspectStateRequest: gather metadata + optional
     162                 :             :         // state/mailbox/children
     163                 :         338 :         if (msg.type_id() == TypeTag::InspectStateRequestTag) {
     164                 :           0 :             cli::InspectStateRequest req;
     165                 :           0 :             if (!req.ParseFromArray(msg.payload().data(),
     166                 :           0 :                                     static_cast<int>(msg.payload().size()))) {
     167                 :           0 :                 return;
     168                 :             :             }
     169                 :             : 
     170                 :           0 :             cli::InspectStateReply reply;
     171                 :           0 :             auto meta = to_metadata();
     172                 :           0 :             auto* pb_meta = reply.mutable_metadata();
     173                 :           0 :             pb_meta->set_actor_id(meta.actor_id);
     174                 :             :             pb_meta->set_actor_type(meta.actor_type);
     175                 :             :             pb_meta->set_state(meta.state);
     176                 :           0 :             pb_meta->set_incarnation(meta.incarnation);
     177                 :             : 
     178                 :           0 :             if (req.include_mailbox()) {
     179                 :           0 :                 auto ms = mailbox_snapshot();
     180                 :           0 :                 auto* pb_mbox = reply.mutable_mailbox();
     181                 :           0 :                 pb_mbox->set_depth(ms.depth);
     182                 :           0 :             }
     183                 :             : 
     184                 :           0 :             if (req.include_state()) {
     185                 :           0 :                 auto blob = serialize_state();
     186                 :           0 :                 reply.set_state_blob(std::string(
     187                 :           0 :                     reinterpret_cast<const char*>(blob.data()), blob.size()));
     188                 :           0 :             }
     189                 :             : 
     190                 :           0 :             std::string reply_data = reply.SerializeAsString();
     191                 :           0 :             StreamBuffer payload(reply_data.begin(), reply_data.end());
     192                 :           0 :             ctx->reply(TypedMessage(TypeTag::InspectStateResponseTag,
     193                 :           0 :                                     std::move(payload)));
     194                 :           0 :             return;
     195                 :           0 :         }
     196                 :             : 
     197                 :             :         // KillRequest: terminate this actor
     198                 :         338 :         if (msg.type_id() == TypeTag::KillRequestTag) {
     199                 :           0 :             cli::KillRequest req;
     200                 :           0 :             if (!req.ParseFromArray(msg.payload().data(),
     201                 :           0 :                                     static_cast<int>(msg.payload().size()))) {
     202                 :           0 :                 return;
     203                 :             :             }
     204                 :             : 
     205                 :           0 :             cli::KillReply reply;
     206                 :           0 :             reply.set_success(true);
     207                 :           0 :             reply.set_error_code(0);
     208                 :             : 
     209                 :           0 :             std::string reply_data = reply.SerializeAsString();
     210                 :           0 :             StreamBuffer payload(reply_data.begin(), reply_data.end());
     211                 :           0 :             ctx->reply(TypedMessage(TypeTag::KillResponseTag, std::move(payload)));
     212                 :             : 
     213                 :             :             // Drive lifecycle state machine for graceful stop
     214                 :           0 :             if (auto* lc = as_lifecycle()) {
     215                 :           0 :                 lc->transition(LifecycleState::kStopping);
     216                 :           0 :                 lc->transition(LifecycleState::kStopped);
     217                 :             :             }
     218                 :           0 :             set_exit_reason(0);
     219                 :           0 :             return;
     220                 :           0 :         }
     221                 :             :     }
     222                 :             :     // -- End CLI dispatch --
     223                 :             : 
     224                 :         338 :     auto t0 = metrics_ring_buffer_ ? std::chrono::steady_clock::now()
     225                 :           1 :                                    : std::chrono::steady_clock::time_point{};
     226                 :             : 
     227                 :             :     // Try proto handler dispatch by TypeTag first
     228                 :         338 :     auto it = proto_handlers_.find(msg.type_id());
     229                 :         338 :     if (it != proto_handlers_.end()) {
     230                 :           0 :         auto deserialized = it->second.deserialize(msg.payload());
     231                 :           0 :         if (deserialized) {
     232                 :           0 :             StreamBuffer response = it->second.invoke(std::move(deserialized));
     233                 :           0 :             if (!response.empty() && ctx != nullptr) {
     234                 :           0 :                 TypedMessage reply_msg(it->first, response);
     235                 :           0 :                 ctx->reply(std::move(reply_msg));
     236                 :           0 :             }
     237                 :           0 :         }
     238                 :           0 :         if (metrics_ring_buffer_) [[unlikely]] {
     239                 :           0 :             auto t1 = std::chrono::steady_clock::now();
     240                 :             :             auto ns =
     241                 :           0 :                 std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0).count();
     242                 :           0 :             metrics::MetricEvent evt{};
     243                 :           0 :             evt.actor_id = id();
     244                 :           0 :             evt.event_type = metrics::MetricEventType::kMessageProcessed;
     245                 :           0 :             evt.value_hi = static_cast<uint32_t>(ns > UINT32_MAX ? UINT32_MAX : ns);
     246                 :           0 :             metrics_ring_buffer_->try_push(evt);
     247                 :             :         }
     248                 :           0 :         return;
     249                 :           0 :     }
     250                 :             : 
     251                 :             :     // Fall through to Behavior-based handling
     252                 :         338 :     if (behavior_) {
     253                 :         331 :         behavior_(msg);
     254                 :             :     }
     255                 :             : 
     256                 :         338 :     if (metrics_ring_buffer_) [[unlikely]] {
     257                 :         337 :         auto t1 = std::chrono::steady_clock::now();
     258                 :             :         auto ns =
     259                 :         337 :             std::chrono::duration_cast<std::chrono::nanoseconds>(t1 - t0).count();
     260                 :         337 :         metrics::MetricEvent evt{};
     261                 :         337 :         evt.actor_id = id();
     262                 :         337 :         evt.event_type = metrics::MetricEventType::kMessageProcessed;
     263                 :         337 :         evt.value_hi = static_cast<uint32_t>(ns > UINT32_MAX ? UINT32_MAX : ns);
     264                 :         337 :         metrics_ring_buffer_->try_push(evt);
     265                 :             :     }
     266                 :             : 
     267                 :             :     // -- Drain-completion check after processing a message --
     268                 :         338 :     if (auto* lc = as_lifecycle()) {
     269                 :          12 :         if (lc->state() == LifecycleState::kDraining && mailbox_is_empty()) {
     270                 :           6 :             cancel_drain_timer();
     271                 :           6 :             lc->transition(LifecycleState::kStopping);
     272                 :           6 :             lc->transition(LifecycleState::kStopped);
     273                 :           6 :             on_exit();
     274                 :             :         }
     275                 :             :     }
     276                 :             :     // -- End drain-completion check --
     277                 :         362 : }
     278                 :             : 
     279                 :           4 : bool EventBasedActor::handle_link_msg(const TypedMessage& msg) {
     280                 :           4 :     auto* ctx = context();
     281                 :           4 :     if (ctx == nullptr)
     282                 :           0 :         return true;
     283                 :           4 :     const auto& sender = msg.sender_address();
     284                 :           4 :     bool already_linked = false;
     285                 :           4 :     for (const auto& linked : ctx->linked_actors()) {
     286                 :           0 :         if (linked == sender) {
     287                 :           0 :             already_linked = true;
     288                 :           0 :             break;
     289                 :             :         }
     290                 :           4 :     }
     291                 :           4 :     if (!already_linked) {
     292                 :           4 :         ctx->add_linked(sender);
     293                 :             :     }
     294                 :           4 :     return true;
     295                 :             : }
     296                 :             : 
     297                 :           1 : bool EventBasedActor::handle_unlink_msg(const TypedMessage& msg) {
     298                 :           1 :     auto* ctx = context();
     299                 :           1 :     if (ctx != nullptr) {
     300                 :           1 :         ctx->remove_linked(msg.sender_address());
     301                 :             :     }
     302                 :           1 :     return true;
     303                 :             : }
     304                 :             : 
     305                 :           2 : bool EventBasedActor::handle_monitor_msg(const TypedMessage& msg) {
     306                 :           2 :     auto* ctx = context();
     307                 :           2 :     if (ctx == nullptr)
     308                 :           0 :         return true;
     309                 :           2 :     const auto& sender = msg.sender_address();
     310                 :           2 :     bool already = false;
     311                 :           2 :     for (const auto& m : ctx->monitored_actors()) {
     312                 :           0 :         if (m == sender) {
     313                 :           0 :             already = true;
     314                 :           0 :             break;
     315                 :             :         }
     316                 :             :     }
     317                 :           2 :     if (!already) {
     318                 :           2 :         ctx->add_monitored(sender);
     319                 :             :     }
     320                 :           2 :     return true;
     321                 :             : }
     322                 :             : 
     323                 :           1 : bool EventBasedActor::handle_demonitor_msg(const TypedMessage& msg) {
     324                 :           1 :     auto* ctx = context();
     325                 :           1 :     if (ctx != nullptr) {
     326                 :           1 :         ctx->remove_monitored(msg.sender_address());
     327                 :             :     }
     328                 :           1 :     return true;
     329                 :             : }
     330                 :             : 
     331                 :           2 : void EventBasedActor::handle_down_msg(const TypedMessage& msg) {
     332                 :           2 :     auto* ctx = context();
     333                 :           2 :     if (ctx != nullptr) {
     334                 :           2 :         ctx->remove_linked(msg.sender_address());
     335                 :           2 :         ctx->remove_monitored(msg.sender_address());
     336                 :             :     }
     337                 :           2 : }
     338                 :             : 
     339                 :          53 : void EventBasedActor::become(Behavior bh) {
     340                 :          53 :     behavior_ = std::move(bh);
     341                 :          53 : }
     342                 :             : 
     343                 :           0 : void EventBasedActor::become_empty() {
     344                 :           0 :     behavior_ = Behavior{};
     345                 :           0 : }
     346                 :             : 
     347                 :          25 : void EventBasedActor::initialize_proto_handlers() {
     348                 :          25 :     if (handlers_initialized_)
     349                 :           0 :         return;
     350                 :          25 :     register_handlers();
     351                 :          25 :     handlers_initialized_ = true;
     352                 :             : }
     353                 :             : 
     354                 :           0 : void EventBasedActor::on_proto_message(TypeTag tag, const StreamBuffer& payload) {
     355                 :           0 :     if (!handlers_initialized_) {
     356                 :           0 :         initialize_proto_handlers();
     357                 :             :     }
     358                 :             : 
     359                 :           0 :     auto it = proto_handlers_.find(tag);
     360                 :           0 :     if (it == proto_handlers_.end()) {
     361                 :           0 :         return;
     362                 :             :     }
     363                 :             : 
     364                 :           0 :     ProtoHandler& handler = it->second;
     365                 :           0 :     auto msg = handler.deserialize(payload);
     366                 :           0 :     if (!msg)
     367                 :           0 :         return;
     368                 :             : 
     369                 :           0 :     StreamBuffer response = handler.invoke(std::move(msg));
     370                 :           0 :     auto* ctx = context();
     371                 :           0 :     if (!response.empty() && ctx != nullptr) {
     372                 :           0 :         TypedMessage reply_msg(tag, response);
     373                 :           0 :         ctx->reply(std::move(reply_msg));
     374                 :           0 :     }
     375                 :           0 : }
     376                 :             : 
     377                 :           1 : void EventBasedActor::on_deactivate() {
     378                 :             : #if HPACTOR_SUPPORT_COROUTINES
     379                 :           1 :     if (actor_coroutine_ && !actor_coroutine_.done()) {
     380                 :           0 :         actor_coroutine_.task().handle().destroy();
     381                 :           0 :         actor_coroutine_ = sched::ActorCoroutine{};
     382                 :             :     }
     383                 :             : #endif
     384                 :           1 : }
     385                 :             : 
     386                 :          21 : void EventBasedActor::on_exit() {
     387                 :          21 :     auto* ctx = context();
     388                 :          21 :     if (ctx == nullptr) {
     389                 :           0 :         return;
     390                 :             :     }
     391                 :             : 
     392                 :          21 :     HPACTOR_LOG_INFO(log::LogCategory::kActor, id(),
     393                 :             :                      static_cast<uint32_t>(log::LogEventId::kActorTerminated),
     394                 :             :                      "actor terminated");
     395                 :             : 
     396                 :          21 :     if (metrics_ring_buffer_) [[unlikely]] {
     397                 :          21 :         metrics::MetricEvent evt{};
     398                 :          21 :         evt.actor_id = id();
     399                 :          21 :         evt.event_type = metrics::MetricEventType::kActorTerminated;
     400                 :          21 :         evt.value_hi = static_cast<uint32_t>(exit_reason_);
     401                 :          21 :         metrics_ring_buffer_->try_push(evt);
     402                 :             :     }
     403                 :             : 
     404                 :             :     // Build DownMessage
     405                 :          21 :     hpactor::DownMessage pb;
     406                 :          21 :     pb.set_actor_id(id().value());
     407                 :          21 :     pb.set_reason_code(exit_reason_);
     408                 :             : 
     409                 :          21 :     StreamBuffer payload(pb.ByteSizeLong());
     410                 :          21 :     (void)pb.SerializeToArray(payload.data(), static_cast<int>(payload.size()));
     411                 :             : 
     412                 :             :     // Send DownMsg to all linked actors
     413                 :          22 :     for (const auto& addr : ctx->linked_actors()) {
     414                 :           1 :         TypedMessage down_msg(TypeTag::DownMsg, StreamBuffer(payload));
     415                 :           1 :         ctx->send(addr, std::move(down_msg));
     416                 :          22 :     }
     417                 :             : 
     418                 :             :     // Send DownMsg to all monitored actors
     419                 :          23 :     for (const auto& addr : ctx->monitored_actors()) {
     420                 :           2 :         TypedMessage down_msg(TypeTag::DownMsg, StreamBuffer(payload));
     421                 :           2 :         ctx->send(addr, std::move(down_msg));
     422                 :           2 :     }
     423                 :             :     // linked_ and monitored_ vectors will be destroyed with the context
     424                 :          21 : }
     425                 :             : 
     426                 :           0 : cli::MboxSnapshot EventBasedActor::mailbox_snapshot() const {
     427                 :           0 :     if (mailbox_)
     428                 :           0 :         return mailbox_->snapshot();
     429                 :           0 :     return {};
     430                 :             : }
     431                 :             : 
     432                 :             : // ── Drain helpers
     433                 :             : // ─────────────────────────────────────────────────────────────
     434                 :             : 
     435                 :          16 : bool EventBasedActor::drain_one(TypedMessage& msg) {
     436                 :          16 :     auto* lc = as_lifecycle();
     437                 :          16 :     if (!lc)
     438                 :           0 :         return true; // no lifecycle = process normally
     439                 :             : 
     440                 :          16 :     auto policy = lc->drain_config().policy;
     441                 :          16 :     bool is_system = static_cast<uint32_t>(msg.type_id()) < 0x1000;
     442                 :             : 
     443                 :          16 :     switch (policy) {
     444                 :           8 :         case DrainPolicy::Drain:
     445                 :           8 :             return true; // process normally
     446                 :             : 
     447                 :           7 :         case DrainPolicy::DropUserMessages:
     448                 :           7 :             if (!is_system) {
     449                 :           4 :                 mailbox::DeadLetterRecord record;
     450                 :           4 :                 record.reason = mailbox::DeadLetterReason::DrainPolicyDrop;
     451                 :           4 :                 record.source = mailbox::DeadLetterSource::MailboxAdmission;
     452                 :           4 :                 record.sender = msg.sender_address();
     453                 :           4 :                 record.target = address();
     454                 :           4 :                 record.type_tag = msg.type_id();
     455                 :           4 :                 auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
     456                 :           4 :                               system().clock().now().time_since_epoch())
     457                 :           4 :                               .count();
     458                 :           4 :                 record.timestamp_ns = static_cast<uint64_t>(ns);
     459                 :           4 :                 system().dead_letter(std::move(record));
     460                 :           4 :                 return false;
     461                 :           4 :             }
     462                 :           3 :             return true;
     463                 :             : 
     464                 :           0 :         case DrainPolicy::ImmediateStop:
     465                 :           0 :             return false; // handled at drain trigger, not per-message
     466                 :             : 
     467                 :           1 :         case DrainPolicy::SnapshotAndStop:
     468                 :             :         case DrainPolicy::TransferShard: {
     469                 :             :             // Deferred — fall back to Drain (log warning)
     470                 :           1 :             HPACTOR_LOG_WARNING(
     471                 :             :                 log::LogCategory::kActor, id(),
     472                 :             :                 static_cast<uint32_t>(log::LogEventId::kActorTerminated),
     473                 :             :                 "deferred drain policy — falling back to Drain");
     474                 :           1 :             lc->set_drain_config(
     475                 :           1 :                 DrainConfig{DrainPolicy::Drain, lc->drain_config().timeout});
     476                 :           1 :             return true;
     477                 :             :         }
     478                 :             :     }
     479                 :           0 :     return true;
     480                 :             : }
     481                 :             : 
     482                 :          11 : void EventBasedActor::drain_all_immediate() {
     483                 :          11 :     TypedMessage msg;
     484                 :          39 :     while (mailbox_ && mailbox_->try_pop(msg)) {
     485                 :          28 :         mailbox::DeadLetterRecord record;
     486                 :          28 :         record.reason = mailbox::DeadLetterReason::MailboxClosed;
     487                 :          28 :         record.source = mailbox::DeadLetterSource::MailboxAdmission;
     488                 :          28 :         record.sender = msg.sender_address();
     489                 :          28 :         record.target = address();
     490                 :          28 :         record.type_tag = msg.type_id();
     491                 :          28 :         auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
     492                 :          28 :                       system().clock().now().time_since_epoch())
     493                 :          28 :                       .count();
     494                 :          28 :         record.timestamp_ns = static_cast<uint64_t>(ns);
     495                 :          28 :         system().dead_letter(std::move(record));
     496                 :          28 :     }
     497                 :          11 : }
     498                 :             : 
     499                 :           7 : void EventBasedActor::start_drain_timer() {
     500                 :           7 :     auto* lc = as_lifecycle();
     501                 :           7 :     if (!lc || !scheduler_)
     502                 :           0 :         return;
     503                 :             : 
     504                 :           7 :     auto timeout = lc->drain_config().timeout;
     505                 :             :     auto delay_ns =
     506                 :           7 :         std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count();
     507                 :           7 :     std::weak_ptr<AbstractActor> weak_self = shared_from_this();
     508                 :           7 :     drain_timer_handle_ = scheduler_->schedule_after(
     509                 :          14 :         [weak_self]() {
     510                 :           1 :             if (auto self = weak_self.lock()) {
     511                 :           1 :                 auto* actor_ptr = static_cast<EventBasedActor*>(self.get());
     512                 :           1 :                 if (auto* lc2 = actor_ptr->as_lifecycle()) {
     513                 :           1 :                     if (lc2->state() == LifecycleState::kDraining) {
     514                 :           1 :                         lc2->on_drain_timeout();
     515                 :             :                         // Dead-letter remaining mailbox messages
     516                 :           1 :                         actor_ptr->drain_all_immediate();
     517                 :           1 :                         lc2->transition(LifecycleState::kStopping);
     518                 :           1 :                         lc2->transition(LifecycleState::kStopped);
     519                 :           1 :                         actor_ptr->on_exit();
     520                 :             :                     }
     521                 :             :                 }
     522                 :           1 :             }
     523                 :           1 :         },
     524                 :             :         delay_ns);
     525                 :           7 : }
     526                 :             : 
     527                 :           6 : void EventBasedActor::cancel_drain_timer() {
     528                 :           6 :     if (drain_timer_handle_.valid() && scheduler_) {
     529                 :           1 :         scheduler_->cancel_timer(drain_timer_handle_);
     530                 :           1 :         drain_timer_handle_ = sched::TimerHandle{};
     531                 :             :     }
     532                 :           6 : }
     533                 :             : 
     534                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1