LCOV - code coverage report
Current view: top level - include/hpactor/actor - event_based_actor.hpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 58.8 % 68 40
Test Date: 2026-05-20 02:24:49 Functions: 66.7 % 21 14
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #pragma once
      16                 :             : 
      17                 :             : #include <hpactor/actor/actor_state.hpp>
      18                 :             : #include <hpactor/actor/drain_config.hpp>
      19                 :             : #include <hpactor/actor/local_actor.hpp>
      20                 :             : #include <hpactor/actor/typed_message.hpp>
      21                 :             : #include <hpactor/behavior.hpp>
      22                 :             : #include <hpactor/core/actor_system.hpp>
      23                 :             : #include <hpactor/hpactor_config.hpp>
      24                 :             : #include <hpactor/mailbox/mpsc_actor_mailbox.hpp>
      25                 :             : #include <hpactor/metrics/metrics_ring_buffer.hpp>
      26                 :             : #include <hpactor/sched/scheduler.hpp>
      27                 :             : 
      28                 :             : #include <hpactor/mem/std_allocator.hpp>
      29                 :             : 
      30                 :             : #include <functional>
      31                 :             : #include <memory>
      32                 :             : #include <string>
      33                 :             : #include <unordered_map>
      34                 :             : 
      35                 :             : #if HPACTOR_SUPPORT_COROUTINES
      36                 :             : #    include <hpactor/sched/actor_coroutine.hpp>
      37                 :             : #    include <hpactor/sched/coroutine_awaiters.hpp>
      38                 :             : #    include <hpactor/sched/coroutine_task.hpp>
      39                 :             : #endif
      40                 :             : 
      41                 :             : namespace hpactor {
      42                 :             : 
      43                 :             : // Forward declarations
      44                 :             : class ProtoTypeRegistry;
      45                 :             : namespace log {
      46                 :             : class Logger;
      47                 :             : } // namespace log
      48                 :             : 
      49                 :             : // Internal handler storage — type-erased to avoid template bloat in the map
      50                 :             : struct ProtoHandler {
      51                 :             :     std::string type_name;
      52                 :             : 
      53                 :           0 :     ProtoHandler() = default;
      54                 :             :     ProtoHandler(ProtoHandler&&) = default;
      55                 :           0 :     ProtoHandler& operator=(ProtoHandler&&) = default;
      56                 :             :     ProtoHandler(const ProtoHandler&) = delete;
      57                 :             :     ProtoHandler& operator=(const ProtoHandler&) = delete;
      58                 :             : 
      59                 :             :     // Deserialize bytes into a shared_ptr<void> holding the concrete protobuf
      60                 :             :     // type
      61                 :             :     std::function<std::shared_ptr<void>(const StreamBuffer&)> deserialize;
      62                 :             : 
      63                 :             :     // Invoke the handler with a deserialized message.
      64                 :             :     // Returns serialized response bytes (empty for fire-and-forget).
      65                 :             :     std::function<StreamBuffer(std::shared_ptr<void>)> invoke;
      66                 :             : };
      67                 :             : 
      68                 :             : // -----------------------------------------------------------------------------
      69                 :             : // EventBasedActor - cooperatively scheduled actor with behavior-based
      70                 :             : // handling, proto handler dispatch, and optional coroutine support (C++20)
      71                 :             : // -----------------------------------------------------------------------------
      72                 :             : class EventBasedActor : public LocalActor {
      73                 :             :   public:
      74                 :             :     void become(Behavior bh);
      75                 :             :     void become_empty();
      76                 :             : 
      77                 :             :     void receive(TypedMessage& msg) override;
      78                 :             : 
      79                 :             :     // Type query for safe downcasting without RTTI
      80                 :         526 :     bool is_event_based_actor() const override {
      81                 :         526 :         return true;
      82                 :             :     }
      83                 :             : 
      84                 :             :     // Proto handler registration (absorbed from ProtoActor)
      85                 :             :     // Users override register_handlers() and call these in the override.
      86                 :             : 
      87                 :             :     // Register a fire-and-forget handler for a protobuf message type
      88                 :             :     template <typename ProtoMsgT>
      89                 :             :     void on(std::function<void(const ProtoMsgT&)> handler) {
      90                 :             :         TypeTag tag = type_tag_for<ProtoMsgT>();
      91                 :             :         auto handler_ptr =
      92                 :             :             mem::allocate_shared<std::function<void(const ProtoMsgT&)>>(
      93                 :             :                 id_ptr(), mem::RegionType::kActor, std::move(handler));
      94                 :             : 
      95                 :             :         ProtoHandler entry;
      96                 :             :         entry.type_name = ProtoMsgT().GetTypeName();
      97                 :             :         entry.deserialize = [](const StreamBuffer& data) -> std::shared_ptr<void> {
      98                 :             :             auto msg = mem::allocate_shared<ProtoMsgT>(mem::current_actor_id(),
      99                 :             :                                                        mem::RegionType::kMessage);
     100                 :             :             if (!msg->ParseFromArray(data.data(), static_cast<int>(data.size()))) {
     101                 :             :                 return nullptr;
     102                 :             :             }
     103                 :             :             return msg;
     104                 :             :         };
     105                 :             :         entry.invoke = [handler_ptr](std::shared_ptr<void> raw) -> StreamBuffer {
     106                 :             :             auto& msg = *static_cast<ProtoMsgT*>(raw.get());
     107                 :             :             (*handler_ptr)(msg);
     108                 :             :             return {};
     109                 :             :         };
     110                 :             : 
     111                 :             :         proto_handlers_[tag] = std::move(entry);
     112                 :             :     }
     113                 :             : 
     114                 :             :     // Register a request-response handler for protobuf types
     115                 :             :     template <typename ReqT, typename ResT>
     116                 :           0 :     void on_request(std::function<ResT(const ReqT&)> handler) {
     117                 :           0 :         TypeTag tag = type_tag_for<ReqT>();
     118                 :           0 :         auto handler_ptr = mem::allocate_shared<std::function<ResT(const ReqT&)>>(
     119                 :           0 :             id_ptr(), mem::RegionType::kActor, std::move(handler));
     120                 :             : 
     121                 :           0 :         ProtoHandler entry;
     122                 :           0 :         entry.type_name = ReqT().GetTypeName();
     123                 :           0 :         entry.deserialize = [](const StreamBuffer& data) -> std::shared_ptr<void> {
     124                 :           0 :             auto msg = mem::allocate_shared<ReqT>(mem::current_actor_id(),
     125                 :             :                                                   mem::RegionType::kMessage);
     126                 :           0 :             if (!msg->ParseFromArray(data.data(), static_cast<int>(data.size()))) {
     127                 :           0 :                 return nullptr;
     128                 :             :             }
     129                 :           0 :             return msg;
     130                 :           0 :         };
     131                 :           0 :         entry.invoke = [handler_ptr](std::shared_ptr<void> raw) -> StreamBuffer {
     132                 :           0 :             auto& req = *static_cast<ReqT*>(raw.get());
     133                 :           0 :             ResT res = (*handler_ptr)(req);
     134                 :           0 :             StreamBuffer result(res.ByteSizeLong());
     135                 :           0 :             (void)res.SerializeToArray(result.data(),
     136                 :           0 :                                        static_cast<int>(result.size()));
     137                 :           0 :             return result;
     138                 :           0 :         };
     139                 :             : 
     140                 :           0 :         proto_handlers_[tag] = std::move(entry);
     141                 :           0 :     }
     142                 :             : 
     143                 :             :     // Dispatch an incoming protobuf message by TypeTag
     144                 :             :     void on_proto_message(TypeTag tag, const StreamBuffer& payload);
     145                 :             : 
     146                 :             :     // Check if this actor can handle a given TypeTag
     147                 :             :     [[nodiscard]] bool handles(TypeTag tag) const {
     148                 :             :         return proto_handlers_.find(tag) != proto_handlers_.end();
     149                 :             :     }
     150                 :             : 
     151                 :             : #if HPACTOR_SUPPORT_COROUTINES
     152                 :             :     // Coroutine support (C++20 only)
     153                 :           0 :     virtual sched::CoroutineTask act() {
     154                 :             :         co_return;
     155                 :           0 :     }
     156                 :             : 
     157                 :          11 :     sched::ActorCoroutine& get_actor_coroutine() {
     158                 :          11 :         return actor_coroutine_;
     159                 :             :     }
     160                 :             :     const sched::ActorCoroutine& get_actor_coroutine() const {
     161                 :             :         return actor_coroutine_;
     162                 :             :     }
     163                 :             :     void set_actor_coroutine(sched::ActorCoroutine&& coroutine) {
     164                 :             :         actor_coroutine_ = std::move(coroutine);
     165                 :             :     }
     166                 :             : 
     167                 :             :     std::coroutine_handle<sched::CoroutinePromise> get_coro_handle() {
     168                 :             :         return coro_handle_;
     169                 :             :     }
     170                 :             :     void set_coro_handle(std::coroutine_handle<sched::CoroutinePromise> h) {
     171                 :             :         coro_handle_ = h;
     172                 :             :     }
     173                 :             : 
     174                 :          17 :     sched::MailboxAwaiter<TypedMessage> make_mailbox_awaiter() {
     175                 :          17 :         return sched::MailboxAwaiter<TypedMessage>{coro_handle_.promise(), mailbox_};
     176                 :             :     }
     177                 :             : 
     178                 :          11 :     void ensure_coroutine_started() {
     179                 :          11 :         if (!actor_coroutine_) {
     180                 :           9 :             auto task = act();
     181                 :           9 :             if (task) {
     182                 :           9 :                 coro_handle_ = task.handle();
     183                 :           9 :                 coro_handle_.promise().actor_state = &actor_state_;
     184                 :           9 :                 actor_coroutine_ = sched::ActorCoroutine{std::move(task), id()};
     185                 :             : 
     186                 :           9 :                 if (mailbox_) {
     187                 :             :                     // Continuation callback disabled: direct resume races with
     188                 :             :                     // the scheduler's execute_actor state machine, causing
     189                 :             :                     // await_suspend CAS(kRunning→kIdle) to fail and the
     190                 :             :                     // coroutine to busy-loop. Wakeups go through notify_ready()
     191                 :             :                     // which transitions state correctly.
     192                 :             :                     //
     193                 :             :                     // auto* coro_ptr = &actor_coroutine_;
     194                 :             :                     // mailbox_->set_continuation_callback([coro_ptr]() {
     195                 :             :                     //     if (!coro_ptr->done()) {
     196                 :             :                     //         coro_ptr->promise().notify_mailbox_nonempty();
     197                 :             :                     //     }
     198                 :             :                     // });
     199                 :             :                 }
     200                 :             :             }
     201                 :           9 :         }
     202                 :          11 :     }
     203                 :             : 
     204                 :             : #else  // !HPACTOR_SUPPORT_COROUTINES
     205                 :             :     void ensure_coroutine_started() {}
     206                 :             : #endif // HPACTOR_SUPPORT_COROUTINES
     207                 :             : 
     208                 :             :     sched::IScheduler* get_scheduler() {
     209                 :             :         return scheduler_;
     210                 :             :     }
     211                 :             : 
     212                 :          51 :     mailbox::MPSCActorMailbox<TypedMessage>* get_mailbox() {
     213                 :          51 :         return mailbox_;
     214                 :             :     }
     215                 :             : 
     216                 :         492 :     ActorState& actor_state() {
     217                 :         492 :         return actor_state_;
     218                 :             :     }
     219                 :             :     const ActorState& actor_state() const {
     220                 :             :         return actor_state_;
     221                 :             :     }
     222                 :             : 
     223                 :             :     bool mailbox_has_messages() const {
     224                 :             :         return mailbox_ && !mailbox_->empty();
     225                 :             :     }
     226                 :          18 :     bool mailbox_is_empty() const {
     227                 :          18 :         return !mailbox_ || mailbox_->empty();
     228                 :             :     }
     229                 :             : 
     230                 :             :     // Drain helpers
     231                 :             :     // Returns true if the message should be processed normally.
     232                 :             :     // Returns false if the message was dead-lettered by the drain policy.
     233                 :             :     bool drain_one(TypedMessage& msg);
     234                 :             : 
     235                 :             :     // Dead-letter all messages currently in the mailbox (ImmediateStop).
     236                 :             :     void drain_all_immediate();
     237                 :             : 
     238                 :             :     // System message handlers invoked from receive().
     239                 :             :     bool handle_link_msg(const TypedMessage& msg);
     240                 :             :     bool handle_unlink_msg(const TypedMessage& msg);
     241                 :             :     bool handle_monitor_msg(const TypedMessage& msg);
     242                 :             :     bool handle_demonitor_msg(const TypedMessage& msg);
     243                 :             :     void handle_down_msg(const TypedMessage& msg);
     244                 :             : 
     245                 :             :     // Drain timer management (stubs — implemented in Task 7).
     246                 :             :     void start_drain_timer();
     247                 :             :     void cancel_drain_timer();
     248                 :             : 
     249                 :         116 :     void set_scheduler(sched::IScheduler* scheduler) override {
     250                 :         116 :         scheduler_ = scheduler;
     251                 :         116 :     }
     252                 :         114 :     void set_mailbox(mailbox::MPSCActorMailbox<TypedMessage>* mailbox) override {
     253                 :         114 :         mailbox_ = mailbox;
     254                 :         114 :     }
     255                 :             : 
     256                 :         104 :     void set_metrics_ring_buffer(void* buf) noexcept override {
     257                 :         104 :         metrics_ring_buffer_ =
     258                 :             :             static_cast<metrics::MpscRingBuffer<metrics::MetricEvent>*>(buf);
     259                 :         104 :     }
     260                 :             : 
     261                 :         104 :     void set_logger(void* logger) noexcept override {
     262                 :         104 :         logger_ = static_cast<log::Logger*>(logger);
     263                 :         104 :     }
     264                 :             : 
     265                 :             :     cli::MboxSnapshot mailbox_snapshot() const override;
     266                 :             : 
     267                 :             :   protected:
     268                 :             :     metrics::MpscRingBuffer<metrics::MetricEvent>* metrics_ring_buffer_{nullptr};
     269                 :             :     log::Logger* logger_{nullptr};
     270                 :             : 
     271                 :           5 :     virtual Behavior make_behavior() {
     272                 :           5 :         return {};
     273                 :             :     }
     274                 :             : 
     275                 :             :     // Users override this to call on<T>() / on_request<ReqT,ResT>()
     276                 :          25 :     virtual void register_handlers() {}
     277                 :             : 
     278                 :             :     // Called by the framework after construction to set up handlers
     279                 :             :     void initialize_proto_handlers();
     280                 :             : 
     281                 :             :   public:
     282                 :             :     void on_activate() override;
     283                 :             :     void on_deactivate() override;
     284                 :             : 
     285                 :             :     // Get TypeTag for a protobuf type from MessageTraits (compile-time
     286                 :             :     // dispatch)
     287                 :           0 :     template <typename ProtoMsgT> TypeTag type_tag_for() const {
     288                 :           0 :         return MessageTraits<ProtoMsgT>::tag();
     289                 :             :     }
     290                 :             : 
     291                 :             :   public:
     292                 :             :     virtual void on_exit();
     293                 :             : 
     294                 :           4 :     void set_exit_reason(uint32_t code) {
     295                 :           4 :         exit_reason_ = code;
     296                 :           4 :     }
     297                 :             :     uint32_t exit_reason() const {
     298                 :             :         return exit_reason_;
     299                 :             :     }
     300                 :             : 
     301                 :             :     EventBasedActor(ActorContext* ctx, ActorSystem& sys);
     302                 :             : 
     303                 :             :   private:
     304                 :             : #if HPACTOR_SUPPORT_COROUTINES
     305                 :             :     sched::ActorCoroutine actor_coroutine_;
     306                 :             :     std::coroutine_handle<sched::CoroutinePromise> coro_handle_;
     307                 :             : #endif
     308                 :             :     Behavior behavior_;
     309                 :             :     ActorState actor_state_;
     310                 :             :     uint32_t exit_reason_ = 0;
     311                 :             :     mailbox::MPSCActorMailbox<TypedMessage>* mailbox_ = nullptr;
     312                 :             :     sched::IScheduler* scheduler_ = nullptr;
     313                 :             :     sched::TimerHandle drain_timer_handle_{};
     314                 :             : 
     315                 :             :     bool handlers_initialized_ = false;
     316                 :             : 
     317                 :             :     using ProtoHandlerMap =
     318                 :             :         std::unordered_map<TypeTag, ProtoHandler, std::hash<TypeTag>, std::equal_to<>,
     319                 :             :                            mem::MemStdAllocator<std::pair<const TypeTag, ProtoHandler>>>;
     320                 :             :     ProtoHandlerMap proto_handlers_{
     321                 :             :         mem::MemStdAllocator<std::pair<const TypeTag, ProtoHandler>>(
     322                 :             :             id_ptr(), mem::RegionType::kActor)};
     323                 :             : };
     324                 :             : 
     325                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1