Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/net/registrar.hpp>
16 : : #include <hpactor/net/registrar_serialization.hpp>
17 : :
18 : : #include <arpa/inet.h>
19 : : #include <netinet/in.h>
20 : : #include <netinet/tcp.h>
21 : : #include <sys/socket.h>
22 : : #include <unistd.h>
23 : :
24 : : #include <cstring>
25 : :
26 : : #include <hpactor/log/logger.hpp>
27 : :
28 : : namespace hpactor {
29 : :
30 : : namespace net {
31 : :
32 : : // -----------------------------------------------------------------------------
33 : : // RegistrarServer Implementation
34 : : // -----------------------------------------------------------------------------
35 : :
36 : 0 : RegistrarServer::RegistrarServer(const RegistrarConfig& config,
37 : 0 : EndPoint local_endpoint, EventLoop* loop)
38 : 0 : : config_(config), local_endpoint_(local_endpoint), registry_(config),
39 : 0 : loop_(loop), acceptor_(loop) {
40 : : // Set completion callback for send routing - must be done before
41 : : // connections register themselves
42 : 0 : if (loop_) {
43 : 0 : loop_->set_completion_callback([this](OpCompletion c) {
44 : 0 : if (c.type == OpType::Send) {
45 : 0 : auto it = fd_to_connection_.find(c.fd);
46 : 0 : if (it != fd_to_connection_.end()) {
47 : 0 : it->second->handle_send_completion(c.result);
48 : : }
49 : : }
50 : 0 : });
51 : : }
52 : 0 : }
53 : :
54 : 0 : RegistrarServer::~RegistrarServer() {
55 : 0 : stop();
56 : 0 : }
57 : :
58 : 0 : void RegistrarServer::start() {
59 : 0 : if (running_.load()) {
60 : 0 : return;
61 : : }
62 : :
63 : 0 : running_.store(true);
64 : :
65 : 0 : HPACTOR_LOG_INFO(log::LogCategory::kRegistrar, ActorId{0}, 0,
66 : : "registrar server started");
67 : :
68 : : // Use Acceptor for TCP listening (async)
69 : : // The Acceptor uses the EventLoop to monitor the listening socket
70 : : // and invokes our accept handler when connections arrive
71 : 0 : acceptor_.set_accept_handler(
72 : 0 : [this](int fd, EndPoint remote_ep) { handle_accept(fd, remote_ep); });
73 : 0 : acceptor_.listen(config_.tcp_port);
74 : : }
75 : :
76 : 0 : void RegistrarServer::stop() {
77 : 0 : if (!running_.load()) {
78 : 0 : return;
79 : : }
80 : :
81 : 0 : running_.store(false);
82 : :
83 : 0 : HPACTOR_LOG_INFO(log::LogCategory::kRegistrar, ActorId{0}, 0,
84 : : "registrar server stopped");
85 : :
86 : : // Close all client connections
87 : : {
88 : 0 : std::lock_guard<std::mutex> lock(clients_mutex_);
89 : 0 : for (auto& [endpoint, conn] : clients_) {
90 : : (void)endpoint;
91 : 0 : conn->close();
92 : : }
93 : 0 : clients_.clear();
94 : 0 : fd_to_connection_.clear();
95 : 0 : }
96 : :
97 : : // Close acceptor
98 : 0 : acceptor_.close();
99 : : }
100 : :
101 : 0 : void RegistrarServer::handle_accept(int client_fd, EndPoint remote_endpoint) {
102 : : // Set TCP_NODELAY for lower latency
103 : 0 : int nodelay = 1;
104 : 0 : setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay));
105 : :
106 : 0 : auto conn = RegistrarConnection::accepted(client_fd, remote_endpoint, loop_);
107 : :
108 : : // Set message handler to process incoming messages
109 : 0 : conn->set_message_handler(
110 : 0 : [this, conn](TcpMessageType type, const StreamBuffer& payload) {
111 : 0 : handle_tcp_message(conn, type, payload);
112 : 0 : });
113 : :
114 : : // Set disconnect handler
115 : 0 : conn->set_disconnect_handler([this, conn]() { handle_disconnect(conn); });
116 : :
117 : : // Store connection in both maps
118 : : {
119 : 0 : std::lock_guard<std::mutex> lock(clients_mutex_);
120 : 0 : clients_[remote_endpoint] = conn;
121 : 0 : fd_to_connection_[client_fd] = conn;
122 : 0 : }
123 : :
124 : : // Register with NodeRegistry
125 : : // Note: actual registration info comes in Register message
126 : 0 : }
127 : :
128 : 0 : void RegistrarServer::handle_tcp_message(RegistrarConnectionPtr conn,
129 : : TcpMessageType type,
130 : : const StreamBuffer& data) {
131 : 0 : switch (type) {
132 : 0 : case TcpMessageType::Register: {
133 : 0 : PbRegisterPayload msg;
134 : 0 : if (!msg.ParseFromArray(data.data(), static_cast<int>(data.size()))) {
135 : 0 : return;
136 : : }
137 : :
138 : 0 : const auto& ep_info = msg.endpoint_info();
139 : 0 : std::string endpoint_str = ep_info.endpoint();
140 : 0 : EndPoint node_endpoint = endpoint_ops::parse_endpoint(endpoint_str);
141 : :
142 : 0 : if (std::holds_alternative<Ipv4Endpoint>(node_endpoint) &&
143 : 0 : std::get<Ipv4Endpoint>(node_endpoint).is_unspecified()) {
144 : 0 : return;
145 : : }
146 : :
147 : 0 : std::string client_host = ep_info.host();
148 : 0 : uint16_t client_port = static_cast<uint16_t>(ep_info.tcp_port());
149 : :
150 : : // Acceptors are at top level of PbRegisterPayload (per spec)
151 : 0 : std::vector<AcceptorInfo> client_acceptors;
152 : 0 : for (const auto& a : msg.acceptors()) {
153 : 0 : AcceptorInfo acceptor;
154 : 0 : acceptor.port = static_cast<uint16_t>(a.port());
155 : 0 : acceptor.handshake_version =
156 : 0 : static_cast<uint8_t>(a.handshake_version());
157 : 0 : acceptor.protocol_version =
158 : 0 : static_cast<uint8_t>(a.protocol_version());
159 : 0 : acceptor.tls_required = a.tls_required();
160 : 0 : client_acceptors.push_back(acceptor);
161 : : }
162 : :
163 : : // Update clients map
164 : : {
165 : 0 : std::lock_guard<std::mutex> lock(clients_mutex_);
166 : 0 : clients_.erase(conn->remote_endpoint());
167 : 0 : clients_[node_endpoint] = conn;
168 : 0 : }
169 : :
170 : : // Create and upsert endpoint
171 : 0 : NodeEndpoint ep;
172 : 0 : ep.identity.endpoint = node_endpoint;
173 : 0 : ep.identity.host = client_host;
174 : 0 : ep.tcp_port = client_port;
175 : 0 : ep.identity.acceptors = std::move(client_acceptors);
176 : 0 : ep.last_seen = std::chrono::steady_clock::now();
177 : 0 : registry_.upsert_endpoint(ep);
178 : :
179 : : // Send Accept response using protobuf
180 : 0 : StreamBuffer accept_payload = serialize_accept_payload(0);
181 : 0 : conn->send_message(TcpMessageType::Accept, accept_payload);
182 : :
183 : : // Broadcast node joined
184 : 0 : broadcast_node_joined(node_endpoint, ep);
185 : 0 : break;
186 : 0 : }
187 : :
188 : 0 : case TcpMessageType::Heartbeat: {
189 : 0 : EndPoint endpoint = conn->remote_endpoint();
190 : 0 : bool is_valid = std::holds_alternative<Ipv4Endpoint>(endpoint)
191 : 0 : ? !std::get<Ipv4Endpoint>(endpoint).is_unspecified()
192 : 0 : : true;
193 : 0 : if (is_valid) {
194 : 0 : NodeEndpoint* ep = registry_.get(endpoint);
195 : 0 : if (ep) {
196 : 0 : ep->last_seen = std::chrono::steady_clock::now();
197 : : }
198 : : }
199 : 0 : break;
200 : : }
201 : :
202 : 0 : case TcpMessageType::NodeJoin:
203 : : case TcpMessageType::NodeLeave:
204 : : case TcpMessageType::Accept:
205 : : case TcpMessageType::Error:
206 : : // These are not expected from clients
207 : 0 : break;
208 : : }
209 : : }
210 : :
211 : 0 : void RegistrarServer::handle_disconnect(RegistrarConnectionPtr conn) {
212 : 0 : EndPoint endpoint = conn->remote_endpoint();
213 : 0 : int fd = conn->fd();
214 : :
215 : : // Remove from both maps
216 : : {
217 : 0 : std::lock_guard<std::mutex> lock(clients_mutex_);
218 : 0 : clients_.erase(endpoint);
219 : 0 : fd_to_connection_.erase(fd);
220 : 0 : }
221 : :
222 : : // Broadcast node left
223 : 0 : broadcast_node_left(endpoint);
224 : :
225 : : // Remove endpoint from registry
226 : 0 : registry_.remove_endpoint(endpoint);
227 : 0 : }
228 : :
229 : 0 : void RegistrarServer::broadcast_node_joined(EndPoint endpoint,
230 : : const NodeEndpoint& ep) {
231 : 0 : StreamBuffer payload = serialize_node_join_payload(ep);
232 : :
233 : 0 : std::lock_guard<std::mutex> lock(clients_mutex_);
234 : 0 : for (const auto& [id, conn] : clients_) {
235 : 0 : if (id != endpoint) {
236 : 0 : conn->send_message(TcpMessageType::NodeJoin, payload);
237 : : }
238 : : }
239 : 0 : }
240 : :
241 : 0 : void RegistrarServer::broadcast_node_left(EndPoint endpoint) {
242 : 0 : StreamBuffer payload = serialize_node_leave_payload(endpoint);
243 : :
244 : 0 : std::lock_guard<std::mutex> lock(clients_mutex_);
245 : 0 : for (const auto& [id, conn] : clients_) {
246 : : (void)id;
247 : 0 : conn->send_message(TcpMessageType::NodeLeave, payload);
248 : : }
249 : 0 : }
250 : :
251 : : } // namespace net
252 : : } // namespace hpactor
|