LCOV - code coverage report
Current view: top level - include/hpactor/net - registrar.hpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 35.3 % 17 6
Test Date: 2026-05-20 02:24:49 Functions: 28.6 % 7 2
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #pragma once
      16                 :             : 
      17                 :             : #include <hpactor/adt/node_identity.hpp>
      18                 :             : #include <hpactor/net/acceptor.hpp>
      19                 :             : #include <hpactor/net/event_loop.hpp>
      20                 :             : #include <hpactor/net/service_discovery.hpp>
      21                 :             : #include <hpactor/ref/actor_address.hpp>
      22                 :             : #include <hpactor/types/types.hpp>
      23                 :             : 
      24                 :             : #include <atomic>
      25                 :             : #include <chrono>
      26                 :             : #include <functional>
      27                 :             : #include <memory>
      28                 :             : #include <mutex>
      29                 :             : #include <string>
      30                 :             : #include <unordered_map>
      31                 :             : #include <vector>
      32                 :             : 
      33                 :             : namespace hpactor {
      34                 :             : 
      35                 :             : namespace net {
      36                 :             : 
      37                 :             : // -----------------------------------------------------------------------------
      38                 :             : // RegistrarConfig - configuration for registrar
      39                 :             : // -----------------------------------------------------------------------------
      40                 :             : struct StaticRouteConfig {
      41                 :             :     EndPoint endpoint;
      42                 :             :     std::string address; // IP or DNS hostname (used if endpoint is empty)
      43                 :             :     uint16_t port = 0;
      44                 :             : };
      45                 :             : 
      46                 :             : struct RegistrarConfig {
      47                 :             :     uint16_t udp_port = 5353;
      48                 :             :     uint16_t tcp_port = 5353;
      49                 :          12 :     std::chrono::milliseconds heartbeat_interval{5000};
      50                 :          12 :     std::chrono::milliseconds expiration_timeout{15000};
      51                 :          12 :     std::chrono::milliseconds probe_interval{30000};
      52                 :             :     std::vector<StaticRouteConfig> static_routes;
      53                 :             :     bool disable_server = false;
      54                 :             : };
      55                 :             : 
      56                 :             : // -----------------------------------------------------------------------------
      57                 :             : // NodeEndpoint - information about a known node
      58                 :             : // -----------------------------------------------------------------------------
      59                 :             : struct NodeEndpoint {
      60                 :             :     NodeIdentity identity;
      61                 :             :     uint16_t tcp_port = 0;
      62                 :             :     bool is_static_route = false;
      63                 :             :     std::chrono::steady_clock::time_point last_seen;
      64                 :             : };
      65                 :             : 
      66                 :             : // -----------------------------------------------------------------------------
      67                 :             : // HostResolver - hostname to IP resolution with caching
      68                 :             : // -----------------------------------------------------------------------------
      69                 :             : class HostResolver {
      70                 :             :   public:
      71                 :          14 :     HostResolver() = default;
      72                 :             : 
      73                 :             :     // Resolve hostname to IP address (blocking)
      74                 :             :     std::string resolve(const std::string& hostname);
      75                 :             : 
      76                 :             :     // Async resolution - returns immediately, callback when done
      77                 :             :     void resolve_async(const std::string& hostname,
      78                 :             :                        std::function<void(std::string ip)> callback);
      79                 :             : 
      80                 :             :     // Get cached IP for hostname (empty if not cached)
      81                 :             :     std::string get_cached(const std::string& hostname) const;
      82                 :             : 
      83                 :             :     // Cache hostname -> IP mapping with TTL
      84                 :             :     void cache(const std::string& hostname, const std::string& ip,
      85                 :             :                std::chrono::seconds ttl = std::chrono::seconds(300));
      86                 :             : 
      87                 :             :     // Clear expired entries
      88                 :             :     void clear_expired();
      89                 :             : 
      90                 :             :   private:
      91                 :             :     struct CacheEntry {
      92                 :             :         std::string ip;
      93                 :             :         std::chrono::steady_clock::time_point expires_at;
      94                 :             :     };
      95                 :             : 
      96                 :             :     std::unordered_map<std::string, CacheEntry> cache_;
      97                 :             :     mutable std::mutex mutex_;
      98                 :             : };
      99                 :             : 
     100                 :             : // -----------------------------------------------------------------------------
     101                 :             : // NodeRegistry - registry of known nodes
     102                 :             : // -----------------------------------------------------------------------------
     103                 :             : class NodeRegistry {
     104                 :             :   public:
     105                 :             :     explicit NodeRegistry(const RegistrarConfig& config);
     106                 :             : 
     107                 :             :     // Add or update an endpoint
     108                 :             :     void upsert_endpoint(NodeEndpoint endpoint);
     109                 :             : 
     110                 :             :     // Remove an endpoint
     111                 :             :     bool remove_endpoint(EndPoint endpoint);
     112                 :             : 
     113                 :             :     // Get endpoint (nullptr if not found)
     114                 :             :     NodeEndpoint* get(EndPoint endpoint);
     115                 :             : 
     116                 :             :     // Check if endpoint exists
     117                 :             :     bool has(EndPoint endpoint) const;
     118                 :             : 
     119                 :             :     // Get all endpoints
     120                 :             :     std::vector<NodeEndpoint> all() const;
     121                 :             : 
     122                 :             :     // Remove expired entries
     123                 :             :     size_t remove_expired();
     124                 :             : 
     125                 :             :   private:
     126                 :             :     RegistrarConfig config_;
     127                 :             :     std::unordered_map<EndPoint, NodeEndpoint> endpoints_;
     128                 :             :     mutable std::mutex mutex_;
     129                 :             : };
     130                 :             : 
     131                 :             : // -----------------------------------------------------------------------------
     132                 :             : // Registrar Protocol Messages
     133                 :             : // -----------------------------------------------------------------------------
     134                 :             : enum class RegistrarMessageType : uint8_t {
     135                 :             :     // TCP messages (server/client registration)
     136                 :             :     Register = 0x01,
     137                 :             :     Heartbeat = 0x02,
     138                 :             :     NodeJoin = 0x03,
     139                 :             :     NodeLeave = 0x04,
     140                 :             :     Accept = 0x05,
     141                 :             :     Error = 0x06,
     142                 :             :     // UDP messages (resolution)
     143                 :             :     ResolveQuery = 0x10,
     144                 :             :     ResolveResponse = 0x11,
     145                 :             : };
     146                 :             : 
     147                 :             : // Protocol constants
     148                 :             : constexpr uint32_t RegistrarMagic = 0x48504143; // "HPAC"
     149                 :             : constexpr uint8_t RegistrarVersion = 0x01;
     150                 :             : constexpr size_t RegistrarHeaderSize = 12;
     151                 :             : 
     152                 :             : // Message payloads
     153                 :             : struct NodeAnnouncePayload {
     154                 :             :     uint16_t tcp_port;
     155                 :             :     uint16_t actor_count;
     156                 :             : };
     157                 :             : 
     158                 :             : struct NodeQueryPayload {
     159                 :             :     EndPoint target_endpoint;
     160                 :             : };
     161                 :             : 
     162                 :             : struct NodeResponsePayload {
     163                 :             :     uint16_t tcp_port;
     164                 :             : };
     165                 :             : 
     166                 :             : struct NodeProbePayload {
     167                 :             :     uint64_t probe_id;
     168                 :             :     uint64_t timestamp;
     169                 :             : };
     170                 :             : 
     171                 :             : // -----------------------------------------------------------------------------
     172                 :             : // TCP Message Framing for RegistrarServer
     173                 :             : // -----------------------------------------------------------------------------
     174                 :             : constexpr uint32_t TcpRegistrarMagic = 0x48505243; // "HPRC"
     175                 :             : constexpr uint8_t TcpRegistrarVersion = 0x01;
     176                 :             : constexpr size_t TcpHeaderSize = 10;
     177                 :             : 
     178                 :             : // TCP Message types (different from UDP types)
     179                 :             : enum class TcpMessageType : uint8_t {
     180                 :             :     Register = 0x01,
     181                 :             :     Heartbeat = 0x02,
     182                 :             :     NodeJoin = 0x03,
     183                 :             :     NodeLeave = 0x04,
     184                 :             :     Accept = 0x05,
     185                 :             :     Error = 0x06,
     186                 :             : };
     187                 :             : 
     188                 :             : // Error codes for TCP Error messages
     189                 :             : enum class RegistrarError : uint8_t {
     190                 :             :     None = 0,
     191                 :             :     NameTaken = 1,
     192                 :             :     InvalidMessage = 2,
     193                 :             : };
     194                 :             : 
     195                 :             : // -----------------------------------------------------------------------------
     196                 :             : // Forward declarations
     197                 :             : // -----------------------------------------------------------------------------
     198                 :             : class RegistrarServer;
     199                 :             : class RegistrarClient;
     200                 :             : class NodeRegistry;
     201                 :             : 
     202                 :             : // Forward declaration
     203                 :             : class RegistrarConnection;
     204                 :             : using RegistrarConnectionPtr = std::shared_ptr<RegistrarConnection>;
     205                 :             : 
     206                 :             : // RegistrarConnection - async TCP connection for registrar protocol
     207                 :             : class RegistrarConnection
     208                 :             :     : public std::enable_shared_from_this<RegistrarConnection> {
     209                 :             :     friend class RegistrarServer;
     210                 :             : 
     211                 :             :   public:
     212                 :             :     using message_handler =
     213                 :             :         std::function<void(TcpMessageType, const StreamBuffer&)>;
     214                 :             :     using disconnect_handler = std::function<void()>;
     215                 :             :     using send_complete_handler = std::function<void(int result)>;
     216                 :             : 
     217                 :             :     // Create from accepted server socket
     218                 :             :     static RegistrarConnectionPtr
     219                 :             :     accepted(int fd, EndPoint remote_endpoint, EventLoop* loop);
     220                 :             : 
     221                 :             :     // Create as client connection
     222                 :             :     static RegistrarConnectionPtr
     223                 :             :     connecting(int fd, EndPoint remote_endpoint, EventLoop* loop);
     224                 :             : 
     225                 :             :     ~RegistrarConnection();
     226                 :             : 
     227                 :             :     // Set handlers
     228                 :             :     void set_message_handler(message_handler h);
     229                 :             :     void set_disconnect_handler(disconnect_handler h);
     230                 :             :     void set_send_complete_handler(send_complete_handler h);
     231                 :             : 
     232                 :             :     // Send registrar message
     233                 :             :     void send_message(TcpMessageType type, const StreamBuffer& payload);
     234                 :             : 
     235                 :             :     // Close connection
     236                 :             :     void close();
     237                 :             : 
     238                 :             :     // Get remote endpoint
     239                 :           0 :     EndPoint remote_endpoint() const {
     240                 :           0 :         return remote_endpoint_;
     241                 :             :     }
     242                 :             : 
     243                 :             :     // Get fd
     244                 :           0 :     int fd() const {
     245                 :           0 :         return fd_;
     246                 :             :     }
     247                 :             : 
     248                 :             :   private:
     249                 :             :     enum class ReadState { ReadingHeader, ReadingPayload };
     250                 :             : 
     251                 :             :     RegistrarConnection(EndPoint remote_endpoint, EventLoop* loop, int fd);
     252                 :             : 
     253                 :             :     void register_with_loop();
     254                 :             :     void handle_read_event();
     255                 :             :     void handle_payload_read();
     256                 :             :     void flush_write_buffer();
     257                 :             :     void handle_send_completion(int result);
     258                 :             : 
     259                 :             :     EndPoint remote_endpoint_;
     260                 :             :     EventLoop* loop_ = nullptr;
     261                 :             :     int fd_ = -1;
     262                 :             : 
     263                 :             :     ReadState read_state_ = ReadState::ReadingHeader;
     264                 :             :     size_t header_bytes_read_ = 0;
     265                 :             :     StreamBuffer header_buffer_;
     266                 :             : 
     267                 :             :     TcpMessageType current_type_ = TcpMessageType::Register;
     268                 :             :     size_t payload_bytes_read_ = 0;
     269                 :             :     StreamBuffer payload_buffer_;
     270                 :             : 
     271                 :             :     StreamBuffer write_buffer_;
     272                 :             :     bool is_sending_ = false;
     273                 :             : 
     274                 :             :     message_handler message_handler_;
     275                 :             :     disconnect_handler disconnect_handler_;
     276                 :             :     send_complete_handler send_complete_handler_;
     277                 :             : };
     278                 :             : 
     279                 :             : // -----------------------------------------------------------------------------
     280                 :             : // RegistrarServer - TCP-based authoritative registrar
     281                 :             : // -----------------------------------------------------------------------------
     282                 :             : class RegistrarServer {
     283                 :             :   public:
     284                 :             :     RegistrarServer(const RegistrarConfig& config, EndPoint local_endpoint,
     285                 :             :                     EventLoop* loop = nullptr);
     286                 :             :     ~RegistrarServer();
     287                 :             : 
     288                 :             :     // Non-copyable
     289                 :             :     RegistrarServer(const RegistrarServer&) = delete;
     290                 :             :     RegistrarServer& operator=(const RegistrarServer&) = delete;
     291                 :             : 
     292                 :             :     // Start TCP server and UDP listener
     293                 :             :     void start();
     294                 :             :     void stop();
     295                 :             : 
     296                 :             :     // Get registry for reading
     297                 :           0 :     NodeRegistry* registry() {
     298                 :           0 :         return &registry_;
     299                 :             :     }
     300                 :             : 
     301                 :             :     // Set event loop for async I/O (can be changed before start())
     302                 :             :     void set_event_loop(EventLoop* loop) {
     303                 :             :         loop_ = loop;
     304                 :             :     }
     305                 :             : 
     306                 :             :     // Handle incoming TCP connection
     307                 :             :     void handle_accept(int client_fd, EndPoint remote_endpoint);
     308                 :             : 
     309                 :             :     // Broadcast event to all connected clients
     310                 :             :     void broadcast_node_joined(EndPoint endpoint, const NodeEndpoint& ep);
     311                 :             :     void broadcast_node_left(EndPoint endpoint);
     312                 :             : 
     313                 :             :   private:
     314                 :             :     void handle_tcp_message(RegistrarConnectionPtr conn, TcpMessageType type,
     315                 :             :                             const StreamBuffer& data);
     316                 :             :     void handle_disconnect(RegistrarConnectionPtr conn);
     317                 :             : 
     318                 :             :     RegistrarConfig config_;
     319                 :             :     [[maybe_unused]] EndPoint local_endpoint_;
     320                 :             :     NodeRegistry registry_;
     321                 :             :     EventLoop* loop_ = nullptr;
     322                 :             :     TcpAcceptor acceptor_;
     323                 :             : 
     324                 :             :     std::atomic<bool> running_{false};
     325                 :             : 
     326                 :             :     // Connected clients (endpoint -> connection)
     327                 :             :     std::unordered_map<EndPoint, RegistrarConnectionPtr> clients_;
     328                 :             :     // fd -> connection map for completion routing
     329                 :             :     std::unordered_map<int, RegistrarConnectionPtr> fd_to_connection_;
     330                 :             :     std::mutex clients_mutex_;
     331                 :             : };
     332                 :             : 
     333                 :             : // -----------------------------------------------------------------------------
     334                 :             : // UdpRegistrar - Dual-mode registrar (server or client)
     335                 :             : // -----------------------------------------------------------------------------
     336                 :             : class UdpRegistrar : public IServiceDiscovery {
     337                 :             :   public:
     338                 :             :     UdpRegistrar(const RegistrarConfig& config, EndPoint local_endpoint,
     339                 :             :                  EventLoop* loop = nullptr);
     340                 :             :     ~UdpRegistrar();
     341                 :             : 
     342                 :             :     // Non-copyable
     343                 :             :     UdpRegistrar(const UdpRegistrar&) = delete;
     344                 :             :     UdpRegistrar& operator=(const UdpRegistrar&) = delete;
     345                 :             : 
     346                 :             :     // Set event loop for async I/O (can be changed before start())
     347                 :             :     void set_event_loop(EventLoop* loop) {
     348                 :             :         loop_ = loop;
     349                 :             :     }
     350                 :             : 
     351                 :             :     // Start listening - determines server vs client mode based on bind result
     352                 :             :     void start() override;
     353                 :             :     void stop() override;
     354                 :             : 
     355                 :             :     // Query endpoint
     356                 :             :     NodeEndpoint* get_endpoint(EndPoint endpoint);
     357                 :             : 
     358                 :             :     // Get all known endpoints
     359                 :             :     std::vector<NodeEndpoint> get_all_endpoints() const;
     360                 :             : 
     361                 :             :     // Set callback for node online/offline events
     362                 :             :     using node_callback = std::function<void(EndPoint, bool online)>;
     363                 :             :     void set_node_callback(node_callback cb);
     364                 :             : 
     365                 :             :     // Handle incoming UDP packet (for resolution)
     366                 :             :     void handle_udp_packet(const StreamBuffer& data,
     367                 :             :                            const std::string& from_host, uint16_t from_port);
     368                 :             : 
     369                 :             :     // ── IServiceDiscovery overrides ────────────────────────────────────
     370                 :             :     std::vector<Member> discover_all() const override;
     371                 :             :     const Member* discover(EndPoint ep) const override;
     372                 :             :     void announce(Member m) override;
     373                 :             :     void on_member_change(MemberChangeCallback cb) override;
     374                 :           1 :     std::string backend_name() const override {
     375                 :           2 :         return "udp-registrar";
     376                 :             :     }
     377                 :           0 :     const std::unordered_map<EndPoint, Member>* raw_members() const override {
     378                 :           0 :         return &endpoint_to_member_;
     379                 :             :     }
     380                 :             : 
     381                 :             :   private:
     382                 :             :     void start_server_mode();
     383                 :             :     void start_client_mode();
     384                 :             :     void start_server_mode_async();
     385                 :             :     void start_client_mode_async();
     386                 :             :     void setup_udp_socket();
     387                 :             :     void issue_async_recvfrom();
     388                 :             :     void handle_udp_read_ready();
     389                 :             :     void
     390                 :             :     handle_udp_recv_completion(const StreamBuffer& data,
     391                 :             :                                const std::string& from_host, uint16_t from_port);
     392                 :             :     void
     393                 :             :     send_udp_response(const StreamBuffer& data, const struct sockaddr_in& dest);
     394                 :             :     void handle_resolve_query(const StreamBuffer& payload,
     395                 :             :                               const std::string& from_host, uint16_t from_port);
     396                 :             :     void handle_resolve_response(const StreamBuffer& payload);
     397                 :             :     void send_resolve_response(const NodeEndpoint& endpoint,
     398                 :             :                                const std::string& from_host,
     399                 :             :                                uint16_t from_port) const;
     400                 :             :     void failover();
     401                 :             : 
     402                 :             :     // UDP receive state
     403                 :             :     static constexpr size_t kUdpRecvBufferSize = 65536;
     404                 :             :     StreamBuffer udp_recv_buffer_;
     405                 :             :     struct sockaddr_in udp_src_addr_;
     406                 :             :     socklen_t udp_src_addr_len_ = sizeof(udp_src_addr_);
     407                 :             : 
     408                 :             :     RegistrarConfig config_;
     409                 :             :     EndPoint local_endpoint_;
     410                 :             :     EventLoop* loop_ = nullptr;
     411                 :             : 
     412                 :             :     // Either server or client (not both)
     413                 :             :     std::unique_ptr<RegistrarServer> server_;
     414                 :             :     std::unique_ptr<RegistrarClient> client_;
     415                 :             : 
     416                 :             :     // Client mode registry (populated from static routes)
     417                 :             :     std::unique_ptr<NodeRegistry> client_registry_;
     418                 :             : 
     419                 :             :     // UDP socket for sending resolution responses
     420                 :             :     int udp_socket_ = -1;
     421                 :             : 
     422                 :             :     node_callback node_callback_;
     423                 :             : 
     424                 :             :     // IServiceDiscovery state
     425                 :             :     static Member to_member(const NodeEndpoint& ep);
     426                 :             :     mutable std::unordered_map<EndPoint, Member> endpoint_to_member_;
     427                 :             :     MemberChangeCallback member_change_cb_;
     428                 :             : };
     429                 :             : 
     430                 :             : // -----------------------------------------------------------------------------
     431                 :             : // RegistrarClient - TCP client for connecting to RegistrarServer
     432                 :             : // -----------------------------------------------------------------------------
     433                 :             : class RegistrarClient {
     434                 :             :   public:
     435                 :             :     RegistrarClient(const RegistrarConfig& config, EndPoint local_endpoint,
     436                 :             :                     EndPoint server_endpoint, NodeRegistry* shared_registry,
     437                 :             :                     EventLoop* loop = nullptr);
     438                 :             :     ~RegistrarClient();
     439                 :             : 
     440                 :             :     // Non-copyable
     441                 :             :     RegistrarClient(const RegistrarClient&) = delete;
     442                 :             :     RegistrarClient& operator=(const RegistrarClient&) = delete;
     443                 :             : 
     444                 :             :     void start();
     445                 :             :     void stop();
     446                 :             : 
     447                 :             :     // Set event loop for async I/O (can be changed before start())
     448                 :             :     void set_event_loop(EventLoop* loop) {
     449                 :             :         loop_ = loop;
     450                 :             :     }
     451                 :             : 
     452                 :             :     // Set acceptors for registration announcement
     453                 :             :     void set_acceptors(std::vector<AcceptorInfo> acceptors);
     454                 :             : 
     455                 :             :     // Set callback invoked after repeated reconnect failures (for failover)
     456                 :           0 :     void set_failover_callback(std::function<void()> cb) {
     457                 :           0 :         failover_callback_ = std::move(cb);
     458                 :           0 :     }
     459                 :             : 
     460                 :             :     // Reconnect to server (used after disconnection)
     461                 :             :     void reconnect();
     462                 :             : 
     463                 :             :     // Check if connected
     464                 :             :     bool is_connected() const {
     465                 :             :         return connected_.load();
     466                 :             :     }
     467                 :             : 
     468                 :             :   private:
     469                 :             :     void attempt_connection();
     470                 :             :     void send_registration();
     471                 :             : 
     472                 :             :     // Handle server messages
     473                 :             :     void handle_server_message(TcpMessageType type, const StreamBuffer& data);
     474                 :             :     void handle_disconnect();
     475                 :             : 
     476                 :             :     RegistrarConfig config_;
     477                 :             :     EndPoint local_endpoint_;
     478                 :             :     EndPoint server_endpoint_;
     479                 :             :     NodeRegistry* shared_registry_; // Not owned
     480                 :             :     EventLoop* loop_ = nullptr;
     481                 :             : 
     482                 :             :     RegistrarConnectionPtr server_connection_;
     483                 :             :     std::atomic<bool> running_{false};
     484                 :             :     std::atomic<bool> connected_{false};
     485                 :             : 
     486                 :             :     // Timer handles
     487                 :             :     uint64_t heartbeat_timer_ = 0;
     488                 :             : 
     489                 :             :     // For heartbeat tracking
     490                 :             :     std::chrono::steady_clock::time_point last_heartbeat_sent_;
     491                 :             : 
     492                 :             :     // Acceptors announced during registration
     493                 :             :     std::vector<AcceptorInfo> acceptors_;
     494                 :             : 
     495                 :             :     // Failover support
     496                 :             :     static constexpr int kMaxReconnectAttempts = 5;
     497                 :             :     int reconnect_attempts_ = 0;
     498                 :             :     std::function<void()> failover_callback_;
     499                 :             : };
     500                 :             : 
     501                 :             : } // namespace net
     502                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1