LCOV - code coverage report
Current view: top level - src/actor - actor_context.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 78.8 % 236 186
Test Date: 2026-05-20 02:24:49 Functions: 70.6 % 34 24
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/actor/event_based_actor.hpp>
      16                 :             : #include <hpactor/actor_context.hpp>
      17                 :             : #include <hpactor/core/actor_system.hpp>
      18                 :             : #include <hpactor/hpactor_config.hpp>
      19                 :             : #include <hpactor/metrics/metrics_event.hpp>
      20                 :             : #include <hpactor/ref/actor_proxy.hpp>
      21                 :             : #include <hpactor/tracing/trace_manager.hpp>
      22                 :             : 
      23                 :             : #include <google/protobuf/message.h>
      24                 :             : 
      25                 :             : #include <chrono>
      26                 :             : #include <thread>
      27                 :             : 
      28                 :             : namespace hpactor {
      29                 :             : 
      30                 :         152 : ActorContext::ActorContext(Actor owner, ActorSystem* system)
      31                 :         152 :     : owner_(std::move(owner)), system_(system) {}
      32                 :             : 
      33                 :         152 : ActorContext::~ActorContext() = default;
      34                 :             : 
      35                 :           2 : ActorContext::TraceScope::TraceScope(ActorContext* ctx, const TraceContext& next) noexcept
      36                 :           2 :     : ctx_(ctx) {
      37                 :           2 :     if (ctx_ == nullptr) {
      38                 :           0 :         return;
      39                 :             :     }
      40                 :           2 :     previous_ = ctx_->current_trace_context_;
      41                 :           2 :     previous_valid_ = ctx_->has_current_trace_context_;
      42                 :           2 :     ctx_->set_current_trace_context(next);
      43                 :             : }
      44                 :             : 
      45                 :           4 : ActorContext::TraceScope::~TraceScope() {
      46                 :           2 :     if (ctx_ == nullptr) {
      47                 :           0 :         return;
      48                 :             :     }
      49                 :           2 :     if (previous_valid_) {
      50                 :           0 :         ctx_->set_current_trace_context(previous_);
      51                 :             :     } else {
      52                 :           2 :         ctx_->clear_current_trace_context();
      53                 :             :     }
      54                 :           2 : }
      55                 :             : 
      56                 :         332 : ActorRef ActorContext::resolve(const ActorAddress& target) {
      57                 :             :     // 1. Check cache (hot path)
      58                 :         332 :     auto cached = ref_cache_.get(target.id);
      59                 :         332 :     if (cached.has_value()) {
      60                 :         301 :         return *cached;
      61                 :             :     }
      62                 :             : 
      63                 :             :     // 2. Resolve system pointer
      64                 :             :     // NOLINTNEXTLINE(readability-avoid-nested-conditional-operator)
      65                 :          31 :     auto* system = system_ != nullptr
      66                 :          31 :                        ? system_
      67                 :          31 :                        : (owner_ ? &owner_.get()->system() : nullptr);
      68                 :          31 :     if (system == nullptr) {
      69                 :           0 :         return ActorRef{};
      70                 :             :     }
      71                 :             : 
      72                 :             :     // 3. If target endpoint differs from our own, it's a remote actor.
      73                 :             :     //    Skip local lookup — actor IDs are only unique within a system.
      74                 :          31 :     if (!(target.endpoint == system->endpoint())) {
      75                 :           1 :         ActorProxy proxy(target, system);
      76                 :           1 :         ActorRef ref(proxy);
      77                 :             :         // Only cache if transport was resolved successfully
      78                 :           1 :         if (ref.get_proxy() != nullptr && ref.get_proxy()->transport() != nullptr) {
      79                 :           0 :             ref_cache_.put(target.id, ref);
      80                 :             :         }
      81                 :           1 :         return ref;
      82                 :           1 :     }
      83                 :             : 
      84                 :             :     // 4. Local endpoint: check local actors
      85                 :          30 :     auto actor = system->get_actor(target.id);
      86                 :          30 :     if (actor != nullptr) {
      87                 :          29 :         ActorRef ref{Actor(actor)};
      88                 :          29 :         ref_cache_.put(target.id, ref);
      89                 :          29 :         return ref;
      90                 :          29 :     }
      91                 :             : 
      92                 :           1 :     return ActorRef{};
      93                 :         332 : }
      94                 :             : 
      95                 :           4 : void ActorContext::send(ActorRef& target, TypedMessage msg) {
      96                 :             :     // Stamp sender identity for reply tracking
      97                 :           4 :     if (owner_) {
      98                 :           4 :         msg.set_sender_address(owner_.address());
      99                 :             :     }
     100                 :             : 
     101                 :           4 :     auto* system = system_ != nullptr
     102                 :           4 :                        ? system_
     103                 :           4 :                        : (owner_ ? &owner_.get()->system() : nullptr);
     104                 :           4 :     if (system != nullptr && system->trace_manager() != nullptr) {
     105                 :           0 :         system->trace_manager()->inject_message_context(
     106                 :             :             msg, this,
     107                 :           0 :             system->trace_manager()->config().create_roots_for_actor_context_sends);
     108                 :             :     }
     109                 :             : 
     110                 :           4 :     target.send(target.address(), std::move(msg));
     111                 :           4 : }
     112                 :             : 
     113                 :         318 : void ActorContext::send(const ActorAddress& target, TypedMessage msg) {
     114                 :         318 :     (void)try_send(target, std::move(msg));
     115                 :         318 : }
     116                 :             : 
     117                 :           0 : void ActorContext::send(const ActorAddress& target, TypeTag tag,
     118                 :             :                         const google::protobuf::Message& proto_msg) {
     119                 :           0 :     TypedMessage msg(tag, proto_msg);
     120                 :           0 :     send(target, std::move(msg));
     121                 :           0 : }
     122                 :             : 
     123                 :           0 : void ActorContext::send_with_priority(const ActorAddress& target, TypedMessage msg,
     124                 :             :                                       uint8_t priority, int64_t deadline_ns) {
     125                 :           0 :     (void)try_send_with_priority(target, std::move(msg), priority, deadline_ns);
     126                 :           0 : }
     127                 :             : 
     128                 :             : mailbox::EnqueueResult
     129                 :         329 : ActorContext::try_send(const ActorAddress& target, TypedMessage msg,
     130                 :             :                        mailbox::DeliveryOptions options) {
     131                 :         329 :     auto ref = resolve(target);
     132                 :         329 :     if (!ref) {
     133                 :           1 :         return {mailbox::EnqueueResultCode::ActorNotFound, target.id};
     134                 :             :     }
     135                 :             : 
     136                 :         328 :     if (owner_) {
     137                 :         328 :         msg.set_sender_address(owner_.address());
     138                 :             :     }
     139                 :             : 
     140                 :         328 :     auto* system = owner_ ? &owner_.get()->system() : system_;
     141                 :         328 :     if (system != nullptr && system->trace_manager() != nullptr) {
     142                 :           1 :         system->trace_manager()->inject_message_context(
     143                 :             :             msg, this,
     144                 :           1 :             system->trace_manager()->config().create_roots_for_actor_context_sends);
     145                 :             :     }
     146                 :             : 
     147                 :         328 :     return ref.try_send(ref.address(), std::move(msg), options);
     148                 :         329 : }
     149                 :             : 
     150                 :             : mailbox::EnqueueResult
     151                 :           1 : ActorContext::try_send_with_priority(const ActorAddress& target, TypedMessage msg,
     152                 :             :                                      uint8_t priority, int64_t deadline_ns,
     153                 :             :                                      mailbox::DeliveryOptions options) {
     154                 :           1 :     auto ref = resolve(target);
     155                 :           1 :     if (!ref) {
     156                 :           0 :         return {mailbox::EnqueueResultCode::ActorNotFound, target.id};
     157                 :             :     }
     158                 :             : 
     159                 :           1 :     if (owner_) {
     160                 :           1 :         msg.set_sender_address(owner_.address());
     161                 :             :     }
     162                 :             : 
     163                 :           1 :     auto* system = owner_ ? &owner_.get()->system() : system_;
     164                 :           1 :     if (system != nullptr && system->trace_manager() != nullptr) {
     165                 :           0 :         system->trace_manager()->inject_message_context(
     166                 :             :             msg, this,
     167                 :           0 :             system->trace_manager()->config().create_roots_for_actor_context_sends);
     168                 :             :     }
     169                 :             : 
     170                 :           1 :     if (ref.is_local()) {
     171                 :           1 :         if (system != nullptr) {
     172                 :           2 :             return system->try_deliver_local(target.id, std::move(msg),
     173                 :           1 :                                              priority, deadline_ns, options);
     174                 :             :         }
     175                 :           0 :         return {mailbox::EnqueueResultCode::ActorNotFound, target.id};
     176                 :             :     }
     177                 :             : 
     178                 :           0 :     return ref.try_send(ref.address(), std::move(msg), options);
     179                 :           1 : }
     180                 :             : 
     181                 :           4 : void ActorContext::reply(TypedMessage msg) {
     182                 :           4 :     if (current_sender_.id != ActorId{0}) {
     183                 :           3 :         send(current_sender_, std::move(msg));
     184                 :             :     }
     185                 :           4 : }
     186                 :             : 
     187                 :           0 : void ActorContext::reply(TypeTag tag, const google::protobuf::Message& proto_msg) {
     188                 :           0 :     TypedMessage msg(tag, proto_msg);
     189                 :           0 :     reply(std::move(msg));
     190                 :           0 : }
     191                 :             : 
     192                 :           1 : void ActorContext::reply_with_error(const error& err) {
     193                 :           1 :     if (current_sender_.id == ActorId{0}) {
     194                 :           0 :         return;
     195                 :             :     }
     196                 :             : 
     197                 :             :     // Wire format: [4 bytes: error code BE][error message string]
     198                 :             :     // A protobuf error message can replace this payload later without
     199                 :             :     // changing the TypeTag or dispatch path.
     200                 :           1 :     StreamBuffer payload;
     201                 :           1 :     const uint32_t code = err.code();
     202                 :           1 :     payload.push_back(static_cast<uint8_t>((code >> 24) & 0xFF));
     203                 :           1 :     payload.push_back(static_cast<uint8_t>((code >> 16) & 0xFF));
     204                 :           1 :     payload.push_back(static_cast<uint8_t>((code >> 8) & 0xFF));
     205                 :           1 :     payload.push_back(static_cast<uint8_t>(code & 0xFF));
     206                 :           1 :     const auto& msg = err.message();
     207                 :           1 :     payload.insert(payload.end(), msg.begin(), msg.end());
     208                 :             : 
     209                 :           1 :     TypedMessage error_msg(TypeTag::ErrorMsg, std::move(payload));
     210                 :           1 :     send(current_sender_, std::move(error_msg));
     211                 :           1 : }
     212                 :             : 
     213                 :             : AlarmHandle
     214                 :           2 : ActorContext::schedule(std::chrono::milliseconds delay, TypedMessage msg) {
     215                 :           2 :     auto* sched = system_->scheduler();
     216                 :           2 :     if (!sched)
     217                 :           0 :         return AlarmHandle{};
     218                 :             : 
     219                 :           2 :     msg.set_sender_address(owner_.address());
     220                 :             : 
     221                 :           2 :     ActorId self_id = owner_.id();
     222                 :           2 :     ActorSystem* sys = system_;
     223                 :             : 
     224                 :             :     // Wrap in shared_ptr because TypedMessage is move-only and std::function
     225                 :             :     // requires a copyable callable.
     226                 :           2 :     auto msg_ptr = std::make_shared<TypedMessage>(std::move(msg));
     227                 :           0 :     auto callback = [sys, self_id, msg_ptr]() {
     228                 :           0 :         sys->deliver_local(self_id, std::move(*msg_ptr));
     229                 :           2 :     };
     230                 :             : 
     231                 :             :     int64_t delay_ns =
     232                 :           2 :         std::chrono::duration_cast<std::chrono::nanoseconds>(delay).count();
     233                 :           2 :     auto handle = sched->schedule_after(std::move(callback), delay_ns);
     234                 :           2 :     return AlarmHandle{handle.value()};
     235                 :           2 : }
     236                 :             : 
     237                 :           5 : void ActorContext::cancel_schedule(AlarmHandle handle) {
     238                 :           5 :     if (handle.value() == 0)
     239                 :           2 :         return;
     240                 :           3 :     auto* sched = system_->scheduler();
     241                 :           3 :     if (!sched)
     242                 :           0 :         return;
     243                 :           3 :     sched->cancel_timer(sched::TimerHandle{handle.value()});
     244                 :             : }
     245                 :             : 
     246                 :           3 : std::vector<Actor> ActorContext::children() const {
     247                 :           3 :     return children_;
     248                 :             : }
     249                 :             : 
     250                 :           1 : void ActorContext::add_child(Actor child) {
     251                 :           1 :     children_.push_back(std::move(child));
     252                 :           1 : }
     253                 :             : 
     254                 :           1 : void ActorContext::remove_child(Actor child) {
     255                 :           1 :     for (auto it = children_.begin(); it != children_.end(); ++it) {
     256                 :           1 :         if (it->address() == child.address()) {
     257                 :           1 :             children_.erase(it);
     258                 :           1 :             return;
     259                 :             :         }
     260                 :             :     }
     261                 :             : }
     262                 :             : 
     263                 :           2 : std::vector<ActorRef> ActorContext::remote_children() const {
     264                 :           2 :     return remote_children_;
     265                 :             : }
     266                 :             : 
     267                 :           1 : void ActorContext::add_remote_child(ActorRef child) {
     268                 :           1 :     remote_children_.push_back(std::move(child));
     269                 :           1 : }
     270                 :             : 
     271                 :          46 : std::vector<ActorAddress> ActorContext::linked_actors() const {
     272                 :          46 :     return linked_;
     273                 :             : }
     274                 :             : 
     275                 :           1 : void ActorContext::monitor(const ActorAddress& target) {
     276                 :           1 :     add_monitored(target);
     277                 :           1 : }
     278                 :             : 
     279                 :           0 : RpcFuture<StreamBuffer> ActorContext::rpc(const ActorAddress& target,
     280                 :             :                                           const StreamBuffer& encoded_request,
     281                 :             :                                           std::chrono::milliseconds timeout_ms) {
     282                 :             :     const TraceContext* trace =
     283                 :           0 :         has_current_trace_context() ? &current_trace_context() : nullptr;
     284                 :           0 :     return system_->rpc_channel().call_raw(target, encoded_request, timeout_ms,
     285                 :           0 :                                            trace);
     286                 :             : }
     287                 :             : 
     288                 :             : RpcFuture<StreamBuffer>
     289                 :           0 : ActorContext::http_get(const std::string& url, std::vector<net::HttpHeader> headers) {
     290                 :           0 :     return system_->http_client().get(url, std::move(headers));
     291                 :             : }
     292                 :             : 
     293                 :             : RpcFuture<StreamBuffer>
     294                 :           0 : ActorContext::http_post(const std::string& url, StreamBuffer body,
     295                 :             :                         std::vector<net::HttpHeader> headers) {
     296                 :           0 :     return system_->http_client().post(url, std::move(body), std::move(headers));
     297                 :             : }
     298                 :             : 
     299                 :             : RpcFuture<StreamBuffer>
     300                 :           0 : ActorContext::http_put(const std::string& url, StreamBuffer body,
     301                 :             :                        std::vector<net::HttpHeader> headers) {
     302                 :           0 :     return system_->http_client().put(url, std::move(body), std::move(headers));
     303                 :             : }
     304                 :             : 
     305                 :             : RpcFuture<StreamBuffer>
     306                 :           0 : ActorContext::http_delete(const std::string& url,
     307                 :             :                           std::vector<net::HttpHeader> headers) {
     308                 :           0 :     return system_->http_client().del(url, std::move(headers));
     309                 :             : }
     310                 :             : 
     311                 :             : RpcFuture<StreamBuffer>
     312                 :           0 : ActorContext::http_request(net::HttpMethod method, const std::string& url,
     313                 :             :                            std::vector<net::HttpHeader> headers, StreamBuffer body) {
     314                 :           0 :     return system_->http_client().request(method, url, std::move(headers),
     315                 :           0 :                                           std::move(body));
     316                 :             : }
     317                 :             : 
     318                 :           3 : void ActorContext::on_backpressure(BackpressureHandler handler) {
     319                 :           3 :     backpressure_handler_ = std::move(handler);
     320                 :           3 : }
     321                 :             : 
     322                 :           2 : void ActorContext::handle_backpressure(const mailbox::BackpressureSignal& signal) {
     323                 :           2 :     if (backpressure_handler_) {
     324                 :           1 :         backpressure_handler_(signal);
     325                 :             :     }
     326                 :           2 : }
     327                 :             : 
     328                 :           4 : void ActorContext::stop(ActorId target) {
     329                 :             :     // Resolve system pointer
     330                 :           4 :     auto* system = system_ != nullptr
     331                 :           4 :                        ? system_
     332                 :           4 :                        : (owner_ ? &owner_.get()->system() : nullptr);
     333                 :           4 :     if (system == nullptr) {
     334                 :           1 :         return;
     335                 :             :     }
     336                 :             : 
     337                 :             :     // Resolve target actor
     338                 :           4 :     auto actor = system->get_actor(target);
     339                 :           4 :     if (actor == nullptr) {
     340                 :           0 :         return;
     341                 :             :     }
     342                 :             : 
     343                 :             :     // Emit drain start metric event
     344                 :           4 :     if (auto* ring_buf = system->metrics_ring_buffer()) {
     345                 :           4 :         metrics::MetricEvent evt{};
     346                 :           4 :         evt.actor_id = target;
     347                 :           4 :         evt.event_type = metrics::MetricEventType::kActorDrainStart;
     348                 :           4 :         evt.value_hi = 1;
     349                 :           4 :         ring_buf->try_push(evt);
     350                 :             :     }
     351                 :             : 
     352                 :             :     // Check for lifecycle support
     353                 :           4 :     auto* lc = actor->as_lifecycle();
     354                 :           4 :     if (lc == nullptr) {
     355                 :             :         // No lifecycle: call on_exit directly if EventBasedActor
     356                 :           1 :         if (actor->is_event_based_actor()) {
     357                 :           1 :             auto* eba = static_cast<EventBasedActor*>(actor.get());
     358                 :           1 :             eba->on_exit();
     359                 :             :         }
     360                 :           1 :         return;
     361                 :             :     }
     362                 :             : 
     363                 :           3 :     auto policy = lc->drain_config().policy;
     364                 :             : 
     365                 :           3 :     if (policy == DrainPolicy::ImmediateStop) {
     366                 :             :         // Drain mailbox directly (dead-letter all messages)
     367                 :           2 :         if (actor->is_event_based_actor()) {
     368                 :           2 :             auto* eba = static_cast<EventBasedActor*>(actor.get());
     369                 :           2 :             eba->drain_all_immediate();
     370                 :             :         }
     371                 :             :         // Drive lifecycle: kActive -> kStopping -> kStopped
     372                 :           2 :         lc->transition(LifecycleState::kStopping);
     373                 :           2 :         lc->transition(LifecycleState::kStopped);
     374                 :             :         // Notify linked/monitored actors
     375                 :           2 :         if (actor->is_event_based_actor()) {
     376                 :           2 :             auto* eba = static_cast<EventBasedActor*>(actor.get());
     377                 :           2 :             eba->on_exit();
     378                 :             :         }
     379                 :             :     } else {
     380                 :             :         // Transition to kDraining (invokes on_drain() hook)
     381                 :           1 :         lc->transition(LifecycleState::kDraining);
     382                 :             :         // Start drain timer (completes drain on timeout or when mailbox
     383                 :             :         // empties)
     384                 :           1 :         if (actor->is_event_based_actor()) {
     385                 :           1 :             auto* eba = static_cast<EventBasedActor*>(actor.get());
     386                 :           1 :             eba->start_drain_timer();
     387                 :             :         } else {
     388                 :             :             // Non-EventBasedActor with lifecycle but no drain timer support:
     389                 :             :             // transition directly to stopped.
     390                 :           0 :             lc->transition(LifecycleState::kStopping);
     391                 :           0 :             lc->transition(LifecycleState::kStopped);
     392                 :             :         }
     393                 :             :     }
     394                 :           4 : }
     395                 :             : 
     396                 :             : result<void>
     397                 :           2 : ActorContext::stop_sync(ActorId target, std::chrono::milliseconds timeout) {
     398                 :           2 :     stop(target);
     399                 :             : 
     400                 :             :     // Resolve system pointer for polling
     401                 :           2 :     auto* system = system_ != nullptr
     402                 :           2 :                        ? system_
     403                 :           2 :                        : (owner_ ? &owner_.get()->system() : nullptr);
     404                 :           2 :     if (system == nullptr) {
     405                 :           0 :         return result<void>::make(error(errors::actor_not_found, "no actor "
     406                 :             :                                                                  "system "
     407                 :           0 :                                                                  "available"));
     408                 :             :     }
     409                 :             : 
     410                 :           2 :     auto deadline = std::chrono::steady_clock::now() + timeout;
     411                 :             : 
     412                 :           3 :     while (std::chrono::steady_clock::now() < deadline) {
     413                 :           2 :         auto actor = system->get_actor(target);
     414                 :           2 :         if (actor == nullptr) {
     415                 :             :             // Actor removed from registry (already fully stopped / cleaned up)
     416                 :           0 :             return result<void>::make();
     417                 :             :         }
     418                 :             : 
     419                 :           2 :         auto* lc = actor->as_lifecycle();
     420                 :           2 :         if (lc != nullptr && lc->state() == LifecycleState::kStopped) {
     421                 :           1 :             return result<void>::make();
     422                 :             :         }
     423                 :           1 :         if (lc == nullptr) {
     424                 :             :             // Non-lifecycle actors: on_exit was called synchronously in stop(),
     425                 :             :             // consider it done.
     426                 :           0 :             return result<void>::make();
     427                 :             :         }
     428                 :             : 
     429                 :           1 :         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     430                 :           2 :     }
     431                 :             : 
     432                 :           2 :     return result<void>::make(error(errors::timeout, "stop_sync timed out"));
     433                 :             : }
     434                 :             : 
     435                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1