LCOV - code coverage report
Current view: top level - src/actor - actor_system.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 60.4 % 545 329
Test Date: 2026-05-20 02:24:49 Functions: 59.2 % 49 29
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/actor/event_based_actor.hpp>
      16                 :             : #include <hpactor/actor/http_gateway_actor.hpp>
      17                 :             : #include <hpactor/actor/local_actor.hpp>
      18                 :             : #include <hpactor/actor/spawn_receiver.hpp>
      19                 :             : #include <hpactor/actor_type_registry.hpp>
      20                 :             : #include <hpactor/config/actor_factory_registry.hpp>
      21                 :             : #include <hpactor/config/toml_parser.hpp>
      22                 :             : #include <hpactor/core/actor_system.hpp>
      23                 :             : #include <hpactor/hpactor_config.hpp>
      24                 :             : 
      25                 :             : #include <hpactor/cli/cli_actor.hpp>
      26                 :             : #include <hpactor/core/actor_system_ids.hpp>
      27                 :             : #include <hpactor/log/log_manager.hpp>
      28                 :             : #include <hpactor/log/logger.hpp>
      29                 :             : #include <hpactor/mem/std_allocator.hpp>
      30                 :             : #include <hpactor/net/async_io_fwd.hpp>
      31                 :             : #include <hpactor/net/frame.hpp>
      32                 :             : #include <hpactor/net/tcp_transport.hpp>
      33                 :             : #include <hpactor/sched/scheduler.hpp>
      34                 :             : #include <hpactor/spawn.hpp>
      35                 :             : 
      36                 :             : // Protobuf message types for spawn serialization
      37                 :             : #include <hpactor/common.pb.h>
      38                 :             : #include <hpactor/messages.pb.h>
      39                 :             : 
      40                 :             : namespace hpactor {
      41                 :             : 
      42                 :             : // -----------------------------------------------------------------------------
      43                 :             : // actor_registry implementation
      44                 :             : // -----------------------------------------------------------------------------
      45                 :         106 : actor_registry::actor_registry(EndPoint endpoint) : endpoint_(endpoint) {}
      46                 :             : 
      47                 :           9 : void actor_registry::put(const std::string& name, ActorAddress addr) {
      48                 :           9 :     actors_[name] = addr;
      49                 :           9 : }
      50                 :             : 
      51                 :           9 : ActorAddress actor_registry::get(const std::string& name) const {
      52                 :           9 :     auto it = actors_.find(name);
      53                 :           9 :     if (it != actors_.end()) {
      54                 :           9 :         return it->second;
      55                 :             :     }
      56                 :           0 :     return invalid_actor_addr;
      57                 :             : }
      58                 :             : 
      59                 :           0 : void actor_registry::erase(const std::string& name) {
      60                 :           0 :     actors_.erase(name);
      61                 :           0 : }
      62                 :             : 
      63                 :             : // -----------------------------------------------------------------------------
      64                 :             : // ActorSystem implementation
      65                 :             : // -----------------------------------------------------------------------------
      66                 :         106 : ActorSystem::ActorSystem(const Config& config)
      67                 :         106 :     : config_(config), endpoint_(config.endpoint), registry_(endpoint_),
      68                 :         106 :       scheduler_(std::make_unique<sched::HybridScheduler>(
      69                 :         212 :           *this, config.scheduler_threads, 4, config.timer_backend,
      70                 :         106 :           config.scheduler_start_paused)),
      71                 :         212 :       actor_type_registry_(std::make_unique<ActorTypeRegistry>()) {
      72                 :             :     // Register system protobuf types
      73                 :         106 :     proto_registry_.register_system_types();
      74                 :             : 
      75                 :             :     // Initialize dead-letter queue
      76                 :             :     dead_letters_ =
      77                 :         106 :         std::make_unique<mailbox::DeadLetterQueue>(config_.dead_letters);
      78                 :             : 
      79                 :             :     // Initialize metrics subsystem (before scheduler so instrumentation is
      80                 :             :     // ready)
      81                 :         106 :     if (metrics_config_.enabled) {
      82                 :             :         metrics_ring_buffer_ =
      83                 :         106 :             std::make_shared<metrics::MpscRingBuffer<metrics::MetricEvent>>();
      84                 :         106 :         scheduler_->set_metrics_ring_buffer(metrics_ring_buffer_.get());
      85                 :             :     }
      86                 :             : 
      87                 :             :     // Initialize logging subsystem before starting the scheduler so that
      88                 :             :     // worker threads see a valid global logger, not a dangling pointer
      89                 :             :     // left over from a previous ActorSystem instance.
      90                 :         106 :     if (logging_config_.enabled) {
      91                 :         106 :         log_manager_ = std::make_unique<log::LogManager>(logging_config_);
      92                 :         106 :         log_manager_->start();
      93                 :         106 :         logger_ = &log_manager_->logger();
      94                 :             :     }
      95                 :             : 
      96                 :             :     // Wire logger to scheduler for scheduler-event logs
      97                 :         106 :     if (logger_) [[unlikely]] {
      98                 :         106 :         scheduler_->set_logger(logger_);
      99                 :             :     }
     100                 :             : 
     101                 :         106 :     scheduler_->start();
     102                 :             : 
     103                 :         106 :     apply_tracing_config(config_.tracing);
     104                 :             : 
     105                 :         106 :     if (config.enable_network) {
     106                 :           0 :         network_loop_ = std::make_unique<net::EventLoop>();
     107                 :           0 :         network_loop_->set_actor_system(this);
     108                 :             : 
     109                 :             :         // ── Service discovery backend ────────────────────────────
     110                 :           0 :         if (config.service_discovery) {
     111                 :           0 :             discovery_ = config.service_discovery;
     112                 :           0 :         } else if (config.registrar.udp_port > 0) {
     113                 :             :             auto reg = std::make_shared<net::UdpRegistrar>(
     114                 :           0 :                 config.registrar, endpoint_, network_loop_.get());
     115                 :           0 :             discovery_ = reg;
     116                 :           0 :             registrar_ = reg; // shared ownership for registrar() accessor
     117                 :           0 :         } else {
     118                 :             :             discovery_ =
     119                 :           0 :                 std::make_shared<net::StaticDiscovery>(std::vector<net::Member>{});
     120                 :             :         }
     121                 :             : 
     122                 :           0 :         discovery_->start();
     123                 :             : 
     124                 :           0 :         discovery_->on_member_change([this](const net::Member& m, bool joined) {
     125                 :           0 :             if (!joined) {
     126                 :           0 :                 on_node_dead(m.identity.endpoint);
     127                 :             :             }
     128                 :             :             // Note: proactive connection pool warming (prewarm_pool) will be
     129                 :             :             // integrated in a follow-up task when ConnectionPool is updated.
     130                 :           0 :         });
     131                 :             : 
     132                 :           0 :         location_cache_ = std::make_shared<net::ActorLocationCache>();
     133                 :           0 :         if (network_loop_) {
     134                 :           0 :             cache_purge_timer_ = network_loop_->run_every(
     135                 :           0 :                 [this]() {
     136                 :           0 :                     if (location_cache_)
     137                 :           0 :                         location_cache_->purge_expired();
     138                 :           0 :                 },
     139                 :             :                 60000);
     140                 :             :         }
     141                 :             : 
     142                 :           0 :         transport_ = std::make_unique<net::TcpTransport>(endpoint_, config.tls,
     143                 :           0 :                                                          config.pool, nullptr);
     144                 :             : 
     145                 :             :         rpc_channel_ =
     146                 :           0 :             std::make_unique<RpcChannel>(transport_.get(), scheduler_.get());
     147                 :             : 
     148                 :           0 :         if (config_.enable_http_client) {
     149                 :           0 :             http_client_ = std::make_unique<net::HttpClient>(network_loop_.get());
     150                 :             :         }
     151                 :             : 
     152                 :           0 :         if (config_.enable_http_gateway) {
     153                 :           0 :             http_gateway_actor_ = spawn<net::HTTPGatewayActor>(
     154                 :           0 :                 config_.http_bind_host, config_.http_port);
     155                 :             :         }
     156                 :             : 
     157                 :           0 :         if (config.tcp_port > 0) {
     158                 :           0 :             transport_->set_rpc_handler(
     159                 :           0 :                 [this](const hpactor::RpcResponseFrame& response) {
     160                 :           0 :                     rpc_channel_->on_response(response);
     161                 :           0 :                 });
     162                 :             : 
     163                 :           0 :             transport_->set_actor_message_handler([this](const net::WireFrame& frame) {
     164                 :           0 :                 this->deliver_remote(frame);
     165                 :           0 :             });
     166                 :             : 
     167                 :           0 :             transport_->listen(config.tcp_port);
     168                 :             :         }
     169                 :             : 
     170                 :           0 :         network_thread_ = std::thread([this]() {
     171                 :           0 :             while (network_loop_->wait(100) >= 0) {
     172                 :           0 :                 network_loop_->process_completions();
     173                 :           0 :                 if (!is_running())
     174                 :           0 :                     break;
     175                 :             :             }
     176                 :           0 :         });
     177                 :             : 
     178                 :             :         auto spawn_receiver = std::make_shared<SpawnReceiver>(
     179                 :           0 :             *this, *actor_type_registry_, transport_.get());
     180                 :           0 :         spawn_receiver->set_address(
     181                 :             :             ActorAddress{endpoint_, SystemActorType, SpawnReceiverId, 0});
     182                 :             : 
     183                 :             :         {
     184                 :           0 :             std::lock_guard<std::mutex> lock(actors_mutex_);
     185                 :           0 :             actors_.emplace(SpawnReceiverId, spawn_receiver);
     186                 :           0 :         }
     187                 :             : 
     188                 :             :         {
     189                 :           0 :             std::lock_guard<std::mutex> lock(mailboxes_mutex_);
     190                 :           0 :             mailboxes_.emplace(
     191                 :             :                 SpawnReceiverId,
     192                 :           0 :                 std::make_unique<mailbox::MPSCActorMailbox<TypedMessage>>(
     193                 :           0 :                     SpawnReceiverId, scheduler_.get(), mailbox_config_for_spawn()));
     194                 :           0 :         }
     195                 :           0 :     }
     196                 :             : 
     197                 :             :     // Spawn CLI actor (runtime opt-in via config_.cli.enabled)
     198                 :         106 :     if (config_.cli.enabled) {
     199                 :           0 :         auto spawned = spawn<cli::CliActor>(config_.cli);
     200                 :           0 :         cli_actor_ = std::static_pointer_cast<cli::CliActor>(spawned.get());
     201                 :           0 :     }
     202                 :         106 : }
     203                 :             : 
     204                 :         106 : ActorSystem::~ActorSystem() {
     205                 :         106 :     running_.store(false);
     206                 :         106 :     if (config_.enable_network) {
     207                 :           0 :         if (network_loop_) {
     208                 :           0 :             network_loop_->stop();
     209                 :             :         }
     210                 :           0 :         if (network_thread_.joinable()) {
     211                 :           0 :             network_thread_.join();
     212                 :             :         }
     213                 :           0 :         if (transport_) {
     214                 :           0 :             transport_->stop_listening();
     215                 :             :         }
     216                 :           0 :         if (discovery_) {
     217                 :           0 :             discovery_->stop();
     218                 :             :         }
     219                 :             :     }
     220                 :         106 :     if (log_manager_) {
     221                 :         106 :         log_manager_->stop();
     222                 :             :     }
     223                 :         106 :     if (trace_manager_) {
     224                 :           2 :         trace_manager_->stop();
     225                 :             :     }
     226                 :         106 :     scheduler_->stop();
     227                 :         106 : }
     228                 :             : 
     229                 :         113 : void ActorSystem::apply_tracing_config(const tracing::TraceConfig& config) {
     230                 :         113 :     tracing_config_ = config;
     231                 :         113 :     if (!tracing_config_.enabled) {
     232                 :         111 :         if (trace_manager_) {
     233                 :           0 :             trace_manager_->stop();
     234                 :           0 :             trace_manager_.reset();
     235                 :             :         }
     236                 :         111 :         return;
     237                 :             :     }
     238                 :           2 :     trace_manager_ = std::make_unique<tracing::TraceManager>(tracing_config_, this);
     239                 :           2 :     trace_manager_->start();
     240                 :             : }
     241                 :             : 
     242                 :           0 : void ActorSystem::on_node_dead(EndPoint dead_ep) {
     243                 :             :     // Find all actors linked to or monitoring actors on the dead endpoint.
     244                 :             :     // Uses the internal actor_contexts_ map directly (actor_context() is
     245                 :             :     // a protected member of AbstractActor, not accessible from ActorSystem).
     246                 :           0 :     std::lock_guard<std::mutex> lock(actor_contexts_mutex_);
     247                 :           0 :     for (const auto& [id, ctx] : actor_contexts_) {
     248                 :           0 :         if (!ctx)
     249                 :           0 :             continue;
     250                 :           0 :         for (const auto& addr : ctx->linked_actors()) {
     251                 :           0 :             if (addr.endpoint == dead_ep) {
     252                 :           0 :                 TypedMessage down(TypeTag::DownMsg, StreamBuffer{});
     253                 :           0 :                 down.set_sender_address(ActorAddress{dead_ep, 0, ActorId(0), 0});
     254                 :           0 :                 deliver_local(id, std::move(down));
     255                 :           0 :                 break;
     256                 :           0 :             }
     257                 :           0 :         }
     258                 :             :     }
     259                 :           0 :     if (location_cache_)
     260                 :           0 :         location_cache_->evict_node(dead_ep);
     261                 :           0 : }
     262                 :             : 
     263                 :           3 : void ActorSystem::signal_backpressure(const mailbox::BackpressureSignal& signal) {
     264                 :           3 :     if (signal.sender.id == ActorId{0})
     265                 :           1 :         return; // no sender
     266                 :             : 
     267                 :           2 :     std::lock_guard<std::mutex> lock(actor_contexts_mutex_);
     268                 :           2 :     auto it = actor_contexts_.find(signal.sender.id);
     269                 :           2 :     if (it != actor_contexts_.end() && it->second) {
     270                 :           2 :         it->second->handle_backpressure(signal);
     271                 :             :     }
     272                 :           2 : }
     273                 :             : 
     274                 :           0 : void ActorSystem::register_actor(const std::string& name, Actor actor) {
     275                 :           0 :     registry_.put(name, actor.address());
     276                 :           0 : }
     277                 :             : 
     278                 :           0 : Actor ActorSystem::resolve_actor(const std::string& name) {
     279                 :           0 :     ActorAddress addr = registry_.get(name);
     280                 :           0 :     if (!addr) {
     281                 :           0 :         return Actor{};
     282                 :             :     }
     283                 :           0 :     return Actor{};
     284                 :             : }
     285                 :             : 
     286                 :           0 : void ActorSystem::unregister_actor(const std::string& name) {
     287                 :           0 :     registry_.erase(name);
     288                 :           0 : }
     289                 :             : 
     290                 :           0 : void ActorSystem::register_actor_type(const ActorTypeDef& def) {
     291                 :           0 :     actor_types_[def.id] = def;
     292                 :           0 : }
     293                 :             : 
     294                 :           0 : ActorTypeDef ActorSystem::get_actor_type(ActorType type) const {
     295                 :           0 :     auto it = actor_types_.find(type);
     296                 :           0 :     if (it != actor_types_.end()) {
     297                 :           0 :         return it->second;
     298                 :             :     }
     299                 :           0 :     return ActorTypeDef{};
     300                 :             : }
     301                 :             : 
     302                 :         602 : std::shared_ptr<AbstractActor> ActorSystem::get_actor(ActorId id) {
     303                 :         602 :     std::lock_guard<std::mutex> lock(actors_mutex_);
     304                 :         602 :     auto it = actors_.find(id);
     305                 :         602 :     if (it != actors_.end()) {
     306                 :         580 :         return it->second;
     307                 :             :     }
     308                 :          22 :     return nullptr;
     309                 :         602 : }
     310                 :             : 
     311                 :         723 : mailbox::MPSCActorMailbox<TypedMessage>* ActorSystem::get_mailbox(ActorId id) {
     312                 :         723 :     std::lock_guard<std::mutex> lock(mailboxes_mutex_);
     313                 :         723 :     auto it = mailboxes_.find(id);
     314                 :         723 :     if (it != mailboxes_.end()) {
     315                 :         722 :         return it->second.get();
     316                 :             :     }
     317                 :           1 :     return nullptr;
     318                 :         723 : }
     319                 :             : 
     320                 :           0 : size_t ActorSystem::actor_count() const {
     321                 :           0 :     std::lock_guard<std::mutex> lock(actors_mutex_);
     322                 :           0 :     return actors_.size();
     323                 :           0 : }
     324                 :             : 
     325                 :           6 : void ActorSystem::for_each_actor(
     326                 :             :     std::function<void(ActorId, AbstractActor&)> callback) const {
     327                 :           6 :     std::lock_guard<std::mutex> lock(actors_mutex_);
     328                 :          19 :     for (auto& [id, actor] : actors_) {
     329                 :          13 :         callback(id, *actor);
     330                 :             :     }
     331                 :           6 : }
     332                 :             : 
     333                 :           0 : cli::CliActor* ActorSystem::cli_actor() const {
     334                 :           0 :     return cli_actor_.get();
     335                 :             : }
     336                 :             : 
     337                 :             : // -----------------------------------------------------------------------------
     338                 :             : // Dead-letter queue
     339                 :             : // -----------------------------------------------------------------------------
     340                 :          34 : bool ActorSystem::dead_letter(mailbox::DeadLetterRecord record) noexcept {
     341                 :          34 :     if (!dead_letters_) {
     342                 :           0 :         return false;
     343                 :             :     }
     344                 :          34 :     return dead_letters_->try_push(std::move(record));
     345                 :             : }
     346                 :             : 
     347                 :           5 : mailbox::DeadLetterQueueSnapshot ActorSystem::dead_letter_snapshot() const noexcept {
     348                 :           5 :     if (!dead_letters_) {
     349                 :           0 :         return {};
     350                 :             :     }
     351                 :           5 :     return dead_letters_->snapshot();
     352                 :             : }
     353                 :             : 
     354                 :          23 : bool ActorSystem::pop_dead_letter(mailbox::DeadLetterRecord& out) noexcept {
     355                 :          23 :     if (!dead_letters_) {
     356                 :           0 :         return false;
     357                 :             :     }
     358                 :          23 :     return dead_letters_->try_pop(out);
     359                 :             : }
     360                 :             : 
     361                 :             : // -----------------------------------------------------------------------------
     362                 :             : // Mailbox config helpers
     363                 :             : // -----------------------------------------------------------------------------
     364                 :         118 : mailbox::MailboxConfig ActorSystem::mailbox_config_for_spawn() const {
     365                 :         118 :     mailbox::MailboxConfig cfg;
     366                 :         118 :     cfg.capacity.max_messages = config_.mailbox.default_capacity;
     367                 :         118 :     cfg.capacity.max_bytes = config_.mailbox.default_byte_capacity;
     368                 :         118 :     cfg.overflow_policy = config_.mailbox.default_policy;
     369                 :         118 :     cfg.high_watermark = config_.mailbox.high_watermark;
     370                 :         118 :     cfg.low_watermark = config_.mailbox.low_watermark;
     371                 :         118 :     cfg.protected_system_messages = config_.mailbox.protected_system_messages;
     372                 :         118 :     cfg.backpressure_mode = config_.mailbox.backpressure_mode;
     373                 :         118 :     return cfg;
     374                 :             : }
     375                 :             : 
     376                 :             : mailbox::MailboxConfig
     377                 :          10 : ActorSystem::mailbox_config_for_actor_def(const config::ActorDef& def) const {
     378                 :          10 :     auto cfg = mailbox_config_for_spawn();
     379                 :          10 :     if (def.mailbox_capacity != 0) {
     380                 :           1 :         cfg.capacity.max_messages = def.mailbox_capacity;
     381                 :             :     }
     382                 :          10 :     if (def.mailbox.policy != mailbox::OverflowPolicy::RejectNewest) {
     383                 :           0 :         cfg.overflow_policy = def.mailbox.policy;
     384                 :             :     }
     385                 :          10 :     cfg.priority_aware = def.mailbox.priority_aware;
     386                 :          10 :     cfg.max_overflow_depth = def.mailbox.max_overflow_depth;
     387                 :          10 :     return cfg;
     388                 :             : }
     389                 :             : 
     390                 :             : // -----------------------------------------------------------------------------
     391                 :             : // try_deliver_local — bounded admission boundary
     392                 :             : // -----------------------------------------------------------------------------
     393                 :             : mailbox::EnqueueResult
     394                 :         365 : ActorSystem::try_deliver_local(ActorId target, TypedMessage msg,
     395                 :             :                                uint8_t priority, int64_t deadline_ns,
     396                 :             :                                mailbox::DeliveryOptions options) {
     397                 :         365 :     auto* mailbox = get_mailbox(target);
     398                 :         365 :     if (mailbox == nullptr) {
     399                 :             :         // Capture dead letter for missing actor
     400                 :           1 :         if (dead_letters_) {
     401                 :           1 :             mailbox::DeadLetterRecord dl;
     402                 :           1 :             dl.reason = mailbox::DeadLetterReason::ActorNotFound;
     403                 :           1 :             dl.source = mailbox::DeadLetterSource::LocalDelivery;
     404                 :           1 :             dl.sender = msg.sender_address();
     405                 :           1 :             dl.target = ActorAddress{endpoint_, ActorType{0}, target, 0};
     406                 :           1 :             dl.type_tag = msg.type_id();
     407                 :           1 :             dl.message_id = options.message_id;
     408                 :           1 :             dl.frame_flags = options.flags;
     409                 :           1 :             dl.priority = priority;
     410                 :           1 :             dl.deadline_ns = deadline_ns;
     411                 :           1 :             dl.payload_sample = msg.payload();
     412                 :           1 :             (void)dead_letter(std::move(dl));
     413                 :           1 :         }
     414                 :             : 
     415                 :           1 :         mailbox::EnqueueResult r;
     416                 :           1 :         r.code = mailbox::EnqueueResultCode::ActorNotFound;
     417                 :           1 :         r.target = target;
     418                 :           1 :         return r;
     419                 :             :     }
     420                 :             : 
     421                 :         364 :     mailbox::MailboxEnvelopeMeta meta;
     422                 :         364 :     meta.sender = msg.sender_address();
     423                 :         364 :     meta.type_tag = msg.type_id();
     424                 :         364 :     meta.priority = priority;
     425                 :         364 :     meta.deadline_ns = deadline_ns;
     426                 :         364 :     meta.flags = options.flags;
     427                 :         364 :     meta.message_id = options.message_id;
     428                 :         364 :     if (options.no_drop) {
     429                 :           0 :         meta.flags |= net::WireFrame::NoDrop;
     430                 :             :     }
     431                 :             : 
     432                 :         364 :     auto result = mailbox->try_push(std::move(msg), meta);
     433                 :             : 
     434                 :             :     // Capture dead letter when mailbox rejects and policy is DeadLetter
     435                 :         364 :     if (!result.accepted() && mailbox->config().overflow_policy ==
     436                 :             :                                   mailbox::OverflowPolicy::DeadLetter) {
     437                 :           0 :         if (dead_letters_) {
     438                 :           0 :             mailbox::DeadLetterRecord dl;
     439                 :           0 :             dl.reason = mailbox::DeadLetterReason::OverflowPolicy;
     440                 :           0 :             dl.source = mailbox::DeadLetterSource::MailboxAdmission;
     441                 :           0 :             dl.sender = meta.sender;
     442                 :           0 :             dl.target = ActorAddress{endpoint_, ActorType{0}, target, 0};
     443                 :           0 :             dl.type_tag = meta.type_tag;
     444                 :           0 :             dl.message_id = meta.message_id;
     445                 :           0 :             dl.frame_flags = meta.flags;
     446                 :           0 :             dl.priority = meta.priority;
     447                 :           0 :             dl.deadline_ns = meta.deadline_ns;
     448                 :           0 :             dl.mailbox_depth = result.depth;
     449                 :           0 :             dl.mailbox_capacity = result.capacity;
     450                 :           0 :             (void)dead_letter(std::move(dl));
     451                 :           0 :         }
     452                 :             :     }
     453                 :             : 
     454                 :             :     // Emit backpressure signal when target mailbox is under soft pressure
     455                 :         364 :     if (result.code == mailbox::EnqueueResultCode::AcceptedWithSoftPressure &&
     456                 :           4 :         options.emit_backpressure) {
     457                 :           3 :         mailbox::BackpressureSignal signal;
     458                 :           3 :         signal.target = ActorAddress{endpoint_, ActorType{0}, target, 0};
     459                 :           3 :         signal.sender = meta.sender;
     460                 :           3 :         signal.reason = mailbox::BackpressureReason::HighWatermark;
     461                 :           3 :         signal.depth = result.depth;
     462                 :           3 :         signal.capacity = result.capacity;
     463                 :           3 :         signal.pressure_ratio = result.pressure_ratio;
     464                 :           3 :         signal.retry_after = result.retry_after;
     465                 :           3 :         signal_backpressure(signal);
     466                 :             :     }
     467                 :             : 
     468                 :         364 :     return result;
     469                 :             : }
     470                 :             : 
     471                 :          28 : void ActorSystem::deliver_local(ActorId target, TypedMessage msg) {
     472                 :          28 :     (void)try_deliver_local(target, std::move(msg));
     473                 :          28 : }
     474                 :             : 
     475                 :           0 : void ActorSystem::deliver_local(ActorId target, TypedMessage msg,
     476                 :             :                                 uint8_t priority, int64_t deadline_ns) {
     477                 :           0 :     (void)try_deliver_local(target, std::move(msg), priority, deadline_ns, {});
     478                 :           0 : }
     479                 :             : 
     480                 :           1 : void ActorSystem::deliver_remote(const net::WireFrame& frame) {
     481                 :           1 :     StreamBuffer payload(frame.pb_frame.payload().begin(),
     482                 :           2 :                          frame.pb_frame.payload().end());
     483                 :           1 :     TypedMessage msg(static_cast<TypeTag>(frame.pb_frame.type_tag()),
     484                 :           2 :                      std::move(payload));
     485                 :           1 :     msg.set_sender_address(net::from_proto(frame.pb_frame.sender()));
     486                 :           1 :     if (frame.pb_frame.has_trace_context()) {
     487                 :           0 :         uint16_t max_state = tracing_config_.max_tracestate_len;
     488                 :             :         auto parsed = net::trace_context_from_proto(
     489                 :           0 :             frame.pb_frame.trace_context(), max_state);
     490                 :           0 :         if (parsed.has_value()) {
     491                 :           0 :             msg.set_trace_context(parsed.value());
     492                 :             :         }
     493                 :           0 :     }
     494                 :           1 :     deliver_local(net::from_proto(frame.pb_frame.receiver()).id, std::move(msg));
     495                 :           1 : }
     496                 :             : 
     497                 :           0 : void ActorSystem::enqueue_completion(net::OpCompletion completion) {
     498                 :             :     // Pack the completion into a StreamBuffer for mailbox delivery.
     499                 :             :     // The binary layout is a flat memcpy of the OpCompletion struct —
     500                 :             :     // it is local-only (same process, same address space), so no
     501                 :             :     // endianness or portability concerns.
     502                 :           0 :     StreamBuffer payload(sizeof(net::OpCompletion));
     503                 :           0 :     std::memcpy(payload.data(), &completion, sizeof(net::OpCompletion));
     504                 :             : 
     505                 :           0 :     TypedMessage msg(TypeTag::IoCompletionTag, std::move(payload));
     506                 :           0 :     deliver_local(completion.actor, std::move(msg));
     507                 :           0 : }
     508                 :             : 
     509                 :           3 : net::Transport* ActorSystem::get_transport_for(const EndPoint& /*endpoint*/) {
     510                 :             :     // TcpTransport already handles per-endpoint routing via its internal pools_
     511                 :             :     // map — TcpTransport::send() calls get_or_create_pool(target.endpoint)
     512                 :             :     // internally. Return the single transport_ for all remote endpoints.
     513                 :           3 :     if (!config_.enable_network) {
     514                 :           3 :         return nullptr;
     515                 :             :     }
     516                 :           0 :     return transport_.get();
     517                 :             : }
     518                 :             : 
     519                 :           0 : result<ActorRef> ActorSystem::spawn_remote(const std::string& node_name,
     520                 :             :                                            const std::string& actor_type,
     521                 :             :                                            const StreamBuffer& /*args*/) {
     522                 :           0 :     return spawn_remote_async(node_name, actor_type, StreamBuffer{}).get();
     523                 :             : }
     524                 :             : 
     525                 :           0 : AsyncActor ActorSystem::spawn_remote_async(const std::string& node_name,
     526                 :             :                                            const std::string& actor_type,
     527                 :             :                                            const StreamBuffer& /*args*/) {
     528                 :           0 :     AsyncActor handle(endpoint_, config_.spawn_timeout_ms);
     529                 :             : 
     530                 :           0 :     if (!config_.enable_network || !transport_) {
     531                 :           0 :         SpawnResponse resp;
     532                 :           0 :         resp.error_code = spawn_errors::node_unreachable;
     533                 :           0 :         handle.set_response(resp);
     534                 :           0 :         return handle;
     535                 :             :     }
     536                 :             : 
     537                 :           0 :     auto remote_endpoint = endpoint_ops::parse_endpoint(node_name);
     538                 :             : 
     539                 :             :     // Serialize spawn request using protobuf
     540                 :           0 :     ::hpactor::SpawnRequestMessage pb_req;
     541                 :             :     pb_req.set_actor_type_name(actor_type);
     542                 :           0 :     pb_req.set_args_type(static_cast<uint32_t>(TypeTag::User));
     543                 :           0 :     net::to_proto(pb_req.mutable_supervisor(), system_actor_.address());
     544                 :             : 
     545                 :           0 :     StreamBuffer request_bytes = proto_registry_.serialize(pb_req);
     546                 :           0 :     uint64_t msg_id = generate_message_id().value();
     547                 :             : 
     548                 :           0 :     net::WireFrame frame;
     549                 :           0 :     net::to_proto(frame.pb_frame.mutable_sender(), system_actor_.address());
     550                 :           0 :     net::to_proto(
     551                 :             :         frame.pb_frame.mutable_receiver(),
     552                 :           0 :         ActorAddress{remote_endpoint, SystemActorType, SpawnReceiverId, 0});
     553                 :           0 :     frame.pb_frame.set_message_id(msg_id);
     554                 :           0 :     frame.pb_frame.set_flags(net::WireFrame::RpcRequest);
     555                 :           0 :     frame.pb_frame.set_payload(reinterpret_cast<const char*>(request_bytes.data()),
     556                 :             :                                request_bytes.size());
     557                 :             : 
     558                 :           0 :     auto pending = std::make_shared<AsyncActor>(std::move(handle));
     559                 :           0 :     pending->set_message_id(msg_id);
     560                 :             : 
     561                 :             :     {
     562                 :           0 :         std::lock_guard<std::mutex> lock(pending_spawns_mutex_);
     563                 :           0 :         pending_spawns_.emplace(msg_id, pending);
     564                 :           0 :     }
     565                 :             : 
     566                 :           0 :     transport_->send(net::from_proto(frame.pb_frame.receiver()), frame.encode());
     567                 :             : 
     568                 :           0 :     return std::move(*pending);
     569                 :           0 : }
     570                 :             : 
     571                 :             : // -----------------------------------------------------------------------------
     572                 :             : // spawn_configured — spawn a pre-constructed actor with ActorDef config
     573                 :             : // -----------------------------------------------------------------------------
     574                 :          10 : Actor ActorSystem::spawn_configured(std::shared_ptr<AbstractActor> actor,
     575                 :             :                                     const config::ActorDef& def) {
     576                 :          20 :     ActorId id(next_actor_id_.fetch_add(1));
     577                 :          10 :     actor->set_address(ActorAddress(endpoint_, actor->type(), id, 0));
     578                 :          10 :     actor->set_type_name(def.behavior);
     579                 :             : 
     580                 :             :     {
     581                 :          10 :         std::lock_guard<std::mutex> lock(actors_mutex_);
     582                 :          10 :         actors_.emplace(id, actor);
     583                 :          10 :     }
     584                 :             : 
     585                 :             :     // Create mailbox with capacity from ActorDef
     586                 :             :     {
     587                 :          10 :         std::lock_guard<std::mutex> lock(mailboxes_mutex_);
     588                 :          10 :         mailboxes_.emplace(
     589                 :          10 :             id, std::make_unique<mailbox::MPSCActorMailbox<TypedMessage>>(
     590                 :          10 :                     id, scheduler_.get(), mailbox_config_for_actor_def(def)));
     591                 :          10 :     }
     592                 :             : 
     593                 :             :     // Create actor context and set it on the actor
     594                 :          10 :     auto* local = static_cast<LocalActor*>(actor.get());
     595                 :          10 :     auto actor_ctx = std::make_unique<ActorContext>(Actor(actor), this);
     596                 :          10 :     local->set_context(actor_ctx.get());
     597                 :          10 :     actor_contexts_.emplace(id, std::move(actor_ctx));
     598                 :             : 
     599                 :             :     // Set scheduler and mailbox on actor
     600                 :          10 :     actor->set_scheduler(scheduler_.get());
     601                 :          10 :     actor->set_mailbox(mailboxes_[id].get());
     602                 :             : 
     603                 :             :     // Register with scheduler. Actor class policy is authoritative for
     604                 :             :     // specialized actors such as DaemonActor and DenseComputingActor; TOML can
     605                 :             :     // only upgrade otherwise-cooperative actors.
     606                 :          10 :     auto policy = actor->dispatch_policy();
     607                 :          10 :     auto hints = actor->dispatch_hints();
     608                 :          10 :     if (policy == sched::DispatchPolicy::Cooperative) {
     609                 :           9 :         switch (def.dispatch_policy) {
     610                 :           9 :             case config::DispatchPolicy::Cooperative:
     611                 :           9 :                 break;
     612                 :           0 :             case config::DispatchPolicy::DedicatedThread:
     613                 :           0 :                 policy = sched::DispatchPolicy::DedicatedThread;
     614                 :           0 :                 break;
     615                 :           0 :             case config::DispatchPolicy::DedicatedPool:
     616                 :           0 :                 policy = sched::DispatchPolicy::DedicatedPool;
     617                 :           0 :                 break;
     618                 :             :         }
     619                 :             :     }
     620                 :             : 
     621                 :          10 :     switch (policy) {
     622                 :           9 :         case sched::DispatchPolicy::Cooperative:
     623                 :           9 :             scheduler_->notify_ready(id, 0, INT64_MAX);
     624                 :           9 :             break;
     625                 :           0 :         case sched::DispatchPolicy::DedicatedThread:
     626                 :           0 :             scheduler_->register_dedicated_thread(id, hints.cpu_affinity);
     627                 :           0 :             break;
     628                 :           1 :         case sched::DispatchPolicy::DedicatedPool:
     629                 :           1 :             scheduler_->register_dedicated_pool(id, hints.pool_size);
     630                 :           1 :             break;
     631                 :             :     }
     632                 :             : 
     633                 :             :     // Activate the actor (DaemonActor starts its thread here, etc.)
     634                 :          10 :     local->on_activate();
     635                 :             : 
     636                 :          10 :     return Actor(actor);
     637                 :          10 : }
     638                 :             : 
     639                 :             : // -----------------------------------------------------------------------------
     640                 :             : // load_topology — convenience entry point for TOML-based bootstrapping
     641                 :             : // -----------------------------------------------------------------------------
     642                 :           7 : result<void> ActorSystem::load_topology(const std::string& toml_path) {
     643                 :           7 :     auto parse_result = config::TomlParser::parse(toml_path);
     644                 :           7 :     if (!parse_result.has_value()) {
     645                 :           0 :         return result<void>::make(parse_result.error());
     646                 :             :     }
     647                 :             : 
     648                 :           7 :     auto& model = parse_result.value();
     649                 :             : 
     650                 :             :     // Apply system-level metrics config from topology
     651                 :           7 :     if (model.system.metrics_enabled) {
     652                 :           7 :         metrics_config_.enabled = model.system.metrics_enabled;
     653                 :           7 :         metrics_config_.ring_buffer_capacity =
     654                 :           7 :             model.system.metrics_ring_buffer_capacity;
     655                 :           7 :         metrics_config_.metrics_path = model.system.metrics_path;
     656                 :             :     }
     657                 :             : 
     658                 :             :     // Apply system-level logging config from topology
     659                 :           7 :     logging_config_ = model.system.logging;
     660                 :             : 
     661                 :             : // Apply shared system fields from topology
     662                 :             : // NOLINTBEGIN(cppcoreguidelines-macro-usage)
     663                 :             : #define HPACTOR_SYSTEM_TOML_FIELD(name, type, toml, def)                       \
     664                 :             :     config_.name = static_cast<decltype(config_.name)>(model.system.name);
     665                 :             : #include <hpactor/config/system_toml_fields.def>
     666                 :             : #undef HPACTOR_SYSTEM_TOML_FIELD
     667                 :             : 
     668                 :             : // Apply mailbox defaults from topology
     669                 :             : #define HPACTOR_MAILBOX_FIELD(name, type, toml, def)                           \
     670                 :             :     config_.mailbox.name = model.system.mailbox.name;
     671                 :             : #include <hpactor/config/mailbox_fields.def>
     672                 :             : #undef HPACTOR_MAILBOX_FIELD
     673                 :             :     // NOLINTEND(cppcoreguidelines-macro-usage)
     674                 :             : 
     675                 :           7 :     config_.dead_letters = model.system.dead_letters;
     676                 :             :     dead_letters_ =
     677                 :           7 :         std::make_unique<mailbox::DeadLetterQueue>(config_.dead_letters);
     678                 :             : 
     679                 :           7 :     apply_tracing_config(model.system.tracing);
     680                 :             : 
     681                 :           7 :     HPACTOR_LOG_INFO(log::LogCategory::kConfig, ActorId{0}, 0,
     682                 :             :                      "topology bootstrap complete");
     683                 :             : 
     684                 :             :     // Validate all behaviors are registered
     685                 :           7 :     auto& registry = config::ActorFactoryRegistry::instance();
     686                 :          16 :     for (const auto& actor_def : model.actors) {
     687                 :          10 :         if (!registry.has(actor_def.behavior)) {
     688                 :           1 :             error err(errors::unknown);
     689                 :           1 :             return result<void>::make(std::move(err));
     690                 :           1 :         }
     691                 :             :     }
     692                 :             : 
     693                 :             :     // Spawn actors in topological order; track numeric IDs for SystemInit
     694                 :             :     std::vector<ActorId, mem::MemStdAllocator<ActorId>> spawned_ids(
     695                 :           6 :         mem::MemStdAllocator<ActorId>(system_actor_.id(),
     696                 :           6 :                                       mem::RegionType::kInternal));
     697                 :          15 :     for (const auto& actor_def : model.actors) {
     698                 :           9 :         auto factory = registry.get_factory(actor_def.behavior);
     699                 :           9 :         auto actor_ptr = factory(nullptr, *this);
     700                 :             : 
     701                 :           9 :         Actor actor_handle = spawn_configured(std::move(actor_ptr), actor_def);
     702                 :             : 
     703                 :             :         // Register in name registry
     704                 :           9 :         registry_.put(actor_def.id, actor_handle.address());
     705                 :             : 
     706                 :           9 :         spawned_ids.push_back(actor_handle.id());
     707                 :           9 :     }
     708                 :             : 
     709                 :             :     // Broadcast SystemInit to all spawned actors
     710                 :           6 :     ActorAddress sys_addr = system_actor_.address();
     711                 :           6 :     StreamBuffer empty_payload;
     712                 :          15 :     for (ActorId id : spawned_ids) {
     713                 :           9 :         TypedMessage init_msg(TypeTag::SystemInitTag, std::move(empty_payload));
     714                 :           9 :         init_msg.set_sender_address(sys_addr);
     715                 :           9 :         deliver_local(id, std::move(init_msg));
     716                 :           9 :         empty_payload = StreamBuffer{};
     717                 :           9 :     }
     718                 :             : 
     719                 :           6 :     return result<void>::make();
     720                 :           7 : }
     721                 :             : 
     722                 :             : } // namespace hpactor
     723                 :             : 
     724                 :             : // ═══════════════════════════════════════════════════════════════════════════════
     725                 :             : // Shutdown helpers (anonymous namespace, uses public ActorSystem API only)
     726                 :             : // ═══════════════════════════════════════════════════════════════════════════════
     727                 :             : 
     728                 :             : namespace hpactor {
     729                 :             : namespace {
     730                 :             : 
     731                 :             : struct ActorDrainInfo {
     732                 :             :     ActorId id;
     733                 :             :     bool is_system;
     734                 :             : };
     735                 :             : 
     736                 :          13 : void initiate_actor_drain(ActorSystem& sys, ActorId id) {
     737                 :          13 :     auto actor = sys.get_actor(id);
     738                 :          13 :     if (!actor)
     739                 :           0 :         return;
     740                 :             : 
     741                 :          13 :     auto* lc = actor->as_lifecycle();
     742                 :          13 :     if (lc == nullptr) {
     743                 :             :         // No lifecycle: call on_exit directly if EventBasedActor
     744                 :           0 :         if (actor->is_event_based_actor()) {
     745                 :           0 :             static_cast<EventBasedActor*>(actor.get())->on_exit();
     746                 :             :         }
     747                 :           0 :         return;
     748                 :             :     }
     749                 :             : 
     750                 :          13 :     auto state = lc->state();
     751                 :             :     // Skip actors that are already stopping or stopped
     752                 :          13 :     if (state == LifecycleState::kStopping || state == LifecycleState::kStopped)
     753                 :           2 :         return;
     754                 :             : 
     755                 :          11 :     auto policy = lc->drain_config().policy;
     756                 :             : 
     757                 :          11 :     if (policy == DrainPolicy::ImmediateStop) {
     758                 :             :         // Drain mailbox synchronously (dead-letter all messages)
     759                 :           7 :         if (actor->is_event_based_actor()) {
     760                 :           7 :             static_cast<EventBasedActor*>(actor.get())->drain_all_immediate();
     761                 :             :         } else {
     762                 :           0 :             auto* mailbox = sys.get_mailbox(id);
     763                 :           0 :             if (mailbox) {
     764                 :           0 :                 TypedMessage msg;
     765                 :           0 :                 while (mailbox->try_pop(msg)) {
     766                 :             :                     // Messages dropped — equivalent to dead-lettering
     767                 :             :                 }
     768                 :           0 :             }
     769                 :             :         }
     770                 :             :         // Drive lifecycle: kActive -> kStopping -> kStopped
     771                 :           7 :         lc->transition(LifecycleState::kStopping);
     772                 :           7 :         lc->transition(LifecycleState::kStopped);
     773                 :             :         // Notify linked/monitored actors
     774                 :           7 :         if (actor->is_event_based_actor()) {
     775                 :           7 :             static_cast<EventBasedActor*>(actor.get())->on_exit();
     776                 :             :         }
     777                 :             :     } else {
     778                 :             :         // Drain / DropUserMessages / deferred policies:
     779                 :             :         // transition to kDraining, let EventBasedActor::receive() / drain
     780                 :             :         // timer handle completion.
     781                 :           4 :         if (state == LifecycleState::kActive) {
     782                 :           4 :             lc->transition(LifecycleState::kDraining);
     783                 :             :         }
     784                 :             :         // Start drain timer if EventBasedActor
     785                 :           4 :         if (actor->is_event_based_actor()) {
     786                 :           4 :             static_cast<EventBasedActor*>(actor.get())->start_drain_timer();
     787                 :             :         } else {
     788                 :             :             // Non-EventBasedActor with lifecycle but no drain timer:
     789                 :             :             // transition directly to stopped.
     790                 :           0 :             lc->transition(LifecycleState::kStopping);
     791                 :           0 :             lc->transition(LifecycleState::kStopped);
     792                 :             :         }
     793                 :             :     }
     794                 :          13 : }
     795                 :             : 
     796                 :          12 : void poll_drain_complete(ActorSystem& sys, ActorId id,
     797                 :             :                          std::chrono::steady_clock::time_point deadline) {
     798                 :          33 :     while (std::chrono::steady_clock::now() < deadline) {
     799                 :          30 :         auto actor = sys.get_actor(id);
     800                 :          30 :         if (!actor)
     801                 :           0 :             return; // Actor removed from registry
     802                 :             : 
     803                 :          30 :         auto* lc = actor->as_lifecycle();
     804                 :          30 :         if (lc == nullptr)
     805                 :           0 :             return; // No lifecycle — already handled
     806                 :             : 
     807                 :          30 :         if (lc->state() == LifecycleState::kStopped)
     808                 :           9 :             return; // Drain complete
     809                 :             : 
     810                 :          21 :         std::this_thread::sleep_for(std::chrono::milliseconds(1));
     811                 :          30 :     }
     812                 :             : }
     813                 :             : 
     814                 :             : } // anonymous namespace
     815                 :             : } // namespace hpactor
     816                 :             : 
     817                 :             : // ═══════════════════════════════════════════════════════════════════════════════
     818                 :             : // ActorSystem — shutdown implementation
     819                 :             : // ═══════════════════════════════════════════════════════════════════════════════
     820                 :             : 
     821                 :             : namespace hpactor {
     822                 :             : 
     823                 :           0 : result<void> ActorSystem::shutdown() {
     824                 :           0 :     return shutdown(ShutdownOptions{});
     825                 :             : }
     826                 :             : 
     827                 :           6 : result<void> ActorSystem::shutdown(const ShutdownOptions& opts) {
     828                 :           6 :     ShutdownPhase phase = ShutdownPhase::Running;
     829                 :             : 
     830                 :             :     // Helper: check if we should force-stop (modifies phase/running in place)
     831                 :          58 :     auto check_force = [&](std::chrono::steady_clock::time_point deadline) -> bool {
     832                 :          58 :         if (!opts.force_after_timeout)
     833                 :           0 :             return false;
     834                 :          58 :         if (std::chrono::steady_clock::now() < deadline)
     835                 :          46 :             return false;
     836                 :          12 :         phase = ShutdownPhase::ForcedStop;
     837                 :          12 :         shutdown_phase_.store(ShutdownPhase::ForcedStop, std::memory_order_release);
     838                 :          12 :         running_.store(false, std::memory_order_release);
     839                 :          12 :         return true;
     840                 :           6 :     };
     841                 :             : 
     842                 :             :     // ── Phase: DrainingIngress ──────────────────────────────────────────
     843                 :           6 :     phase = ShutdownPhase::DrainingIngress;
     844                 :           6 :     shutdown_phase_.store(ShutdownPhase::DrainingIngress, std::memory_order_release);
     845                 :           6 :     is_ready_.store(false, std::memory_order_release);
     846                 :             : 
     847                 :           6 :     auto ingress_deadline = std::chrono::steady_clock::now() + opts.ingress_timeout;
     848                 :             :     // (HTTP gateway / remote spawn gating deferred to follow-up tasks)
     849                 :           6 :     if (check_force(ingress_deadline))
     850                 :           0 :         return result<void>::make();
     851                 :             : 
     852                 :             :     // ── Phase: DrainingActors ──────────────────────────────────────────
     853                 :           6 :     phase = ShutdownPhase::DrainingActors;
     854                 :           6 :     shutdown_phase_.store(ShutdownPhase::DrainingActors, std::memory_order_release);
     855                 :             :     auto actor_deadline =
     856                 :           6 :         std::chrono::steady_clock::now() + opts.actor_drain_timeout;
     857                 :             : 
     858                 :             :     // Collect actor IDs under lock, then drain in order
     859                 :             :     {
     860                 :           6 :         std::vector<ActorDrainInfo> actors;
     861                 :           6 :         for_each_actor([&](ActorId id, AbstractActor& actor) {
     862                 :          13 :             actors.push_back({id, actor.is_system_actor()});
     863                 :          13 :         });
     864                 :             :         // Lock released — safe to call into actors
     865                 :             : 
     866                 :             :         // Pass 1: initiate drain for non-system actors
     867                 :          19 :         for (auto& info : actors) {
     868                 :          13 :             if (info.is_system)
     869                 :           1 :                 continue;
     870                 :          12 :             initiate_actor_drain(*this, info.id);
     871                 :          12 :             if (check_force(actor_deadline))
     872                 :           0 :                 break;
     873                 :             :         }
     874                 :             :         // Poll non-system actors to completion
     875                 :           6 :         if (!check_force(actor_deadline)) {
     876                 :          15 :             for (const auto& info : actors) {
     877                 :          12 :                 if (info.is_system)
     878                 :           1 :                     continue;
     879                 :          11 :                 poll_drain_complete(*this, info.id, actor_deadline);
     880                 :          11 :                 if (check_force(actor_deadline))
     881                 :           3 :                     break;
     882                 :             :             }
     883                 :             :         }
     884                 :             : 
     885                 :             :         // Pass 2: initiate drain for system actors (last)
     886                 :           6 :         if (!check_force(actor_deadline)) {
     887                 :          12 :             for (auto& info : actors) {
     888                 :           9 :                 if (!info.is_system)
     889                 :           8 :                     continue;
     890                 :           1 :                 initiate_actor_drain(*this, info.id);
     891                 :           1 :                 if (check_force(actor_deadline))
     892                 :           0 :                     break;
     893                 :             :             }
     894                 :             :         }
     895                 :             :         // Poll system actors to completion
     896                 :           6 :         if (!check_force(actor_deadline)) {
     897                 :          12 :             for (const auto& info : actors) {
     898                 :           9 :                 if (!info.is_system)
     899                 :           8 :                     continue;
     900                 :           1 :                 poll_drain_complete(*this, info.id, actor_deadline);
     901                 :           1 :                 if (check_force(actor_deadline))
     902                 :           0 :                     break;
     903                 :             :             }
     904                 :             :         }
     905                 :           6 :     }
     906                 :             : 
     907                 :           6 :     if (check_force(actor_deadline))
     908                 :           3 :         return result<void>::make();
     909                 :             : 
     910                 :             :     // ── Phase: LeavingCluster ──────────────────────────────────────────
     911                 :           3 :     phase = ShutdownPhase::LeavingCluster;
     912                 :           3 :     shutdown_phase_.store(ShutdownPhase::LeavingCluster, std::memory_order_release);
     913                 :             :     auto leave_deadline =
     914                 :           3 :         std::chrono::steady_clock::now() + opts.cluster_leave_timeout;
     915                 :             :     // (Full implementation deferred until sharding)
     916                 :           3 :     if (check_force(leave_deadline))
     917                 :           0 :         return result<void>::make();
     918                 :             : 
     919                 :             :     // ── Phase: FlushingTelemetry ──────────────────────────────────────
     920                 :           3 :     phase = ShutdownPhase::FlushingTelemetry;
     921                 :           3 :     shutdown_phase_.store(ShutdownPhase::FlushingTelemetry,
     922                 :             :                           std::memory_order_release);
     923                 :             :     // Best-effort flush of logs, metrics, DLQ — no blocking
     924                 :             : 
     925                 :             :     // ── Phase: Stopped ─────────────────────────────────────────────────
     926                 :           3 :     phase = ShutdownPhase::Stopped;
     927                 :           3 :     shutdown_phase_.store(ShutdownPhase::Stopped, std::memory_order_release);
     928                 :           3 :     running_.store(false, std::memory_order_release);
     929                 :           3 :     return result<void>::make();
     930                 :             : }
     931                 :             : 
     932                 :           5 : ShutdownPhase ActorSystem::shutdown_phase() const noexcept {
     933                 :           5 :     return shutdown_phase_.load(std::memory_order_acquire);
     934                 :             : }
     935                 :             : 
     936                 :           4 : bool ActorSystem::is_ready() const noexcept {
     937                 :           4 :     return is_ready_.load(std::memory_order_acquire);
     938                 :             : }
     939                 :             : 
     940                 :           1 : bool ActorSystem::is_draining() const noexcept {
     941                 :           1 :     return shutdown_phase_.load(std::memory_order_acquire) ==
     942                 :           1 :            ShutdownPhase::DrainingActors;
     943                 :             : }
     944                 :             : 
     945                 :           0 : void ActorSystem::set_drain_config(ActorId target, DrainConfig cfg) {
     946                 :           0 :     auto actor = get_actor(target);
     947                 :           0 :     if (actor) {
     948                 :           0 :         if (auto* lc = actor->as_lifecycle()) {
     949                 :           0 :             lc->set_drain_config(cfg);
     950                 :             :         }
     951                 :             :     }
     952                 :           0 : }
     953                 :             : 
     954                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1