LCOV - code coverage report
Current view: top level - src/net - registrar_server.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 0.0 % 124 0
Test Date: 2026-05-20 02:24:49 Functions: 0.0 % 13 0
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/net/registrar.hpp>
      16                 :             : #include <hpactor/net/registrar_serialization.hpp>
      17                 :             : 
      18                 :             : #include <arpa/inet.h>
      19                 :             : #include <netinet/in.h>
      20                 :             : #include <netinet/tcp.h>
      21                 :             : #include <sys/socket.h>
      22                 :             : #include <unistd.h>
      23                 :             : 
      24                 :             : #include <cstring>
      25                 :             : 
      26                 :             : #include <hpactor/log/logger.hpp>
      27                 :             : 
      28                 :             : namespace hpactor {
      29                 :             : 
      30                 :             : namespace net {
      31                 :             : 
      32                 :             : // -----------------------------------------------------------------------------
      33                 :             : // RegistrarServer Implementation
      34                 :             : // -----------------------------------------------------------------------------
      35                 :             : 
      36                 :           0 : RegistrarServer::RegistrarServer(const RegistrarConfig& config,
      37                 :           0 :                                  EndPoint local_endpoint, EventLoop* loop)
      38                 :           0 :     : config_(config), local_endpoint_(local_endpoint), registry_(config),
      39                 :           0 :       loop_(loop), acceptor_(loop) {
      40                 :             :     // Set completion callback for send routing - must be done before
      41                 :             :     // connections register themselves
      42                 :           0 :     if (loop_) {
      43                 :           0 :         loop_->set_completion_callback([this](OpCompletion c) {
      44                 :           0 :             if (c.type == OpType::Send) {
      45                 :           0 :                 auto it = fd_to_connection_.find(c.fd);
      46                 :           0 :                 if (it != fd_to_connection_.end()) {
      47                 :           0 :                     it->second->handle_send_completion(c.result);
      48                 :             :                 }
      49                 :             :             }
      50                 :           0 :         });
      51                 :             :     }
      52                 :           0 : }
      53                 :             : 
      54                 :           0 : RegistrarServer::~RegistrarServer() {
      55                 :           0 :     stop();
      56                 :           0 : }
      57                 :             : 
      58                 :           0 : void RegistrarServer::start() {
      59                 :           0 :     if (running_.load()) {
      60                 :           0 :         return;
      61                 :             :     }
      62                 :             : 
      63                 :           0 :     running_.store(true);
      64                 :             : 
      65                 :           0 :     HPACTOR_LOG_INFO(log::LogCategory::kRegistrar, ActorId{0}, 0,
      66                 :             :                      "registrar server started");
      67                 :             : 
      68                 :             :     // Use Acceptor for TCP listening (async)
      69                 :             :     // The Acceptor uses the EventLoop to monitor the listening socket
      70                 :             :     // and invokes our accept handler when connections arrive
      71                 :           0 :     acceptor_.set_accept_handler(
      72                 :           0 :         [this](int fd, EndPoint remote_ep) { handle_accept(fd, remote_ep); });
      73                 :           0 :     acceptor_.listen(config_.tcp_port);
      74                 :             : }
      75                 :             : 
      76                 :           0 : void RegistrarServer::stop() {
      77                 :           0 :     if (!running_.load()) {
      78                 :           0 :         return;
      79                 :             :     }
      80                 :             : 
      81                 :           0 :     running_.store(false);
      82                 :             : 
      83                 :           0 :     HPACTOR_LOG_INFO(log::LogCategory::kRegistrar, ActorId{0}, 0,
      84                 :             :                      "registrar server stopped");
      85                 :             : 
      86                 :             :     // Close all client connections
      87                 :             :     {
      88                 :           0 :         std::lock_guard<std::mutex> lock(clients_mutex_);
      89                 :           0 :         for (auto& [endpoint, conn] : clients_) {
      90                 :             :             (void)endpoint;
      91                 :           0 :             conn->close();
      92                 :             :         }
      93                 :           0 :         clients_.clear();
      94                 :           0 :         fd_to_connection_.clear();
      95                 :           0 :     }
      96                 :             : 
      97                 :             :     // Close acceptor
      98                 :           0 :     acceptor_.close();
      99                 :             : }
     100                 :             : 
     101                 :           0 : void RegistrarServer::handle_accept(int client_fd, EndPoint remote_endpoint) {
     102                 :             :     // Set TCP_NODELAY for lower latency
     103                 :           0 :     int nodelay = 1;
     104                 :           0 :     setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
     105                 :             : 
     106                 :           0 :     auto conn = RegistrarConnection::accepted(client_fd, remote_endpoint, loop_);
     107                 :             : 
     108                 :             :     // Set message handler to process incoming messages
     109                 :           0 :     conn->set_message_handler(
     110                 :           0 :         [this, conn](TcpMessageType type, const StreamBuffer& payload) {
     111                 :           0 :             handle_tcp_message(conn, type, payload);
     112                 :           0 :         });
     113                 :             : 
     114                 :             :     // Set disconnect handler
     115                 :           0 :     conn->set_disconnect_handler([this, conn]() { handle_disconnect(conn); });
     116                 :             : 
     117                 :             :     // Store connection in both maps
     118                 :             :     {
     119                 :           0 :         std::lock_guard<std::mutex> lock(clients_mutex_);
     120                 :           0 :         clients_[remote_endpoint] = conn;
     121                 :           0 :         fd_to_connection_[client_fd] = conn;
     122                 :           0 :     }
     123                 :             : 
     124                 :             :     // Register with NodeRegistry
     125                 :             :     // Note: actual registration info comes in Register message
     126                 :           0 : }
     127                 :             : 
     128                 :           0 : void RegistrarServer::handle_tcp_message(RegistrarConnectionPtr conn,
     129                 :             :                                          TcpMessageType type,
     130                 :             :                                          const StreamBuffer& data) {
     131                 :           0 :     switch (type) {
     132                 :           0 :         case TcpMessageType::Register: {
     133                 :           0 :             PbRegisterPayload msg;
     134                 :           0 :             if (!msg.ParseFromArray(data.data(), static_cast<int>(data.size()))) {
     135                 :           0 :                 return;
     136                 :             :             }
     137                 :             : 
     138                 :           0 :             const auto& ep_info = msg.endpoint_info();
     139                 :           0 :             std::string endpoint_str = ep_info.endpoint();
     140                 :           0 :             EndPoint node_endpoint = endpoint_ops::parse_endpoint(endpoint_str);
     141                 :             : 
     142                 :           0 :             if (std::holds_alternative<Ipv4Endpoint>(node_endpoint) &&
     143                 :           0 :                 std::get<Ipv4Endpoint>(node_endpoint).is_unspecified()) {
     144                 :           0 :                 return;
     145                 :             :             }
     146                 :             : 
     147                 :           0 :             std::string client_host = ep_info.host();
     148                 :           0 :             uint16_t client_port = static_cast<uint16_t>(ep_info.tcp_port());
     149                 :             : 
     150                 :             :             // Acceptors are at top level of PbRegisterPayload (per spec)
     151                 :           0 :             std::vector<AcceptorInfo> client_acceptors;
     152                 :           0 :             for (const auto& a : msg.acceptors()) {
     153                 :           0 :                 AcceptorInfo acceptor;
     154                 :           0 :                 acceptor.port = static_cast<uint16_t>(a.port());
     155                 :           0 :                 acceptor.handshake_version =
     156                 :           0 :                     static_cast<uint8_t>(a.handshake_version());
     157                 :           0 :                 acceptor.protocol_version =
     158                 :           0 :                     static_cast<uint8_t>(a.protocol_version());
     159                 :           0 :                 acceptor.tls_required = a.tls_required();
     160                 :           0 :                 client_acceptors.push_back(acceptor);
     161                 :             :             }
     162                 :             : 
     163                 :             :             // Update clients map
     164                 :             :             {
     165                 :           0 :                 std::lock_guard<std::mutex> lock(clients_mutex_);
     166                 :           0 :                 clients_.erase(conn->remote_endpoint());
     167                 :           0 :                 clients_[node_endpoint] = conn;
     168                 :           0 :             }
     169                 :             : 
     170                 :             :             // Create and upsert endpoint
     171                 :           0 :             NodeEndpoint ep;
     172                 :           0 :             ep.identity.endpoint = node_endpoint;
     173                 :           0 :             ep.identity.host = client_host;
     174                 :           0 :             ep.tcp_port = client_port;
     175                 :           0 :             ep.identity.acceptors = std::move(client_acceptors);
     176                 :           0 :             ep.last_seen = std::chrono::steady_clock::now();
     177                 :           0 :             registry_.upsert_endpoint(ep);
     178                 :             : 
     179                 :             :             // Send Accept response using protobuf
     180                 :           0 :             StreamBuffer accept_payload = serialize_accept_payload(0);
     181                 :           0 :             conn->send_message(TcpMessageType::Accept, accept_payload);
     182                 :             : 
     183                 :             :             // Broadcast node joined
     184                 :           0 :             broadcast_node_joined(node_endpoint, ep);
     185                 :           0 :             break;
     186                 :           0 :         }
     187                 :             : 
     188                 :           0 :         case TcpMessageType::Heartbeat: {
     189                 :           0 :             EndPoint endpoint = conn->remote_endpoint();
     190                 :           0 :             bool is_valid = std::holds_alternative<Ipv4Endpoint>(endpoint)
     191                 :           0 :                                 ? !std::get<Ipv4Endpoint>(endpoint).is_unspecified()
     192                 :           0 :                                 : true;
     193                 :           0 :             if (is_valid) {
     194                 :           0 :                 NodeEndpoint* ep = registry_.get(endpoint);
     195                 :           0 :                 if (ep) {
     196                 :           0 :                     ep->last_seen = std::chrono::steady_clock::now();
     197                 :             :                 }
     198                 :             :             }
     199                 :           0 :             break;
     200                 :             :         }
     201                 :             : 
     202                 :           0 :         case TcpMessageType::NodeJoin:
     203                 :             :         case TcpMessageType::NodeLeave:
     204                 :             :         case TcpMessageType::Accept:
     205                 :             :         case TcpMessageType::Error:
     206                 :             :             // These are not expected from clients
     207                 :           0 :             break;
     208                 :             :     }
     209                 :             : }
     210                 :             : 
     211                 :           0 : void RegistrarServer::handle_disconnect(RegistrarConnectionPtr conn) {
     212                 :           0 :     EndPoint endpoint = conn->remote_endpoint();
     213                 :           0 :     int fd = conn->fd();
     214                 :             : 
     215                 :             :     // Remove from both maps
     216                 :             :     {
     217                 :           0 :         std::lock_guard<std::mutex> lock(clients_mutex_);
     218                 :           0 :         clients_.erase(endpoint);
     219                 :           0 :         fd_to_connection_.erase(fd);
     220                 :           0 :     }
     221                 :             : 
     222                 :             :     // Broadcast node left
     223                 :           0 :     broadcast_node_left(endpoint);
     224                 :             : 
     225                 :             :     // Remove endpoint from registry
     226                 :           0 :     registry_.remove_endpoint(endpoint);
     227                 :           0 : }
     228                 :             : 
     229                 :           0 : void RegistrarServer::broadcast_node_joined(EndPoint endpoint,
     230                 :             :                                             const NodeEndpoint& ep) {
     231                 :           0 :     StreamBuffer payload = serialize_node_join_payload(ep);
     232                 :             : 
     233                 :           0 :     std::lock_guard<std::mutex> lock(clients_mutex_);
     234                 :           0 :     for (const auto& [id, conn] : clients_) {
     235                 :           0 :         if (id != endpoint) {
     236                 :           0 :             conn->send_message(TcpMessageType::NodeJoin, payload);
     237                 :             :         }
     238                 :             :     }
     239                 :           0 : }
     240                 :             : 
     241                 :           0 : void RegistrarServer::broadcast_node_left(EndPoint endpoint) {
     242                 :           0 :     StreamBuffer payload = serialize_node_leave_payload(endpoint);
     243                 :             : 
     244                 :           0 :     std::lock_guard<std::mutex> lock(clients_mutex_);
     245                 :           0 :     for (const auto& [id, conn] : clients_) {
     246                 :             :         (void)id;
     247                 :           0 :         conn->send_message(TcpMessageType::NodeLeave, payload);
     248                 :             :     }
     249                 :           0 : }
     250                 :             : 
     251                 :             : } // namespace net
     252                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1