Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/net/connection_pool.hpp>
16 : : #include <hpactor/net/frame.hpp>
17 : : #include <hpactor/spawn.hpp>
18 : :
19 : : #include <hpactor/common.pb.h>
20 : : #include <hpactor/log/logger.hpp>
21 : : #include <hpactor/messages.pb.h>
22 : :
23 : : namespace hpactor {
24 : :
25 : : namespace net {
26 : :
27 : 9 : ConnectionPool::ConnectionPool(EndPoint remote_endpoint,
28 : 9 : const PoolConfig& config, EventLoop* loop)
29 : 9 : : remote_endpoint_(remote_endpoint), config_(config), loop_(loop) {}
30 : :
31 : 9 : ConnectionPool::~ConnectionPool() {
32 : 9 : abort();
33 : 9 : }
34 : :
35 : 0 : ConnectionPtr ConnectionPool::get_connection() {
36 : 0 : std::lock_guard<std::mutex> lock(mutex_);
37 : 0 : if (active_connections_.empty()) {
38 : 0 : return nullptr;
39 : : }
40 : 0 : auto index = next_index_.fetch_add(1) % active_connections_.size();
41 : 0 : return active_connections_[index];
42 : 0 : }
43 : :
44 : 0 : void ConnectionPool::send(const ActorAddress& target, const StreamBuffer& encoded) {
45 : 0 : if (shutting_down_.load()) {
46 : 0 : return;
47 : : }
48 : :
49 : 0 : ConnectionPtr conn = get_connection();
50 : 0 : if (conn) {
51 : 0 : conn->send(encoded);
52 : 0 : return;
53 : : }
54 : :
55 : : // No connection available, queue pending
56 : 0 : add_pending(target, encoded);
57 : 0 : }
58 : :
59 : 0 : bool ConnectionPool::try_send(const ActorAddress& target,
60 : : const StreamBuffer& encoded) {
61 : 0 : if (shutting_down_.load()) {
62 : 0 : return false;
63 : : }
64 : :
65 : 0 : ConnectionPtr conn = get_connection();
66 : 0 : if (conn) {
67 : 0 : conn->send(encoded);
68 : 0 : return true;
69 : : }
70 : :
71 : : // No connection available — try to queue; fail if pending queue is full
72 : 0 : return add_pending(target, encoded);
73 : 0 : }
74 : :
75 : 0 : void ConnectionPool::send(const StreamBuffer& data) {
76 : : // Create a minimal actor address using the remote endpoint
77 : 0 : ActorAddress target;
78 : 0 : target.endpoint =
79 : 0 : endpoint_ops::parse_endpoint(endpoint_ops::to_string(remote_endpoint_));
80 : 0 : send(target, data);
81 : 0 : }
82 : :
83 : 0 : void ConnectionPool::close() {
84 : 0 : abort();
85 : 0 : }
86 : :
87 : 3 : bool ConnectionPool::is_connected() const {
88 : 3 : std::lock_guard<std::mutex> lock(mutex_);
89 : 3 : return !active_connections_.empty();
90 : 3 : }
91 : :
92 : 1 : PoolStats ConnectionPool::stats() const {
93 : 1 : std::lock_guard<std::mutex> lock(mutex_);
94 : 1 : PoolStats s;
95 : 1 : s.active_connections = active_connections_.size();
96 : 1 : s.pending_messages = pending_messages_.size();
97 : 1 : s.reconnect_attempts = reconnect_attempts_.load();
98 : 1 : s.is_connected = !active_connections_.empty();
99 : 1 : return s;
100 : 1 : }
101 : :
102 : 1 : size_t ConnectionPool::drain() {
103 : 1 : shutting_down_.store(true);
104 : 1 : std::lock_guard<std::mutex> lock(mutex_);
105 : 1 : size_t unsent = pending_messages_.size();
106 : 1 : for (auto& conn : active_connections_) {
107 : 0 : conn->close();
108 : : }
109 : 1 : active_connections_.clear();
110 : 1 : pending_messages_.clear();
111 : 2 : return unsent;
112 : 1 : }
113 : :
114 : 15 : void ConnectionPool::abort() {
115 : 15 : shutting_down_.store(true);
116 : 15 : std::lock_guard<std::mutex> lock(mutex_);
117 : 33 : for (auto& conn : active_connections_) {
118 : 18 : conn->close();
119 : : }
120 : 15 : active_connections_.clear();
121 : 15 : pending_messages_.clear();
122 : 15 : }
123 : :
124 : 0 : void ConnectionPool::set_rpc_handler(rpc_response_handler handler) {
125 : 0 : std::lock_guard<std::mutex> lock(mutex_);
126 : 0 : rpc_handler_ = std::move(handler);
127 : 0 : }
128 : :
129 : 0 : void ConnectionPool::set_spawn_handler(spawn_response_handler handler) {
130 : 0 : std::lock_guard<std::mutex> lock(mutex_);
131 : 0 : spawn_handler_ = std::move(handler);
132 : 0 : }
133 : :
134 : 0 : void ConnectionPool::on_connection_ready(ConnectionPtr conn) {
135 : : {
136 : 0 : std::lock_guard<std::mutex> lock(mutex_);
137 : 0 : active_connections_.push_back(conn);
138 : 0 : }
139 : 0 : flush_pending();
140 : 0 : }
141 : :
142 : 0 : void ConnectionPool::on_connection_error(ConnectionPtr conn, const error& err) {
143 : : (void)err;
144 : : {
145 : 0 : std::lock_guard<std::mutex> lock(mutex_);
146 : 0 : active_connections_.erase(std::remove(active_connections_.begin(),
147 : : active_connections_.end(), conn),
148 : 0 : active_connections_.end());
149 : 0 : }
150 : 0 : HPACTOR_LOG_ERROR(log::LogCategory::kNetwork, ActorId{0}, 0, "connection error");
151 : 0 : schedule_reconnect();
152 : 0 : }
153 : :
154 : 0 : void ConnectionPool::on_frame_received(StreamBuffer frame_data) {
155 : 0 : WireFrame frame = WireFrame::decode(frame_data);
156 : :
157 : : // Check for RPC response
158 : 0 : if (frame.pb_frame.flags() & WireFrame::RpcResponse) {
159 : : // Try to decode as spawn response first
160 : 0 : if (static_cast<TypeTag>(frame.pb_frame.type_tag()) ==
161 : : TypeTag::SpawnResponseTag) {
162 : 0 : ::hpactor::SpawnResponseMessage pb_resp;
163 : 0 : if (pb_resp.ParseFromArray(
164 : 0 : frame.pb_frame.payload().data(),
165 : 0 : static_cast<int>(frame.pb_frame.payload().size()))) {
166 : 0 : if (spawn_handler_) {
167 : 0 : SpawnResponse resp;
168 : 0 : resp.actor_addr = net::from_proto(pb_resp.actor_addr());
169 : 0 : resp.error_code = pb_resp.error_code();
170 : 0 : spawn_handler_(frame.pb_frame.message_id(), resp);
171 : 0 : return;
172 : : }
173 : : }
174 : 0 : }
175 : :
176 : : // Fall through to RPC handler
177 : 0 : if (rpc_handler_) {
178 : 0 : RpcResponseFrame response;
179 : 0 : response.msg_id = MessageId(frame.pb_frame.message_id());
180 : 0 : response.payload = StreamBuffer(frame.pb_frame.payload().begin(),
181 : 0 : frame.pb_frame.payload().end());
182 : 0 : if (frame.pb_frame.has_trace_context()) {
183 : : auto parsed = net::trace_context_from_proto(
184 : 0 : frame.pb_frame.trace_context(), 256);
185 : 0 : if (parsed.has_value()) {
186 : 0 : response.has_trace_context = true;
187 : 0 : response.trace_context = parsed.value();
188 : : }
189 : 0 : }
190 : 0 : rpc_handler_(response);
191 : 0 : }
192 : 0 : return;
193 : : }
194 : :
195 : : // Route to actor message handler (deliver_remote)
196 : 0 : if (actor_message_handler_) {
197 : 0 : actor_message_handler_(frame);
198 : : }
199 : 0 : }
200 : :
201 : 0 : void ConnectionPool::schedule_reconnect() {
202 : 0 : if (shutting_down_.load()) {
203 : 0 : return;
204 : : }
205 : 0 : if (reconnect_attempts_.load() >= config_.max_attempts) {
206 : 0 : return; // Exhausted retries
207 : : }
208 : 0 : if (reconnect_scheduled_.load()) {
209 : 0 : return;
210 : : }
211 : 0 : reconnect_scheduled_.store(true);
212 : :
213 : 0 : auto backoff = config_.initial_backoff;
214 : 0 : auto attempts = reconnect_attempts_.load();
215 : 0 : for (size_t i = 0; i < attempts; ++i) {
216 : 0 : backoff = backoff * 2;
217 : 0 : if (backoff > config_.max_backoff) {
218 : 0 : backoff = config_.max_backoff;
219 : : }
220 : : }
221 : :
222 : 0 : reconnect_attempts_.fetch_add(1);
223 : 0 : loop_->run_after([this]() { reconnect_scheduled_.store(false); },
224 : 0 : static_cast<int>(backoff.count()));
225 : : }
226 : :
227 : 0 : void ConnectionPool::flush_pending() {
228 : 0 : std::lock_guard<std::mutex> lock(mutex_);
229 : 0 : while (!pending_messages_.empty() && !active_connections_.empty()) {
230 : 0 : auto& msg = pending_messages_.front();
231 : 0 : auto conn = get_connection();
232 : 0 : if (conn) {
233 : 0 : conn->send(msg.data);
234 : 0 : pending_messages_.pop_front();
235 : : } else {
236 : 0 : break;
237 : : }
238 : 0 : }
239 : 0 : }
240 : :
241 : 0 : bool ConnectionPool::add_pending(const ActorAddress& target,
242 : : const StreamBuffer& data) {
243 : 0 : std::lock_guard<std::mutex> lock(mutex_);
244 : 0 : if (pending_messages_.size() >= config_.max_pending) {
245 : 0 : return false;
246 : : }
247 : 0 : pending_messages_.push_back({target, data, std::chrono::steady_clock::now()});
248 : 0 : return true;
249 : 0 : }
250 : :
251 : 18 : void ConnectionPool::add_connection(ConnectionPtr conn) {
252 : 18 : std::lock_guard<std::mutex> lock(mutex_);
253 : 18 : active_connections_.push_back(conn);
254 : 18 : }
255 : :
256 : 0 : void ConnectionPool::prewarm_pool(EndPoint ep,
257 : : const std::vector<AcceptorInfo>& acceptors) {
258 : : (void)ep; // Pool already bound to remote_endpoint_ from constructor
259 : 0 : std::lock_guard<std::mutex> lock(mutex_);
260 : 0 : acceptors_ = acceptors;
261 : : // The actual connection is established asynchronously on first use.
262 : : // This just ensures the pool is ready so the first send() doesn't pay
263 : : // discovery cost.
264 : 0 : }
265 : :
266 : : } // namespace net
267 : : } // namespace hpactor
|