LCOV - code coverage report
Current view: top level - include/hpactor/net - connection_pool.hpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 33.3 % 6 2
Test Date: 2026-05-20 02:24:49 Functions: 0.0 % 1 0
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #pragma once
      16                 :             : 
      17                 :             : #include <hpactor/net/acceptor.hpp>
      18                 :             : #include <hpactor/net/event_loop.hpp>
      19                 :             : #include <hpactor/net/tls_connection.hpp>
      20                 :             : #include <hpactor/net/tls_context.hpp>
      21                 :             : #include <hpactor/net/transport.hpp>
      22                 :             : #include <hpactor/net/wireframe_connection.hpp>
      23                 :             : #include <hpactor/ref/actor_address.hpp>
      24                 :             : #include <hpactor/rpc/rpc_types.hpp>
      25                 :             : #include <hpactor/spawn.hpp>
      26                 :             : 
      27                 :             : #include <atomic>
      28                 :             : #include <chrono>
      29                 :             : #include <deque>
      30                 :             : #include <functional>
      31                 :             : #include <memory>
      32                 :             : #include <mutex>
      33                 :             : #include <vector>
      34                 :             : 
      35                 :             : namespace hpactor {
      36                 :             : 
      37                 :             : namespace net {
      38                 :             : 
      39                 :             : struct WireFrame; // forward decl, full def in <hpactor/net/frame.hpp>
      40                 :             : 
      41                 :             : // -----------------------------------------------------------------------------
      42                 :             : // PoolConfig - connection pool configuration
      43                 :             : // -----------------------------------------------------------------------------
      44                 :             : struct PoolConfig {
      45                 :             :     size_t min_connections = 1;
      46                 :             :     size_t max_connections = 4;
      47                 :             :     size_t max_pending = 1000;
      48                 :             :     size_t max_attempts = 5;
      49                 :          12 :     std::chrono::milliseconds initial_backoff{1000};
      50                 :          12 :     std::chrono::milliseconds max_backoff{16000};
      51                 :             :     bool use_tls = false; // Default to plain text
      52                 :             : };
      53                 :             : 
      54                 :             : // Pending message entry
      55                 :             : struct PendingMessage {
      56                 :             :     ActorAddress target;
      57                 :             :     StreamBuffer data;
      58                 :             :     std::chrono::steady_clock::time_point enqueued_at;
      59                 :             : };
      60                 :             : 
      61                 :             : // Connection pool statistics
      62                 :             : struct PoolStats {
      63                 :             :     size_t active_connections = 0;
      64                 :             :     size_t pending_messages = 0;
      65                 :             :     size_t reconnect_attempts = 0;
      66                 :             :     bool is_connected = false;
      67                 :             : };
      68                 :             : 
      69                 :             : // -----------------------------------------------------------------------------
      70                 :             : // ConnectionPool - manages multiple connections to a remote node
      71                 :             : // -----------------------------------------------------------------------------
      72                 :             : // Owns a collection of PlainConnection/TlsConnection instances for load-
      73                 :             : // balanced communication with a single remote endpoint. Handles reconnection
      74                 :             : // with exponential backoff and pending message queuing.
      75                 :             : // -----------------------------------------------------------------------------
      76                 :             : class ConnectionPool {
      77                 :             :   public:
      78                 :             :     ConnectionPool(EndPoint remote_endpoint, const PoolConfig& config,
      79                 :             :                    EventLoop* loop);
      80                 :             :     ~ConnectionPool();
      81                 :             : 
      82                 :             :     // Non-copyable
      83                 :             :     ConnectionPool(const ConnectionPool&) = delete;
      84                 :             :     ConnectionPool& operator=(const ConnectionPool&) = delete;
      85                 :             : 
      86                 :             :     // Send message to specific actor on remote node (uses pool)
      87                 :             :     void send(const ActorAddress& target, const StreamBuffer& encoded);
      88                 :             : 
      89                 :             :     // Try to send message — returns false when no connection is available
      90                 :             :     // and the pending queue is full (or the pool is shutting down).
      91                 :             :     bool try_send(const ActorAddress& target, const StreamBuffer& encoded);
      92                 :             : 
      93                 :             :     // Send raw bytes to the remote node (uses default target)
      94                 :             :     void send(const StreamBuffer& data);
      95                 :             : 
      96                 :             :     // Close all connections and clear pending
      97                 :             :     void close();
      98                 :             : 
      99                 :             :     // Check if pool has active connections
     100                 :             :     bool is_connected() const;
     101                 :             : 
     102                 :             :     // Get pool statistics
     103                 :             :     PoolStats stats() const;
     104                 :             : 
     105                 :             :     // Graceful shutdown: drain pending messages
     106                 :             :     // Returns number of messages that could not be sent
     107                 :             :     size_t drain();
     108                 :             : 
     109                 :             :     // Immediate shutdown
     110                 :             :     void abort();
     111                 :             : 
     112                 :             :     // Get remote node ID
     113                 :             :     EndPoint remote_endpoint() const {
     114                 :             :         return remote_endpoint_;
     115                 :             :     }
     116                 :             : 
     117                 :             :     // Set handler for RPC responses (called when RpcResponse frame is received)
     118                 :             :     using rpc_response_handler = std::function<void(const RpcResponseFrame&)>;
     119                 :             :     void set_rpc_handler(rpc_response_handler handler);
     120                 :             : 
     121                 :             :     // Set handler for spawn responses (called when SpawnResponse frame is
     122                 :             :     // received)
     123                 :             :     using spawn_response_handler =
     124                 :             :         std::function<void(uint64_t message_id, const SpawnResponse&)>;
     125                 :             :     void set_spawn_handler(spawn_response_handler handler);
     126                 :             : 
     127                 :             :     // Set handler for actor messages (called for non-RPC, non-spawn frames)
     128                 :             :     using actor_message_handler = std::function<void(const WireFrame& frame)>;
     129                 :             : 
     130                 :           0 :     void set_actor_message_handler(actor_message_handler handler) {
     131                 :           0 :         std::lock_guard<std::mutex> lock(mutex_);
     132                 :           0 :         actor_message_handler_ = std::move(handler);
     133                 :           0 :     }
     134                 :             : 
     135                 :             :     // Called by TcpTransport when connection becomes ready
     136                 :             :     void on_connection_ready(ConnectionPtr conn);
     137                 :             : 
     138                 :             :     // Called by TcpTransport when connection has error
     139                 :             :     void on_connection_error(ConnectionPtr conn, const error& err);
     140                 :             : 
     141                 :             :     // Handle incoming frame (called by connection's frame handler).
     142                 :             :     void on_frame_received(StreamBuffer frame_data);
     143                 :             : 
     144                 :             :     // Add an externally-created connection to the pool
     145                 :             :     void add_connection(ConnectionPtr conn);
     146                 :             : 
     147                 :             :     // Proactively ensure the pool structure exists for an endpoint.
     148                 :             :     // The actual connection is established asynchronously on first use.
     149                 :             :     // This just ensures the pool is ready so the first send() doesn't pay
     150                 :             :     // discovery cost.
     151                 :             :     void prewarm_pool(EndPoint ep, const std::vector<AcceptorInfo>& acceptors);
     152                 :             : 
     153                 :             :   private:
     154                 :             :     // Get connection via round-robin
     155                 :             :     ConnectionPtr get_connection();
     156                 :             : 
     157                 :             :     // Schedule reconnect with backoff
     158                 :             :     void schedule_reconnect();
     159                 :             : 
     160                 :             :     // Flush pending messages
     161                 :             :     void flush_pending();
     162                 :             : 
     163                 :             :     // Add pending message
     164                 :             :     bool add_pending(const ActorAddress& target, const StreamBuffer& data);
     165                 :             : 
     166                 :             :     EndPoint remote_endpoint_;
     167                 :             :     PoolConfig config_;
     168                 :             :     EventLoop* loop_;
     169                 :             : 
     170                 :             :     std::vector<ConnectionPtr> active_connections_;
     171                 :             :     std::deque<PendingMessage> pending_messages_;
     172                 :             : 
     173                 :             :     std::atomic<size_t> next_index_{0};
     174                 :             :     std::atomic<size_t> reconnect_attempts_{0};
     175                 :             :     std::atomic<bool> reconnect_scheduled_{false};
     176                 :             : 
     177                 :             :     mutable std::mutex mutex_;
     178                 :             :     std::atomic<bool> shutting_down_{false};
     179                 :             : 
     180                 :             :     rpc_response_handler rpc_handler_;
     181                 :             :     spawn_response_handler spawn_handler_;
     182                 :             :     actor_message_handler actor_message_handler_;
     183                 :             : 
     184                 :             :     // Acceptor info for future connection establishment (set via prewarm_pool).
     185                 :             :     std::vector<AcceptorInfo> acceptors_;
     186                 :             : };
     187                 :             : 
     188                 :             : } // namespace net
     189                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1