LCOV - code coverage report
Current view: top level - src/net - registrar_client.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 0.0 % 200 0
Test Date: 2026-05-20 02:24:49 Functions: 0.0 % 18 0
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/net/registrar.hpp>
      16                 :             : #include <hpactor/net/registrar_serialization.hpp>
      17                 :             : 
      18                 :             : #include <arpa/inet.h>
      19                 :             : #include <ifaddrs.h>
      20                 :             : #include <net/if.h>
      21                 :             : #include <netinet/in.h>
      22                 :             : #include <netinet/tcp.h>
      23                 :             : #include <sys/socket.h>
      24                 :             : #include <unistd.h>
      25                 :             : 
      26                 :             : #include <cstring>
      27                 :             : 
      28                 :             : #include <hpactor/log/logger.hpp>
      29                 :             : 
      30                 :             : namespace hpactor {
      31                 :             : 
      32                 :             : namespace net {
      33                 :             : 
      34                 :             : // -----------------------------------------------------------------------------
      35                 :             : // Helper Functions
      36                 :             : // -----------------------------------------------------------------------------
      37                 :             : 
      38                 :           0 : static std::string get_local_ip() {
      39                 :           0 :     struct ifaddrs* ifaddr = nullptr;
      40                 :           0 :     if (getifaddrs(&ifaddr) == -1) {
      41                 :           0 :         return "127.0.0.1"; // Fallback
      42                 :             :     }
      43                 :             : 
      44                 :             :     // Prefer non-loopback, up, running interfaces
      45                 :           0 :     std::string result;
      46                 :           0 :     for (struct ifaddrs* ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) {
      47                 :           0 :         if (ifa->ifa_addr == nullptr)
      48                 :           0 :             continue;
      49                 :           0 :         if (!(ifa->ifa_flags & IFF_UP))
      50                 :           0 :             continue;
      51                 :           0 :         if (!(ifa->ifa_flags & IFF_RUNNING))
      52                 :           0 :             continue;
      53                 :           0 :         if (ifa->ifa_flags & IFF_LOOPBACK)
      54                 :           0 :             continue;
      55                 :           0 :         if (ifa->ifa_addr->sa_family != AF_INET)
      56                 :           0 :             continue;
      57                 :             : 
      58                 :           0 :         struct sockaddr_in* addr =
      59                 :             :             reinterpret_cast<struct sockaddr_in*>(ifa->ifa_addr);
      60                 :             :         char ip[INET_ADDRSTRLEN];
      61                 :           0 :         if (inet_ntop(AF_INET, &addr->sin_addr, ip, sizeof(ip)) != nullptr) {
      62                 :           0 :             result = ip;
      63                 :           0 :             break; // Take first valid non-loopback
      64                 :             :         }
      65                 :             :     }
      66                 :             : 
      67                 :           0 :     freeifaddrs(ifaddr);
      68                 :           0 :     return result.empty() ? "127.0.0.1" : result;
      69                 :           0 : }
      70                 :             : 
      71                 :             : // -----------------------------------------------------------------------------
      72                 :             : // RegistrarClient Implementation
      73                 :             : // -----------------------------------------------------------------------------
      74                 :             : 
      75                 :           0 : void RegistrarClient::set_acceptors(std::vector<AcceptorInfo> acceptors) {
      76                 :           0 :     acceptors_ = std::move(acceptors);
      77                 :           0 : }
      78                 :             : 
      79                 :           0 : RegistrarClient::RegistrarClient(const RegistrarConfig& config,
      80                 :             :                                  EndPoint local_endpoint, EndPoint server_endpoint,
      81                 :           0 :                                  NodeRegistry* shared_registry, EventLoop* loop)
      82                 :           0 :     : config_(config), local_endpoint_(local_endpoint),
      83                 :           0 :       server_endpoint_(server_endpoint), shared_registry_(shared_registry),
      84                 :           0 :       loop_(loop), last_heartbeat_sent_(std::chrono::steady_clock::now()) {}
      85                 :             : 
      86                 :           0 : RegistrarClient::~RegistrarClient() {
      87                 :           0 :     stop();
      88                 :           0 : }
      89                 :             : 
      90                 :           0 : void RegistrarClient::start() {
      91                 :           0 :     if (running_.load()) {
      92                 :           0 :         return;
      93                 :             :     }
      94                 :             : 
      95                 :           0 :     running_.store(true);
      96                 :           0 :     connected_.store(false);
      97                 :             : 
      98                 :           0 :     if (loop_) {
      99                 :             :         // Schedule heartbeat using EventLoop
     100                 :           0 :         heartbeat_timer_ = loop_->run_every(
     101                 :           0 :             [this]() {
     102                 :           0 :                 if (connected_.load() && server_connection_) {
     103                 :             :                     // Build heartbeat message (no payload, just header)
     104                 :           0 :                     StreamBuffer message;
     105                 :           0 :                     message.resize(TcpHeaderSize);
     106                 :             : 
     107                 :           0 :                     uint32_t magic_be = htonl(TcpRegistrarMagic);
     108                 :           0 :                     memcpy(message.data(), &magic_be, 4);
     109                 :           0 :                     message[4] = TcpRegistrarVersion;
     110                 :           0 :                     message[5] = static_cast<uint8_t>(TcpMessageType::Heartbeat);
     111                 :           0 :                     uint32_t len_be = htonl(0);
     112                 :           0 :                     memcpy(message.data() + 6, &len_be, 4);
     113                 :             : 
     114                 :           0 :                     server_connection_->send_message(TcpMessageType::Heartbeat,
     115                 :           0 :                                                      StreamBuffer{});
     116                 :           0 :                 }
     117                 :           0 :             },
     118                 :           0 :             static_cast<int>(config_.heartbeat_interval.count()));
     119                 :             :     }
     120                 :             : 
     121                 :             :     // Start connection attempts
     122                 :           0 :     attempt_connection();
     123                 :             : }
     124                 :             : 
     125                 :           0 : void RegistrarClient::stop() {
     126                 :           0 :     if (!running_.load()) {
     127                 :           0 :         return;
     128                 :             :     }
     129                 :             : 
     130                 :           0 :     running_.store(false);
     131                 :           0 :     connected_.store(false);
     132                 :             : 
     133                 :             :     // Cancel EventLoop timers
     134                 :           0 :     if (loop_) {
     135                 :           0 :         if (heartbeat_timer_ != 0) {
     136                 :           0 :             loop_->cancel_timer(heartbeat_timer_);
     137                 :           0 :             heartbeat_timer_ = 0;
     138                 :             :         }
     139                 :             :     }
     140                 :             : 
     141                 :             :     // Close server connection
     142                 :           0 :     if (server_connection_) {
     143                 :           0 :         server_connection_->close();
     144                 :           0 :         server_connection_.reset();
     145                 :             :     }
     146                 :             : }
     147                 :             : 
     148                 :           0 : void RegistrarClient::attempt_connection() {
     149                 :           0 :     if (!running_.load()) {
     150                 :           0 :         return;
     151                 :             :     }
     152                 :             : 
     153                 :             :     // Get server endpoint from registry
     154                 :           0 :     NodeEndpoint* server_ep = shared_registry_->get(server_endpoint_);
     155                 :           0 :     if (!server_ep) {
     156                 :             :         // Schedule retry if we have an event loop
     157                 :           0 :         if (loop_) {
     158                 :           0 :             loop_->run_after(
     159                 :           0 :                 [this]() {
     160                 :           0 :                     if (running_.load()) {
     161                 :           0 :                         attempt_connection();
     162                 :             :                     }
     163                 :           0 :                 },
     164                 :             :                 1000);
     165                 :             :         }
     166                 :           0 :         return;
     167                 :             :     }
     168                 :             : 
     169                 :             :     // Resolve server hostname
     170                 :           0 :     std::string server_ip = server_ep->identity.host;
     171                 :             :     struct in_addr addr;
     172                 :           0 :     if (inet_pton(AF_INET, server_ip.c_str(), &addr) != 1) {
     173                 :             :         // Try to resolve hostname
     174                 :           0 :         HostResolver resolver;
     175                 :           0 :         server_ip = resolver.resolve(server_ep->identity.host);
     176                 :           0 :         if (server_ip.empty()) {
     177                 :             :             // Schedule retry
     178                 :           0 :             if (loop_) {
     179                 :           0 :                 loop_->run_after(
     180                 :           0 :                     [this]() {
     181                 :           0 :                         if (running_.load()) {
     182                 :           0 :                             attempt_connection();
     183                 :             :                         }
     184                 :           0 :                     },
     185                 :             :                     1000);
     186                 :             :             }
     187                 :           0 :             return;
     188                 :             :         }
     189                 :           0 :     }
     190                 :             : 
     191                 :             :     // Create TCP socket
     192                 :           0 :     int sock = socket(AF_INET, SOCK_STREAM, 0);
     193                 :           0 :     if (sock < 0) {
     194                 :           0 :         return;
     195                 :             :     }
     196                 :             : 
     197                 :             :     // Set TCP_NODELAY for lower latency
     198                 :           0 :     int nodelay = 1;
     199                 :           0 :     setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
     200                 :             : 
     201                 :             :     // Connect to server
     202                 :             :     struct sockaddr_in server_addr;
     203                 :           0 :     memset(&server_addr, 0, sizeof(server_addr));
     204                 :           0 :     server_addr.sin_family = AF_INET;
     205                 :           0 :     server_addr.sin_port = htons(server_ep->tcp_port);
     206                 :             : 
     207                 :           0 :     if (inet_pton(AF_INET, server_ip.c_str(), &server_addr.sin_addr) <= 0) {
     208                 :           0 :         close(sock);
     209                 :           0 :         return;
     210                 :             :     }
     211                 :             : 
     212                 :           0 :     if (::connect(sock, reinterpret_cast<struct sockaddr*>(&server_addr),
     213                 :           0 :                   sizeof(server_addr)) < 0) {
     214                 :           0 :         close(sock);
     215                 :             :         // Schedule retry
     216                 :           0 :         if (loop_) {
     217                 :           0 :             loop_->run_after(
     218                 :           0 :                 [this]() {
     219                 :           0 :                     if (running_.load()) {
     220                 :           0 :                         attempt_connection();
     221                 :             :                     }
     222                 :           0 :                 },
     223                 :             :                 1000);
     224                 :             :         }
     225                 :           0 :         return;
     226                 :             :     }
     227                 :             : 
     228                 :             :     // Create RegistrarConnection wrapper
     229                 :             :     server_connection_ =
     230                 :           0 :         RegistrarConnection::connecting(sock, server_endpoint_, loop_);
     231                 :             : 
     232                 :             :     // Set up message handler
     233                 :           0 :     server_connection_->set_message_handler(
     234                 :           0 :         [this](TcpMessageType type, const StreamBuffer& data) {
     235                 :           0 :             handle_server_message(type, data);
     236                 :           0 :         });
     237                 :             : 
     238                 :             :     // Set up disconnect handler
     239                 :           0 :     server_connection_->set_disconnect_handler([this]() { handle_disconnect(); });
     240                 :             : 
     241                 :             :     // Connection successful — reset failover counter
     242                 :           0 :     reconnect_attempts_ = 0;
     243                 :           0 :     connected_.store(true);
     244                 :             : 
     245                 :             :     // Send registration
     246                 :           0 :     send_registration();
     247                 :           0 : }
     248                 :             : 
     249                 :           0 : void RegistrarClient::send_registration() {
     250                 :           0 :     if (!server_connection_ || !connected_.load()) {
     251                 :           0 :         return;
     252                 :             :     }
     253                 :             : 
     254                 :             :     // Build registration message using protobuf
     255                 :           0 :     std::string host = get_local_ip();
     256                 :           0 :     uint16_t tcp_port = config_.tcp_port;
     257                 :             : 
     258                 :             :     // Create NodeEndpoint for serialization
     259                 :           0 :     NodeEndpoint ep;
     260                 :           0 :     ep.identity.endpoint = local_endpoint_;
     261                 :           0 :     ep.identity.host = host;
     262                 :           0 :     ep.tcp_port = tcp_port;
     263                 :           0 :     ep.identity.acceptors = acceptors_;
     264                 :             : 
     265                 :           0 :     StreamBuffer payload = serialize_register_payload(ep);
     266                 :           0 :     server_connection_->send_message(TcpMessageType::Register, payload);
     267                 :           0 : }
     268                 :             : 
     269                 :           0 : void RegistrarClient::handle_server_message(TcpMessageType type,
     270                 :             :                                             const StreamBuffer& data) {
     271                 :           0 :     switch (type) {
     272                 :           0 :         case TcpMessageType::Accept: {
     273                 :             :             // Registration accepted - server acknowledges our registration
     274                 :             :             // Payload: [ErrorCode: 1]
     275                 :           0 :             if (data.size() >= 1) {
     276                 :           0 :                 uint8_t error_code = data[0];
     277                 :           0 :                 if (error_code == 0) {
     278                 :             :                     // Success - we're registered
     279                 :           0 :                     last_heartbeat_sent_ = std::chrono::steady_clock::now();
     280                 :           0 :                     HPACTOR_LOG_INFO(log::LogCategory::kRegistrar, ActorId{0},
     281                 :             :                                      static_cast<uint32_t>(
     282                 :             :                                          log::LogEventId::kRegistrarRegister),
     283                 :             :                                      "registrar client registered");
     284                 :             :                 }
     285                 :             :             }
     286                 :           0 :             break;
     287                 :             :         }
     288                 :             : 
     289                 :           0 :         case TcpMessageType::NodeJoin: {
     290                 :           0 :             PbNodeJoinPayload msg;
     291                 :           0 :             if (!msg.ParseFromArray(data.data(), static_cast<int>(data.size()))) {
     292                 :           0 :                 break;
     293                 :             :             }
     294                 :             : 
     295                 :           0 :             const auto& ep_info = msg.endpoint_info();
     296                 :           0 :             std::string endpoint_str = ep_info.endpoint();
     297                 :           0 :             EndPoint endpoint = endpoint_ops::parse_endpoint(endpoint_str);
     298                 :             : 
     299                 :           0 :             std::string host = ep_info.host();
     300                 :           0 :             uint16_t tcp_port = static_cast<uint16_t>(ep_info.tcp_port());
     301                 :             : 
     302                 :           0 :             NodeEndpoint node_ep;
     303                 :           0 :             node_ep.identity.endpoint = endpoint;
     304                 :           0 :             node_ep.identity.host = host;
     305                 :           0 :             node_ep.tcp_port = tcp_port;
     306                 :           0 :             node_ep.last_seen = std::chrono::steady_clock::now();
     307                 :           0 :             shared_registry_->upsert_endpoint(node_ep);
     308                 :           0 :             break;
     309                 :           0 :         }
     310                 :             : 
     311                 :           0 :         case TcpMessageType::NodeLeave: {
     312                 :           0 :             PbNodeLeavePayload msg;
     313                 :           0 :             if (!msg.ParseFromArray(data.data(), static_cast<int>(data.size()))) {
     314                 :           0 :                 break;
     315                 :             :             }
     316                 :             : 
     317                 :           0 :             std::string endpoint_str = msg.endpoint();
     318                 :           0 :             EndPoint endpoint = endpoint_ops::parse_endpoint(endpoint_str);
     319                 :           0 :             shared_registry_->remove_endpoint(endpoint);
     320                 :           0 :             break;
     321                 :           0 :         }
     322                 :             : 
     323                 :           0 :         case TcpMessageType::Error: {
     324                 :             :             // Error response
     325                 :             :             // Payload: [ErrorCode: 1][MessageLen: 4][Message: N]
     326                 :           0 :             if (data.size() < 1)
     327                 :           0 :                 break;
     328                 :           0 :             uint8_t error_code = data[0];
     329                 :             :             (void)error_code; // Could log this
     330                 :           0 :             break;
     331                 :             :         }
     332                 :             : 
     333                 :           0 :         case TcpMessageType::Heartbeat:
     334                 :             :         case TcpMessageType::Register:
     335                 :             :             // These are sent by us, not received
     336                 :           0 :             break;
     337                 :             :     }
     338                 :           0 : }
     339                 :             : 
     340                 :           0 : void RegistrarClient::handle_disconnect() {
     341                 :           0 :     connected_.store(false);
     342                 :             : 
     343                 :           0 :     HPACTOR_LOG_WARNING(log::LogCategory::kRegistrar, ActorId{0}, 0,
     344                 :             :                         "registrar heartbeat timeout");
     345                 :             : 
     346                 :             :     // Close existing connection
     347                 :           0 :     if (server_connection_) {
     348                 :           0 :         server_connection_->close();
     349                 :           0 :         server_connection_.reset();
     350                 :             :     }
     351                 :             : 
     352                 :           0 :     reconnect_attempts_++;
     353                 :           0 :     if (reconnect_attempts_ >= kMaxReconnectAttempts && failover_callback_) {
     354                 :           0 :         failover_callback_();
     355                 :           0 :         return;
     356                 :             :     }
     357                 :             : 
     358                 :             :     // Schedule reconnect if still running
     359                 :           0 :     if (running_.load() && loop_) {
     360                 :           0 :         loop_->run_after(
     361                 :           0 :             [this]() {
     362                 :           0 :                 if (running_.load()) {
     363                 :           0 :                     reconnect();
     364                 :             :                 }
     365                 :           0 :             },
     366                 :             :             1000);
     367                 :             :     }
     368                 :             : }
     369                 :             : 
     370                 :           0 : void RegistrarClient::reconnect() {
     371                 :           0 :     handle_disconnect();
     372                 :           0 : }
     373                 :             : 
     374                 :             : } // namespace net
     375                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1