LCOV - code coverage report
Current view: top level - src/net - connection_pool.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 23.8 % 168 40
Test Date: 2026-05-20 02:24:49 Functions: 31.8 % 22 7
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/net/connection_pool.hpp>
      16                 :             : #include <hpactor/net/frame.hpp>
      17                 :             : #include <hpactor/spawn.hpp>
      18                 :             : 
      19                 :             : #include <hpactor/common.pb.h>
      20                 :             : #include <hpactor/log/logger.hpp>
      21                 :             : #include <hpactor/messages.pb.h>
      22                 :             : 
      23                 :             : namespace hpactor {
      24                 :             : 
      25                 :             : namespace net {
      26                 :             : 
      27                 :           9 : ConnectionPool::ConnectionPool(EndPoint remote_endpoint,
      28                 :           9 :                                const PoolConfig& config, EventLoop* loop)
      29                 :           9 :     : remote_endpoint_(remote_endpoint), config_(config), loop_(loop) {}
      30                 :             : 
      31                 :           9 : ConnectionPool::~ConnectionPool() {
      32                 :           9 :     abort();
      33                 :           9 : }
      34                 :             : 
      35                 :           0 : ConnectionPtr ConnectionPool::get_connection() {
      36                 :           0 :     std::lock_guard<std::mutex> lock(mutex_);
      37                 :           0 :     if (active_connections_.empty()) {
      38                 :           0 :         return nullptr;
      39                 :             :     }
      40                 :           0 :     auto index = next_index_.fetch_add(1) % active_connections_.size();
      41                 :           0 :     return active_connections_[index];
      42                 :           0 : }
      43                 :             : 
      44                 :           0 : void ConnectionPool::send(const ActorAddress& target, const StreamBuffer& encoded) {
      45                 :           0 :     if (shutting_down_.load()) {
      46                 :           0 :         return;
      47                 :             :     }
      48                 :             : 
      49                 :           0 :     ConnectionPtr conn = get_connection();
      50                 :           0 :     if (conn) {
      51                 :           0 :         conn->send(encoded);
      52                 :           0 :         return;
      53                 :             :     }
      54                 :             : 
      55                 :             :     // No connection available, queue pending
      56                 :           0 :     add_pending(target, encoded);
      57                 :           0 : }
      58                 :             : 
      59                 :           0 : bool ConnectionPool::try_send(const ActorAddress& target,
      60                 :             :                               const StreamBuffer& encoded) {
      61                 :           0 :     if (shutting_down_.load()) {
      62                 :           0 :         return false;
      63                 :             :     }
      64                 :             : 
      65                 :           0 :     ConnectionPtr conn = get_connection();
      66                 :           0 :     if (conn) {
      67                 :           0 :         conn->send(encoded);
      68                 :           0 :         return true;
      69                 :             :     }
      70                 :             : 
      71                 :             :     // No connection available — try to queue; fail if pending queue is full
      72                 :           0 :     return add_pending(target, encoded);
      73                 :           0 : }
      74                 :             : 
      75                 :           0 : void ConnectionPool::send(const StreamBuffer& data) {
      76                 :             :     // Create a minimal actor address using the remote endpoint
      77                 :           0 :     ActorAddress target;
      78                 :           0 :     target.endpoint =
      79                 :           0 :         endpoint_ops::parse_endpoint(endpoint_ops::to_string(remote_endpoint_));
      80                 :           0 :     send(target, data);
      81                 :           0 : }
      82                 :             : 
      83                 :           0 : void ConnectionPool::close() {
      84                 :           0 :     abort();
      85                 :           0 : }
      86                 :             : 
      87                 :           3 : bool ConnectionPool::is_connected() const {
      88                 :           3 :     std::lock_guard<std::mutex> lock(mutex_);
      89                 :           3 :     return !active_connections_.empty();
      90                 :           3 : }
      91                 :             : 
      92                 :           1 : PoolStats ConnectionPool::stats() const {
      93                 :           1 :     std::lock_guard<std::mutex> lock(mutex_);
      94                 :           1 :     PoolStats s;
      95                 :           1 :     s.active_connections = active_connections_.size();
      96                 :           1 :     s.pending_messages = pending_messages_.size();
      97                 :           1 :     s.reconnect_attempts = reconnect_attempts_.load();
      98                 :           1 :     s.is_connected = !active_connections_.empty();
      99                 :           1 :     return s;
     100                 :           1 : }
     101                 :             : 
     102                 :           1 : size_t ConnectionPool::drain() {
     103                 :           1 :     shutting_down_.store(true);
     104                 :           1 :     std::lock_guard<std::mutex> lock(mutex_);
     105                 :           1 :     size_t unsent = pending_messages_.size();
     106                 :           1 :     for (auto& conn : active_connections_) {
     107                 :           0 :         conn->close();
     108                 :             :     }
     109                 :           1 :     active_connections_.clear();
     110                 :           1 :     pending_messages_.clear();
     111                 :           2 :     return unsent;
     112                 :           1 : }
     113                 :             : 
     114                 :          15 : void ConnectionPool::abort() {
     115                 :          15 :     shutting_down_.store(true);
     116                 :          15 :     std::lock_guard<std::mutex> lock(mutex_);
     117                 :          33 :     for (auto& conn : active_connections_) {
     118                 :          18 :         conn->close();
     119                 :             :     }
     120                 :          15 :     active_connections_.clear();
     121                 :          15 :     pending_messages_.clear();
     122                 :          15 : }
     123                 :             : 
     124                 :           0 : void ConnectionPool::set_rpc_handler(rpc_response_handler handler) {
     125                 :           0 :     std::lock_guard<std::mutex> lock(mutex_);
     126                 :           0 :     rpc_handler_ = std::move(handler);
     127                 :           0 : }
     128                 :             : 
     129                 :           0 : void ConnectionPool::set_spawn_handler(spawn_response_handler handler) {
     130                 :           0 :     std::lock_guard<std::mutex> lock(mutex_);
     131                 :           0 :     spawn_handler_ = std::move(handler);
     132                 :           0 : }
     133                 :             : 
     134                 :           0 : void ConnectionPool::on_connection_ready(ConnectionPtr conn) {
     135                 :             :     {
     136                 :           0 :         std::lock_guard<std::mutex> lock(mutex_);
     137                 :           0 :         active_connections_.push_back(conn);
     138                 :           0 :     }
     139                 :           0 :     flush_pending();
     140                 :           0 : }
     141                 :             : 
     142                 :           0 : void ConnectionPool::on_connection_error(ConnectionPtr conn, const error& err) {
     143                 :             :     (void)err;
     144                 :             :     {
     145                 :           0 :         std::lock_guard<std::mutex> lock(mutex_);
     146                 :           0 :         active_connections_.erase(std::remove(active_connections_.begin(),
     147                 :             :                                               active_connections_.end(), conn),
     148                 :           0 :                                   active_connections_.end());
     149                 :           0 :     }
     150                 :           0 :     HPACTOR_LOG_ERROR(log::LogCategory::kNetwork, ActorId{0}, 0, "connection error");
     151                 :           0 :     schedule_reconnect();
     152                 :           0 : }
     153                 :             : 
     154                 :           0 : void ConnectionPool::on_frame_received(StreamBuffer frame_data) {
     155                 :           0 :     WireFrame frame = WireFrame::decode(frame_data);
     156                 :             : 
     157                 :             :     // Check for RPC response
     158                 :           0 :     if (frame.pb_frame.flags() & WireFrame::RpcResponse) {
     159                 :             :         // Try to decode as spawn response first
     160                 :           0 :         if (static_cast<TypeTag>(frame.pb_frame.type_tag()) ==
     161                 :             :             TypeTag::SpawnResponseTag) {
     162                 :           0 :             ::hpactor::SpawnResponseMessage pb_resp;
     163                 :           0 :             if (pb_resp.ParseFromArray(
     164                 :           0 :                     frame.pb_frame.payload().data(),
     165                 :           0 :                     static_cast<int>(frame.pb_frame.payload().size()))) {
     166                 :           0 :                 if (spawn_handler_) {
     167                 :           0 :                     SpawnResponse resp;
     168                 :           0 :                     resp.actor_addr = net::from_proto(pb_resp.actor_addr());
     169                 :           0 :                     resp.error_code = pb_resp.error_code();
     170                 :           0 :                     spawn_handler_(frame.pb_frame.message_id(), resp);
     171                 :           0 :                     return;
     172                 :             :                 }
     173                 :             :             }
     174                 :           0 :         }
     175                 :             : 
     176                 :             :         // Fall through to RPC handler
     177                 :           0 :         if (rpc_handler_) {
     178                 :           0 :             RpcResponseFrame response;
     179                 :           0 :             response.msg_id = MessageId(frame.pb_frame.message_id());
     180                 :           0 :             response.payload = StreamBuffer(frame.pb_frame.payload().begin(),
     181                 :           0 :                                             frame.pb_frame.payload().end());
     182                 :           0 :             if (frame.pb_frame.has_trace_context()) {
     183                 :             :                 auto parsed = net::trace_context_from_proto(
     184                 :           0 :                     frame.pb_frame.trace_context(), 256);
     185                 :           0 :                 if (parsed.has_value()) {
     186                 :           0 :                     response.has_trace_context = true;
     187                 :           0 :                     response.trace_context = parsed.value();
     188                 :             :                 }
     189                 :           0 :             }
     190                 :           0 :             rpc_handler_(response);
     191                 :           0 :         }
     192                 :           0 :         return;
     193                 :             :     }
     194                 :             : 
     195                 :             :     // Route to actor message handler (deliver_remote)
     196                 :           0 :     if (actor_message_handler_) {
     197                 :           0 :         actor_message_handler_(frame);
     198                 :             :     }
     199                 :           0 : }
     200                 :             : 
     201                 :           0 : void ConnectionPool::schedule_reconnect() {
     202                 :           0 :     if (shutting_down_.load()) {
     203                 :           0 :         return;
     204                 :             :     }
     205                 :           0 :     if (reconnect_attempts_.load() >= config_.max_attempts) {
     206                 :           0 :         return; // Exhausted retries
     207                 :             :     }
     208                 :           0 :     if (reconnect_scheduled_.load()) {
     209                 :           0 :         return;
     210                 :             :     }
     211                 :           0 :     reconnect_scheduled_.store(true);
     212                 :             : 
     213                 :           0 :     auto backoff = config_.initial_backoff;
     214                 :           0 :     auto attempts = reconnect_attempts_.load();
     215                 :           0 :     for (size_t i = 0; i < attempts; ++i) {
     216                 :           0 :         backoff = backoff * 2;
     217                 :           0 :         if (backoff > config_.max_backoff) {
     218                 :           0 :             backoff = config_.max_backoff;
     219                 :             :         }
     220                 :             :     }
     221                 :             : 
     222                 :           0 :     reconnect_attempts_.fetch_add(1);
     223                 :           0 :     loop_->run_after([this]() { reconnect_scheduled_.store(false); },
     224                 :           0 :                      static_cast<int>(backoff.count()));
     225                 :             : }
     226                 :             : 
     227                 :           0 : void ConnectionPool::flush_pending() {
     228                 :           0 :     std::lock_guard<std::mutex> lock(mutex_);
     229                 :           0 :     while (!pending_messages_.empty() && !active_connections_.empty()) {
     230                 :           0 :         auto& msg = pending_messages_.front();
     231                 :           0 :         auto conn = get_connection();
     232                 :           0 :         if (conn) {
     233                 :           0 :             conn->send(msg.data);
     234                 :           0 :             pending_messages_.pop_front();
     235                 :             :         } else {
     236                 :           0 :             break;
     237                 :             :         }
     238                 :           0 :     }
     239                 :           0 : }
     240                 :             : 
     241                 :           0 : bool ConnectionPool::add_pending(const ActorAddress& target,
     242                 :             :                                  const StreamBuffer& data) {
     243                 :           0 :     std::lock_guard<std::mutex> lock(mutex_);
     244                 :           0 :     if (pending_messages_.size() >= config_.max_pending) {
     245                 :           0 :         return false;
     246                 :             :     }
     247                 :           0 :     pending_messages_.push_back({target, data, std::chrono::steady_clock::now()});
     248                 :           0 :     return true;
     249                 :           0 : }
     250                 :             : 
     251                 :          18 : void ConnectionPool::add_connection(ConnectionPtr conn) {
     252                 :          18 :     std::lock_guard<std::mutex> lock(mutex_);
     253                 :          18 :     active_connections_.push_back(conn);
     254                 :          18 : }
     255                 :             : 
     256                 :           0 : void ConnectionPool::prewarm_pool(EndPoint ep,
     257                 :             :                                   const std::vector<AcceptorInfo>& acceptors) {
     258                 :             :     (void)ep; // Pool already bound to remote_endpoint_ from constructor
     259                 :           0 :     std::lock_guard<std::mutex> lock(mutex_);
     260                 :           0 :     acceptors_ = acceptors;
     261                 :             :     // The actual connection is established asynchronously on first use.
     262                 :             :     // This just ensures the pool is ready so the first send() doesn't pay
     263                 :             :     // discovery cost.
     264                 :           0 : }
     265                 :             : 
     266                 :             : } // namespace net
     267                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1