Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/net/registrar.hpp>
16 : : #include <hpactor/net/registrar_serialization.hpp>
17 : :
18 : : #include <arpa/inet.h>
19 : : #include <ifaddrs.h>
20 : : #include <net/if.h>
21 : : #include <netinet/in.h>
22 : : #include <netinet/tcp.h>
23 : : #include <sys/socket.h>
24 : : #include <unistd.h>
25 : :
26 : : #include <cstring>
27 : :
28 : : #include <hpactor/log/logger.hpp>
29 : :
30 : : namespace hpactor {
31 : :
32 : : namespace net {
33 : :
34 : : // -----------------------------------------------------------------------------
35 : : // Helper Functions
36 : : // -----------------------------------------------------------------------------
37 : :
38 : 0 : static std::string get_local_ip() {
39 : 0 : struct ifaddrs* ifaddr = nullptr;
40 : 0 : if (getifaddrs(&ifaddr) == -1) {
41 : 0 : return "127.0.0.1"; // Fallback
42 : : }
43 : :
44 : : // Prefer non-loopback, up, running interfaces
45 : 0 : std::string result;
46 : 0 : for (struct ifaddrs* ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) {
47 : 0 : if (ifa->ifa_addr == nullptr)
48 : 0 : continue;
49 : 0 : if (!(ifa->ifa_flags & IFF_UP))
50 : 0 : continue;
51 : 0 : if (!(ifa->ifa_flags & IFF_RUNNING))
52 : 0 : continue;
53 : 0 : if (ifa->ifa_flags & IFF_LOOPBACK)
54 : 0 : continue;
55 : 0 : if (ifa->ifa_addr->sa_family != AF_INET)
56 : 0 : continue;
57 : :
58 : 0 : struct sockaddr_in* addr =
59 : : reinterpret_cast<struct sockaddr_in*>(ifa->ifa_addr);
60 : : char ip[INET_ADDRSTRLEN];
61 : 0 : if (inet_ntop(AF_INET, &addr->sin_addr, ip, sizeof(ip)) != nullptr) {
62 : 0 : result = ip;
63 : 0 : break; // Take first valid non-loopback
64 : : }
65 : : }
66 : :
67 : 0 : freeifaddrs(ifaddr);
68 : 0 : return result.empty() ? "127.0.0.1" : result;
69 : 0 : }
70 : :
71 : : // -----------------------------------------------------------------------------
72 : : // RegistrarClient Implementation
73 : : // -----------------------------------------------------------------------------
74 : :
75 : 0 : void RegistrarClient::set_acceptors(std::vector<AcceptorInfo> acceptors) {
76 : 0 : acceptors_ = std::move(acceptors);
77 : 0 : }
78 : :
79 : 0 : RegistrarClient::RegistrarClient(const RegistrarConfig& config,
80 : : EndPoint local_endpoint, EndPoint server_endpoint,
81 : 0 : NodeRegistry* shared_registry, EventLoop* loop)
82 : 0 : : config_(config), local_endpoint_(local_endpoint),
83 : 0 : server_endpoint_(server_endpoint), shared_registry_(shared_registry),
84 : 0 : loop_(loop), last_heartbeat_sent_(std::chrono::steady_clock::now()) {}
85 : :
86 : 0 : RegistrarClient::~RegistrarClient() {
87 : 0 : stop();
88 : 0 : }
89 : :
90 : 0 : void RegistrarClient::start() {
91 : 0 : if (running_.load()) {
92 : 0 : return;
93 : : }
94 : :
95 : 0 : running_.store(true);
96 : 0 : connected_.store(false);
97 : :
98 : 0 : if (loop_) {
99 : : // Schedule heartbeat using EventLoop
100 : 0 : heartbeat_timer_ = loop_->run_every(
101 : 0 : [this]() {
102 : 0 : if (connected_.load() && server_connection_) {
103 : : // Build heartbeat message (no payload, just header)
104 : 0 : StreamBuffer message;
105 : 0 : message.resize(TcpHeaderSize);
106 : :
107 : 0 : uint32_t magic_be = htonl(TcpRegistrarMagic);
108 : 0 : memcpy(message.data(), &magic_be, 4);
109 : 0 : message[4] = TcpRegistrarVersion;
110 : 0 : message[5] = static_cast<uint8_t>(TcpMessageType::Heartbeat);
111 : 0 : uint32_t len_be = htonl(0);
112 : 0 : memcpy(message.data() + 6, &len_be, 4);
113 : :
114 : 0 : server_connection_->send_message(TcpMessageType::Heartbeat,
115 : 0 : StreamBuffer{});
116 : 0 : }
117 : 0 : },
118 : 0 : static_cast<int>(config_.heartbeat_interval.count()));
119 : : }
120 : :
121 : : // Start connection attempts
122 : 0 : attempt_connection();
123 : : }
124 : :
125 : 0 : void RegistrarClient::stop() {
126 : 0 : if (!running_.load()) {
127 : 0 : return;
128 : : }
129 : :
130 : 0 : running_.store(false);
131 : 0 : connected_.store(false);
132 : :
133 : : // Cancel EventLoop timers
134 : 0 : if (loop_) {
135 : 0 : if (heartbeat_timer_ != 0) {
136 : 0 : loop_->cancel_timer(heartbeat_timer_);
137 : 0 : heartbeat_timer_ = 0;
138 : : }
139 : : }
140 : :
141 : : // Close server connection
142 : 0 : if (server_connection_) {
143 : 0 : server_connection_->close();
144 : 0 : server_connection_.reset();
145 : : }
146 : : }
147 : :
148 : 0 : void RegistrarClient::attempt_connection() {
149 : 0 : if (!running_.load()) {
150 : 0 : return;
151 : : }
152 : :
153 : : // Get server endpoint from registry
154 : 0 : NodeEndpoint* server_ep = shared_registry_->get(server_endpoint_);
155 : 0 : if (!server_ep) {
156 : : // Schedule retry if we have an event loop
157 : 0 : if (loop_) {
158 : 0 : loop_->run_after(
159 : 0 : [this]() {
160 : 0 : if (running_.load()) {
161 : 0 : attempt_connection();
162 : : }
163 : 0 : },
164 : : 1000);
165 : : }
166 : 0 : return;
167 : : }
168 : :
169 : : // Resolve server hostname
170 : 0 : std::string server_ip = server_ep->identity.host;
171 : : struct in_addr addr;
172 : 0 : if (inet_pton(AF_INET, server_ip.c_str(), &addr) != 1) {
173 : : // Try to resolve hostname
174 : 0 : HostResolver resolver;
175 : 0 : server_ip = resolver.resolve(server_ep->identity.host);
176 : 0 : if (server_ip.empty()) {
177 : : // Schedule retry
178 : 0 : if (loop_) {
179 : 0 : loop_->run_after(
180 : 0 : [this]() {
181 : 0 : if (running_.load()) {
182 : 0 : attempt_connection();
183 : : }
184 : 0 : },
185 : : 1000);
186 : : }
187 : 0 : return;
188 : : }
189 : 0 : }
190 : :
191 : : // Create TCP socket
192 : 0 : int sock = socket(AF_INET, SOCK_STREAM, 0);
193 : 0 : if (sock < 0) {
194 : 0 : return;
195 : : }
196 : :
197 : : // Set TCP_NODELAY for lower latency
198 : 0 : int nodelay = 1;
199 : 0 : setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
200 : :
201 : : // Connect to server
202 : : struct sockaddr_in server_addr;
203 : 0 : memset(&server_addr, 0, sizeof(server_addr));
204 : 0 : server_addr.sin_family = AF_INET;
205 : 0 : server_addr.sin_port = htons(server_ep->tcp_port);
206 : :
207 : 0 : if (inet_pton(AF_INET, server_ip.c_str(), &server_addr.sin_addr) <= 0) {
208 : 0 : close(sock);
209 : 0 : return;
210 : : }
211 : :
212 : 0 : if (::connect(sock, reinterpret_cast<struct sockaddr*>(&server_addr),
213 : 0 : sizeof(server_addr)) < 0) {
214 : 0 : close(sock);
215 : : // Schedule retry
216 : 0 : if (loop_) {
217 : 0 : loop_->run_after(
218 : 0 : [this]() {
219 : 0 : if (running_.load()) {
220 : 0 : attempt_connection();
221 : : }
222 : 0 : },
223 : : 1000);
224 : : }
225 : 0 : return;
226 : : }
227 : :
228 : : // Create RegistrarConnection wrapper
229 : : server_connection_ =
230 : 0 : RegistrarConnection::connecting(sock, server_endpoint_, loop_);
231 : :
232 : : // Set up message handler
233 : 0 : server_connection_->set_message_handler(
234 : 0 : [this](TcpMessageType type, const StreamBuffer& data) {
235 : 0 : handle_server_message(type, data);
236 : 0 : });
237 : :
238 : : // Set up disconnect handler
239 : 0 : server_connection_->set_disconnect_handler([this]() { handle_disconnect(); });
240 : :
241 : : // Connection successful — reset failover counter
242 : 0 : reconnect_attempts_ = 0;
243 : 0 : connected_.store(true);
244 : :
245 : : // Send registration
246 : 0 : send_registration();
247 : 0 : }
248 : :
249 : 0 : void RegistrarClient::send_registration() {
250 : 0 : if (!server_connection_ || !connected_.load()) {
251 : 0 : return;
252 : : }
253 : :
254 : : // Build registration message using protobuf
255 : 0 : std::string host = get_local_ip();
256 : 0 : uint16_t tcp_port = config_.tcp_port;
257 : :
258 : : // Create NodeEndpoint for serialization
259 : 0 : NodeEndpoint ep;
260 : 0 : ep.identity.endpoint = local_endpoint_;
261 : 0 : ep.identity.host = host;
262 : 0 : ep.tcp_port = tcp_port;
263 : 0 : ep.identity.acceptors = acceptors_;
264 : :
265 : 0 : StreamBuffer payload = serialize_register_payload(ep);
266 : 0 : server_connection_->send_message(TcpMessageType::Register, payload);
267 : 0 : }
268 : :
269 : 0 : void RegistrarClient::handle_server_message(TcpMessageType type,
270 : : const StreamBuffer& data) {
271 : 0 : switch (type) {
272 : 0 : case TcpMessageType::Accept: {
273 : : // Registration accepted - server acknowledges our registration
274 : : // Payload: [ErrorCode: 1]
275 : 0 : if (data.size() >= 1) {
276 : 0 : uint8_t error_code = data[0];
277 : 0 : if (error_code == 0) {
278 : : // Success - we're registered
279 : 0 : last_heartbeat_sent_ = std::chrono::steady_clock::now();
280 : 0 : HPACTOR_LOG_INFO(log::LogCategory::kRegistrar, ActorId{0},
281 : : static_cast<uint32_t>(
282 : : log::LogEventId::kRegistrarRegister),
283 : : "registrar client registered");
284 : : }
285 : : }
286 : 0 : break;
287 : : }
288 : :
289 : 0 : case TcpMessageType::NodeJoin: {
290 : 0 : PbNodeJoinPayload msg;
291 : 0 : if (!msg.ParseFromArray(data.data(), static_cast<int>(data.size()))) {
292 : 0 : break;
293 : : }
294 : :
295 : 0 : const auto& ep_info = msg.endpoint_info();
296 : 0 : std::string endpoint_str = ep_info.endpoint();
297 : 0 : EndPoint endpoint = endpoint_ops::parse_endpoint(endpoint_str);
298 : :
299 : 0 : std::string host = ep_info.host();
300 : 0 : uint16_t tcp_port = static_cast<uint16_t>(ep_info.tcp_port());
301 : :
302 : 0 : NodeEndpoint node_ep;
303 : 0 : node_ep.identity.endpoint = endpoint;
304 : 0 : node_ep.identity.host = host;
305 : 0 : node_ep.tcp_port = tcp_port;
306 : 0 : node_ep.last_seen = std::chrono::steady_clock::now();
307 : 0 : shared_registry_->upsert_endpoint(node_ep);
308 : 0 : break;
309 : 0 : }
310 : :
311 : 0 : case TcpMessageType::NodeLeave: {
312 : 0 : PbNodeLeavePayload msg;
313 : 0 : if (!msg.ParseFromArray(data.data(), static_cast<int>(data.size()))) {
314 : 0 : break;
315 : : }
316 : :
317 : 0 : std::string endpoint_str = msg.endpoint();
318 : 0 : EndPoint endpoint = endpoint_ops::parse_endpoint(endpoint_str);
319 : 0 : shared_registry_->remove_endpoint(endpoint);
320 : 0 : break;
321 : 0 : }
322 : :
323 : 0 : case TcpMessageType::Error: {
324 : : // Error response
325 : : // Payload: [ErrorCode: 1][MessageLen: 4][Message: N]
326 : 0 : if (data.size() < 1)
327 : 0 : break;
328 : 0 : uint8_t error_code = data[0];
329 : : (void)error_code; // Could log this
330 : 0 : break;
331 : : }
332 : :
333 : 0 : case TcpMessageType::Heartbeat:
334 : : case TcpMessageType::Register:
335 : : // These are sent by us, not received
336 : 0 : break;
337 : : }
338 : 0 : }
339 : :
340 : 0 : void RegistrarClient::handle_disconnect() {
341 : 0 : connected_.store(false);
342 : :
343 : 0 : HPACTOR_LOG_WARNING(log::LogCategory::kRegistrar, ActorId{0}, 0,
344 : : "registrar heartbeat timeout");
345 : :
346 : : // Close existing connection
347 : 0 : if (server_connection_) {
348 : 0 : server_connection_->close();
349 : 0 : server_connection_.reset();
350 : : }
351 : :
352 : 0 : reconnect_attempts_++;
353 : 0 : if (reconnect_attempts_ >= kMaxReconnectAttempts && failover_callback_) {
354 : 0 : failover_callback_();
355 : 0 : return;
356 : : }
357 : :
358 : : // Schedule reconnect if still running
359 : 0 : if (running_.load() && loop_) {
360 : 0 : loop_->run_after(
361 : 0 : [this]() {
362 : 0 : if (running_.load()) {
363 : 0 : reconnect();
364 : : }
365 : 0 : },
366 : : 1000);
367 : : }
368 : : }
369 : :
370 : 0 : void RegistrarClient::reconnect() {
371 : 0 : handle_disconnect();
372 : 0 : }
373 : :
374 : : } // namespace net
375 : : } // namespace hpactor
|