Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/adt/node_identity.hpp>
18 : : #include <hpactor/net/acceptor.hpp>
19 : : #include <hpactor/net/event_loop.hpp>
20 : : #include <hpactor/net/service_discovery.hpp>
21 : : #include <hpactor/ref/actor_address.hpp>
22 : : #include <hpactor/types/types.hpp>
23 : :
24 : : #include <atomic>
25 : : #include <chrono>
26 : : #include <functional>
27 : : #include <memory>
28 : : #include <mutex>
29 : : #include <string>
30 : : #include <unordered_map>
31 : : #include <vector>
32 : :
33 : : namespace hpactor {
34 : :
35 : : namespace net {
36 : :
37 : : // -----------------------------------------------------------------------------
38 : : // RegistrarConfig - configuration for registrar
39 : : // -----------------------------------------------------------------------------
40 : : struct StaticRouteConfig {
41 : : EndPoint endpoint;
42 : : std::string address; // IP or DNS hostname (used if endpoint is empty)
43 : : uint16_t port = 0;
44 : : };
45 : :
46 : : struct RegistrarConfig {
47 : : uint16_t udp_port = 5353;
48 : : uint16_t tcp_port = 5353;
49 : 12 : std::chrono::milliseconds heartbeat_interval{5000};
50 : 12 : std::chrono::milliseconds expiration_timeout{15000};
51 : 12 : std::chrono::milliseconds probe_interval{30000};
52 : : std::vector<StaticRouteConfig> static_routes;
53 : : bool disable_server = false;
54 : : };
55 : :
56 : : // -----------------------------------------------------------------------------
57 : : // NodeEndpoint - information about a known node
58 : : // -----------------------------------------------------------------------------
59 : : struct NodeEndpoint {
60 : : NodeIdentity identity;
61 : : uint16_t tcp_port = 0;
62 : : bool is_static_route = false;
63 : : std::chrono::steady_clock::time_point last_seen;
64 : : };
65 : :
66 : : // -----------------------------------------------------------------------------
67 : : // HostResolver - hostname to IP resolution with caching
68 : : // -----------------------------------------------------------------------------
69 : : class HostResolver {
70 : : public:
71 : 14 : HostResolver() = default;
72 : :
73 : : // Resolve hostname to IP address (blocking)
74 : : std::string resolve(const std::string& hostname);
75 : :
76 : : // Async resolution - returns immediately, callback when done
77 : : void resolve_async(const std::string& hostname,
78 : : std::function<void(std::string ip)> callback);
79 : :
80 : : // Get cached IP for hostname (empty if not cached)
81 : : std::string get_cached(const std::string& hostname) const;
82 : :
83 : : // Cache hostname -> IP mapping with TTL
84 : : void cache(const std::string& hostname, const std::string& ip,
85 : : std::chrono::seconds ttl = std::chrono::seconds(300));
86 : :
87 : : // Clear expired entries
88 : : void clear_expired();
89 : :
90 : : private:
91 : : struct CacheEntry {
92 : : std::string ip;
93 : : std::chrono::steady_clock::time_point expires_at;
94 : : };
95 : :
96 : : std::unordered_map<std::string, CacheEntry> cache_;
97 : : mutable std::mutex mutex_;
98 : : };
99 : :
100 : : // -----------------------------------------------------------------------------
101 : : // NodeRegistry - registry of known nodes
102 : : // -----------------------------------------------------------------------------
103 : : class NodeRegistry {
104 : : public:
105 : : explicit NodeRegistry(const RegistrarConfig& config);
106 : :
107 : : // Add or update an endpoint
108 : : void upsert_endpoint(NodeEndpoint endpoint);
109 : :
110 : : // Remove an endpoint
111 : : bool remove_endpoint(EndPoint endpoint);
112 : :
113 : : // Get endpoint (nullptr if not found)
114 : : NodeEndpoint* get(EndPoint endpoint);
115 : :
116 : : // Check if endpoint exists
117 : : bool has(EndPoint endpoint) const;
118 : :
119 : : // Get all endpoints
120 : : std::vector<NodeEndpoint> all() const;
121 : :
122 : : // Remove expired entries
123 : : size_t remove_expired();
124 : :
125 : : private:
126 : : RegistrarConfig config_;
127 : : std::unordered_map<EndPoint, NodeEndpoint> endpoints_;
128 : : mutable std::mutex mutex_;
129 : : };
130 : :
131 : : // -----------------------------------------------------------------------------
132 : : // Registrar Protocol Messages
133 : : // -----------------------------------------------------------------------------
134 : : enum class RegistrarMessageType : uint8_t {
135 : : // TCP messages (server/client registration)
136 : : Register = 0x01,
137 : : Heartbeat = 0x02,
138 : : NodeJoin = 0x03,
139 : : NodeLeave = 0x04,
140 : : Accept = 0x05,
141 : : Error = 0x06,
142 : : // UDP messages (resolution)
143 : : ResolveQuery = 0x10,
144 : : ResolveResponse = 0x11,
145 : : };
146 : :
147 : : // Protocol constants
148 : : constexpr uint32_t RegistrarMagic = 0x48504143; // "HPAC"
149 : : constexpr uint8_t RegistrarVersion = 0x01;
150 : : constexpr size_t RegistrarHeaderSize = 12;
151 : :
152 : : // Message payloads
153 : : struct NodeAnnouncePayload {
154 : : uint16_t tcp_port;
155 : : uint16_t actor_count;
156 : : };
157 : :
158 : : struct NodeQueryPayload {
159 : : EndPoint target_endpoint;
160 : : };
161 : :
162 : : struct NodeResponsePayload {
163 : : uint16_t tcp_port;
164 : : };
165 : :
166 : : struct NodeProbePayload {
167 : : uint64_t probe_id;
168 : : uint64_t timestamp;
169 : : };
170 : :
171 : : // -----------------------------------------------------------------------------
172 : : // TCP Message Framing for RegistrarServer
173 : : // -----------------------------------------------------------------------------
174 : : constexpr uint32_t TcpRegistrarMagic = 0x48505243; // "HPRC"
175 : : constexpr uint8_t TcpRegistrarVersion = 0x01;
176 : : constexpr size_t TcpHeaderSize = 10;
177 : :
178 : : // TCP Message types (different from UDP types)
179 : : enum class TcpMessageType : uint8_t {
180 : : Register = 0x01,
181 : : Heartbeat = 0x02,
182 : : NodeJoin = 0x03,
183 : : NodeLeave = 0x04,
184 : : Accept = 0x05,
185 : : Error = 0x06,
186 : : };
187 : :
188 : : // Error codes for TCP Error messages
189 : : enum class RegistrarError : uint8_t {
190 : : None = 0,
191 : : NameTaken = 1,
192 : : InvalidMessage = 2,
193 : : };
194 : :
195 : : // -----------------------------------------------------------------------------
196 : : // Forward declarations
197 : : // -----------------------------------------------------------------------------
198 : : class RegistrarServer;
199 : : class RegistrarClient;
200 : : class NodeRegistry;
201 : :
202 : : // Forward declaration
203 : : class RegistrarConnection;
204 : : using RegistrarConnectionPtr = std::shared_ptr<RegistrarConnection>;
205 : :
206 : : // RegistrarConnection - async TCP connection for registrar protocol
207 : : class RegistrarConnection
208 : : : public std::enable_shared_from_this<RegistrarConnection> {
209 : : friend class RegistrarServer;
210 : :
211 : : public:
212 : : using message_handler =
213 : : std::function<void(TcpMessageType, const StreamBuffer&)>;
214 : : using disconnect_handler = std::function<void()>;
215 : : using send_complete_handler = std::function<void(int result)>;
216 : :
217 : : // Create from accepted server socket
218 : : static RegistrarConnectionPtr
219 : : accepted(int fd, EndPoint remote_endpoint, EventLoop* loop);
220 : :
221 : : // Create as client connection
222 : : static RegistrarConnectionPtr
223 : : connecting(int fd, EndPoint remote_endpoint, EventLoop* loop);
224 : :
225 : : ~RegistrarConnection();
226 : :
227 : : // Set handlers
228 : : void set_message_handler(message_handler h);
229 : : void set_disconnect_handler(disconnect_handler h);
230 : : void set_send_complete_handler(send_complete_handler h);
231 : :
232 : : // Send registrar message
233 : : void send_message(TcpMessageType type, const StreamBuffer& payload);
234 : :
235 : : // Close connection
236 : : void close();
237 : :
238 : : // Get remote endpoint
239 : 0 : EndPoint remote_endpoint() const {
240 : 0 : return remote_endpoint_;
241 : : }
242 : :
243 : : // Get fd
244 : 0 : int fd() const {
245 : 0 : return fd_;
246 : : }
247 : :
248 : : private:
249 : : enum class ReadState { ReadingHeader, ReadingPayload };
250 : :
251 : : RegistrarConnection(EndPoint remote_endpoint, EventLoop* loop, int fd);
252 : :
253 : : void register_with_loop();
254 : : void handle_read_event();
255 : : void handle_payload_read();
256 : : void flush_write_buffer();
257 : : void handle_send_completion(int result);
258 : :
259 : : EndPoint remote_endpoint_;
260 : : EventLoop* loop_ = nullptr;
261 : : int fd_ = -1;
262 : :
263 : : ReadState read_state_ = ReadState::ReadingHeader;
264 : : size_t header_bytes_read_ = 0;
265 : : StreamBuffer header_buffer_;
266 : :
267 : : TcpMessageType current_type_ = TcpMessageType::Register;
268 : : size_t payload_bytes_read_ = 0;
269 : : StreamBuffer payload_buffer_;
270 : :
271 : : StreamBuffer write_buffer_;
272 : : bool is_sending_ = false;
273 : :
274 : : message_handler message_handler_;
275 : : disconnect_handler disconnect_handler_;
276 : : send_complete_handler send_complete_handler_;
277 : : };
278 : :
279 : : // -----------------------------------------------------------------------------
280 : : // RegistrarServer - TCP-based authoritative registrar
281 : : // -----------------------------------------------------------------------------
282 : : class RegistrarServer {
283 : : public:
284 : : RegistrarServer(const RegistrarConfig& config, EndPoint local_endpoint,
285 : : EventLoop* loop = nullptr);
286 : : ~RegistrarServer();
287 : :
288 : : // Non-copyable
289 : : RegistrarServer(const RegistrarServer&) = delete;
290 : : RegistrarServer& operator=(const RegistrarServer&) = delete;
291 : :
292 : : // Start TCP server and UDP listener
293 : : void start();
294 : : void stop();
295 : :
296 : : // Get registry for reading
297 : 0 : NodeRegistry* registry() {
298 : 0 : return ®istry_;
299 : : }
300 : :
301 : : // Set event loop for async I/O (can be changed before start())
302 : : void set_event_loop(EventLoop* loop) {
303 : : loop_ = loop;
304 : : }
305 : :
306 : : // Handle incoming TCP connection
307 : : void handle_accept(int client_fd, EndPoint remote_endpoint);
308 : :
309 : : // Broadcast event to all connected clients
310 : : void broadcast_node_joined(EndPoint endpoint, const NodeEndpoint& ep);
311 : : void broadcast_node_left(EndPoint endpoint);
312 : :
313 : : private:
314 : : void handle_tcp_message(RegistrarConnectionPtr conn, TcpMessageType type,
315 : : const StreamBuffer& data);
316 : : void handle_disconnect(RegistrarConnectionPtr conn);
317 : :
318 : : RegistrarConfig config_;
319 : : [[maybe_unused]] EndPoint local_endpoint_;
320 : : NodeRegistry registry_;
321 : : EventLoop* loop_ = nullptr;
322 : : TcpAcceptor acceptor_;
323 : :
324 : : std::atomic<bool> running_{false};
325 : :
326 : : // Connected clients (endpoint -> connection)
327 : : std::unordered_map<EndPoint, RegistrarConnectionPtr> clients_;
328 : : // fd -> connection map for completion routing
329 : : std::unordered_map<int, RegistrarConnectionPtr> fd_to_connection_;
330 : : std::mutex clients_mutex_;
331 : : };
332 : :
333 : : // -----------------------------------------------------------------------------
334 : : // UdpRegistrar - Dual-mode registrar (server or client)
335 : : // -----------------------------------------------------------------------------
336 : : class UdpRegistrar : public IServiceDiscovery {
337 : : public:
338 : : UdpRegistrar(const RegistrarConfig& config, EndPoint local_endpoint,
339 : : EventLoop* loop = nullptr);
340 : : ~UdpRegistrar();
341 : :
342 : : // Non-copyable
343 : : UdpRegistrar(const UdpRegistrar&) = delete;
344 : : UdpRegistrar& operator=(const UdpRegistrar&) = delete;
345 : :
346 : : // Set event loop for async I/O (can be changed before start())
347 : : void set_event_loop(EventLoop* loop) {
348 : : loop_ = loop;
349 : : }
350 : :
351 : : // Start listening - determines server vs client mode based on bind result
352 : : void start() override;
353 : : void stop() override;
354 : :
355 : : // Query endpoint
356 : : NodeEndpoint* get_endpoint(EndPoint endpoint);
357 : :
358 : : // Get all known endpoints
359 : : std::vector<NodeEndpoint> get_all_endpoints() const;
360 : :
361 : : // Set callback for node online/offline events
362 : : using node_callback = std::function<void(EndPoint, bool online)>;
363 : : void set_node_callback(node_callback cb);
364 : :
365 : : // Handle incoming UDP packet (for resolution)
366 : : void handle_udp_packet(const StreamBuffer& data,
367 : : const std::string& from_host, uint16_t from_port);
368 : :
369 : : // ── IServiceDiscovery overrides ────────────────────────────────────
370 : : std::vector<Member> discover_all() const override;
371 : : const Member* discover(EndPoint ep) const override;
372 : : void announce(Member m) override;
373 : : void on_member_change(MemberChangeCallback cb) override;
374 : 1 : std::string backend_name() const override {
375 : 2 : return "udp-registrar";
376 : : }
377 : 0 : const std::unordered_map<EndPoint, Member>* raw_members() const override {
378 : 0 : return &endpoint_to_member_;
379 : : }
380 : :
381 : : private:
382 : : void start_server_mode();
383 : : void start_client_mode();
384 : : void start_server_mode_async();
385 : : void start_client_mode_async();
386 : : void setup_udp_socket();
387 : : void issue_async_recvfrom();
388 : : void handle_udp_read_ready();
389 : : void
390 : : handle_udp_recv_completion(const StreamBuffer& data,
391 : : const std::string& from_host, uint16_t from_port);
392 : : void
393 : : send_udp_response(const StreamBuffer& data, const struct sockaddr_in& dest);
394 : : void handle_resolve_query(const StreamBuffer& payload,
395 : : const std::string& from_host, uint16_t from_port);
396 : : void handle_resolve_response(const StreamBuffer& payload);
397 : : void send_resolve_response(const NodeEndpoint& endpoint,
398 : : const std::string& from_host,
399 : : uint16_t from_port) const;
400 : : void failover();
401 : :
402 : : // UDP receive state
403 : : static constexpr size_t kUdpRecvBufferSize = 65536;
404 : : StreamBuffer udp_recv_buffer_;
405 : : struct sockaddr_in udp_src_addr_;
406 : : socklen_t udp_src_addr_len_ = sizeof(udp_src_addr_);
407 : :
408 : : RegistrarConfig config_;
409 : : EndPoint local_endpoint_;
410 : : EventLoop* loop_ = nullptr;
411 : :
412 : : // Either server or client (not both)
413 : : std::unique_ptr<RegistrarServer> server_;
414 : : std::unique_ptr<RegistrarClient> client_;
415 : :
416 : : // Client mode registry (populated from static routes)
417 : : std::unique_ptr<NodeRegistry> client_registry_;
418 : :
419 : : // UDP socket for sending resolution responses
420 : : int udp_socket_ = -1;
421 : :
422 : : node_callback node_callback_;
423 : :
424 : : // IServiceDiscovery state
425 : : static Member to_member(const NodeEndpoint& ep);
426 : : mutable std::unordered_map<EndPoint, Member> endpoint_to_member_;
427 : : MemberChangeCallback member_change_cb_;
428 : : };
429 : :
430 : : // -----------------------------------------------------------------------------
431 : : // RegistrarClient - TCP client for connecting to RegistrarServer
432 : : // -----------------------------------------------------------------------------
433 : : class RegistrarClient {
434 : : public:
435 : : RegistrarClient(const RegistrarConfig& config, EndPoint local_endpoint,
436 : : EndPoint server_endpoint, NodeRegistry* shared_registry,
437 : : EventLoop* loop = nullptr);
438 : : ~RegistrarClient();
439 : :
440 : : // Non-copyable
441 : : RegistrarClient(const RegistrarClient&) = delete;
442 : : RegistrarClient& operator=(const RegistrarClient&) = delete;
443 : :
444 : : void start();
445 : : void stop();
446 : :
447 : : // Set event loop for async I/O (can be changed before start())
448 : : void set_event_loop(EventLoop* loop) {
449 : : loop_ = loop;
450 : : }
451 : :
452 : : // Set acceptors for registration announcement
453 : : void set_acceptors(std::vector<AcceptorInfo> acceptors);
454 : :
455 : : // Set callback invoked after repeated reconnect failures (for failover)
456 : 0 : void set_failover_callback(std::function<void()> cb) {
457 : 0 : failover_callback_ = std::move(cb);
458 : 0 : }
459 : :
460 : : // Reconnect to server (used after disconnection)
461 : : void reconnect();
462 : :
463 : : // Check if connected
464 : : bool is_connected() const {
465 : : return connected_.load();
466 : : }
467 : :
468 : : private:
469 : : void attempt_connection();
470 : : void send_registration();
471 : :
472 : : // Handle server messages
473 : : void handle_server_message(TcpMessageType type, const StreamBuffer& data);
474 : : void handle_disconnect();
475 : :
476 : : RegistrarConfig config_;
477 : : EndPoint local_endpoint_;
478 : : EndPoint server_endpoint_;
479 : : NodeRegistry* shared_registry_; // Not owned
480 : : EventLoop* loop_ = nullptr;
481 : :
482 : : RegistrarConnectionPtr server_connection_;
483 : : std::atomic<bool> running_{false};
484 : : std::atomic<bool> connected_{false};
485 : :
486 : : // Timer handles
487 : : uint64_t heartbeat_timer_ = 0;
488 : :
489 : : // For heartbeat tracking
490 : : std::chrono::steady_clock::time_point last_heartbeat_sent_;
491 : :
492 : : // Acceptors announced during registration
493 : : std::vector<AcceptorInfo> acceptors_;
494 : :
495 : : // Failover support
496 : : static constexpr int kMaxReconnectAttempts = 5;
497 : : int reconnect_attempts_ = 0;
498 : : std::function<void()> failover_callback_;
499 : : };
500 : :
501 : : } // namespace net
502 : : } // namespace hpactor
|