Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/net/acceptor.hpp>
18 : : #include <hpactor/net/event_loop.hpp>
19 : : #include <hpactor/net/tls_connection.hpp>
20 : : #include <hpactor/net/tls_context.hpp>
21 : : #include <hpactor/net/transport.hpp>
22 : : #include <hpactor/net/wireframe_connection.hpp>
23 : : #include <hpactor/ref/actor_address.hpp>
24 : : #include <hpactor/rpc/rpc_types.hpp>
25 : : #include <hpactor/spawn.hpp>
26 : :
27 : : #include <atomic>
28 : : #include <chrono>
29 : : #include <deque>
30 : : #include <functional>
31 : : #include <memory>
32 : : #include <mutex>
33 : : #include <vector>
34 : :
35 : : namespace hpactor {
36 : :
37 : : namespace net {
38 : :
39 : : struct WireFrame; // forward decl, full def in <hpactor/net/frame.hpp>
40 : :
41 : : // -----------------------------------------------------------------------------
42 : : // PoolConfig - connection pool configuration
43 : : // -----------------------------------------------------------------------------
44 : : struct PoolConfig {
45 : : size_t min_connections = 1;
46 : : size_t max_connections = 4;
47 : : size_t max_pending = 1000;
48 : : size_t max_attempts = 5;
49 : 12 : std::chrono::milliseconds initial_backoff{1000};
50 : 12 : std::chrono::milliseconds max_backoff{16000};
51 : : bool use_tls = false; // Default to plain text
52 : : };
53 : :
54 : : // Pending message entry
55 : : struct PendingMessage {
56 : : ActorAddress target;
57 : : StreamBuffer data;
58 : : std::chrono::steady_clock::time_point enqueued_at;
59 : : };
60 : :
61 : : // Connection pool statistics
62 : : struct PoolStats {
63 : : size_t active_connections = 0;
64 : : size_t pending_messages = 0;
65 : : size_t reconnect_attempts = 0;
66 : : bool is_connected = false;
67 : : };
68 : :
69 : : // -----------------------------------------------------------------------------
70 : : // ConnectionPool - manages multiple connections to a remote node
71 : : // -----------------------------------------------------------------------------
72 : : // Owns a collection of PlainConnection/TlsConnection instances for load-
73 : : // balanced communication with a single remote endpoint. Handles reconnection
74 : : // with exponential backoff and pending message queuing.
75 : : // -----------------------------------------------------------------------------
76 : : class ConnectionPool {
77 : : public:
78 : : ConnectionPool(EndPoint remote_endpoint, const PoolConfig& config,
79 : : EventLoop* loop);
80 : : ~ConnectionPool();
81 : :
82 : : // Non-copyable
83 : : ConnectionPool(const ConnectionPool&) = delete;
84 : : ConnectionPool& operator=(const ConnectionPool&) = delete;
85 : :
86 : : // Send message to specific actor on remote node (uses pool)
87 : : void send(const ActorAddress& target, const StreamBuffer& encoded);
88 : :
89 : : // Try to send message — returns false when no connection is available
90 : : // and the pending queue is full (or the pool is shutting down).
91 : : bool try_send(const ActorAddress& target, const StreamBuffer& encoded);
92 : :
93 : : // Send raw bytes to the remote node (uses default target)
94 : : void send(const StreamBuffer& data);
95 : :
96 : : // Close all connections and clear pending
97 : : void close();
98 : :
99 : : // Check if pool has active connections
100 : : bool is_connected() const;
101 : :
102 : : // Get pool statistics
103 : : PoolStats stats() const;
104 : :
105 : : // Graceful shutdown: drain pending messages
106 : : // Returns number of messages that could not be sent
107 : : size_t drain();
108 : :
109 : : // Immediate shutdown
110 : : void abort();
111 : :
112 : : // Get remote node ID
113 : : EndPoint remote_endpoint() const {
114 : : return remote_endpoint_;
115 : : }
116 : :
117 : : // Set handler for RPC responses (called when RpcResponse frame is received)
118 : : using rpc_response_handler = std::function<void(const RpcResponseFrame&)>;
119 : : void set_rpc_handler(rpc_response_handler handler);
120 : :
121 : : // Set handler for spawn responses (called when SpawnResponse frame is
122 : : // received)
123 : : using spawn_response_handler =
124 : : std::function<void(uint64_t message_id, const SpawnResponse&)>;
125 : : void set_spawn_handler(spawn_response_handler handler);
126 : :
127 : : // Set handler for actor messages (called for non-RPC, non-spawn frames)
128 : : using actor_message_handler = std::function<void(const WireFrame& frame)>;
129 : :
130 : 0 : void set_actor_message_handler(actor_message_handler handler) {
131 : 0 : std::lock_guard<std::mutex> lock(mutex_);
132 : 0 : actor_message_handler_ = std::move(handler);
133 : 0 : }
134 : :
135 : : // Called by TcpTransport when connection becomes ready
136 : : void on_connection_ready(ConnectionPtr conn);
137 : :
138 : : // Called by TcpTransport when connection has error
139 : : void on_connection_error(ConnectionPtr conn, const error& err);
140 : :
141 : : // Handle incoming frame (called by connection's frame handler).
142 : : void on_frame_received(StreamBuffer frame_data);
143 : :
144 : : // Add an externally-created connection to the pool
145 : : void add_connection(ConnectionPtr conn);
146 : :
147 : : // Proactively ensure the pool structure exists for an endpoint.
148 : : // The actual connection is established asynchronously on first use.
149 : : // This just ensures the pool is ready so the first send() doesn't pay
150 : : // discovery cost.
151 : : void prewarm_pool(EndPoint ep, const std::vector<AcceptorInfo>& acceptors);
152 : :
153 : : private:
154 : : // Get connection via round-robin
155 : : ConnectionPtr get_connection();
156 : :
157 : : // Schedule reconnect with backoff
158 : : void schedule_reconnect();
159 : :
160 : : // Flush pending messages
161 : : void flush_pending();
162 : :
163 : : // Add pending message
164 : : bool add_pending(const ActorAddress& target, const StreamBuffer& data);
165 : :
166 : : EndPoint remote_endpoint_;
167 : : PoolConfig config_;
168 : : EventLoop* loop_;
169 : :
170 : : std::vector<ConnectionPtr> active_connections_;
171 : : std::deque<PendingMessage> pending_messages_;
172 : :
173 : : std::atomic<size_t> next_index_{0};
174 : : std::atomic<size_t> reconnect_attempts_{0};
175 : : std::atomic<bool> reconnect_scheduled_{false};
176 : :
177 : : mutable std::mutex mutex_;
178 : : std::atomic<bool> shutting_down_{false};
179 : :
180 : : rpc_response_handler rpc_handler_;
181 : : spawn_response_handler spawn_handler_;
182 : : actor_message_handler actor_message_handler_;
183 : :
184 : : // Acceptor info for future connection establishment (set via prewarm_pool).
185 : : std::vector<AcceptorInfo> acceptors_;
186 : : };
187 : :
188 : : } // namespace net
189 : : } // namespace hpactor
|