Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/net/gossip_membership.hpp>
16 : :
17 : : #include <hpactor/gossip.pb.h>
18 : : #include <hpactor/log/logger.hpp>
19 : : #include <hpactor/net/registrar.hpp> // for endpoint_ops, HostResolver
20 : :
21 : : #include <arpa/inet.h>
22 : : #include <netinet/in.h>
23 : : #include <sys/socket.h>
24 : : #include <unistd.h>
25 : :
26 : : #include <algorithm>
27 : : #include <cstring>
28 : : #include <random>
29 : :
30 : : namespace hpactor::net {
31 : :
32 : : // =============================================================================
33 : : // Internal helpers (anonymous namespace)
34 : : // =============================================================================
35 : : namespace {
36 : :
37 : : // ---- Endpoint conversion to/from PbEndpoint ----
38 : :
39 : 22 : void ep_to_pb_endpoint(PbEndpoint* pb, const EndPoint& ep) {
40 : 22 : if (auto* ipv4 = std::get_if<Ipv4Endpoint>(&ep)) {
41 : 22 : auto* pb4 = pb->mutable_ipv4();
42 : 22 : pb4->set_addr(ipv4->addr);
43 : 22 : pb4->set_port(ipv4->port());
44 : 0 : } else if (auto* ipv6 = std::get_if<Ipv6Endpoint>(&ep)) {
45 : 0 : auto* pb6 = pb->mutable_ipv6();
46 : 0 : pb6->set_addr(
47 : 0 : std::string(reinterpret_cast<const char*>(ipv6->addr.data()), 16));
48 : 0 : pb6->set_port(ipv6->port());
49 : : }
50 : 22 : }
51 : :
52 : 4 : EndPoint pb_endpoint_to_ep(const PbEndpoint& pb) {
53 : 4 : if (pb.has_ipv4()) {
54 : 4 : const auto& pb4 = pb.ipv4();
55 : 4 : return Ipv4Endpoint{pb4.addr(), htons(static_cast<uint16_t>(pb4.port()))};
56 : : }
57 : 0 : if (pb.has_ipv6()) {
58 : 0 : const auto& pb6 = pb.ipv6();
59 : 0 : std::array<uint8_t, 16> addr{};
60 : 0 : if (pb6.addr().size() == 16) {
61 : 0 : std::memcpy(addr.data(), pb6.addr().data(), 16);
62 : : }
63 : 0 : return Ipv6Endpoint{addr, htons(static_cast<uint16_t>(pb6.port()))};
64 : : }
65 : 0 : return Ipv4Endpoint{0, 0};
66 : : }
67 : :
68 : : // ---- Piggyback entry conversion to/from protobuf ----
69 : :
70 : 2 : void to_pb_piggyback(PbPiggybackEntry* pb, const PiggybackEntry& entry) {
71 : 2 : pb->set_type(static_cast<PbPiggybackType>(entry.type));
72 : 2 : ep_to_pb_endpoint(pb->mutable_endpoint(), entry.identity.endpoint);
73 : 2 : pb->set_incarnation(entry.incarnation);
74 : 4 : for (const auto& s : entry.actor_types)
75 : 2 : pb->add_actor_types(s);
76 : 2 : pb->set_load(entry.load);
77 : 3 : for (const auto& acc : entry.identity.acceptors) {
78 : 1 : auto* a = pb->add_acceptors();
79 : 1 : a->set_port(acc.port);
80 : 1 : a->set_handshake_version(acc.handshake_version);
81 : 1 : a->set_protocol_version(acc.protocol_version);
82 : 1 : a->set_tls_required(acc.tls_required);
83 : : }
84 : 2 : }
85 : :
86 : 2 : PiggybackEntry from_pb_piggyback(const PbPiggybackEntry& pb) {
87 : 2 : PiggybackEntry entry;
88 : 2 : entry.type = static_cast<PiggybackType>(pb.type());
89 : 2 : entry.identity.endpoint = pb_endpoint_to_ep(pb.endpoint());
90 : 2 : entry.incarnation = pb.incarnation();
91 : 4 : for (const auto& s : pb.actor_types())
92 : 2 : entry.actor_types.push_back(s);
93 : 2 : entry.load = pb.load();
94 : 3 : for (const auto& a : pb.acceptors()) {
95 : 1 : AcceptorInfo acc;
96 : 1 : acc.port = static_cast<uint16_t>(a.port());
97 : 1 : acc.handshake_version = static_cast<uint8_t>(a.handshake_version());
98 : 1 : acc.protocol_version = static_cast<uint8_t>(a.protocol_version());
99 : 1 : acc.tls_required = a.tls_required();
100 : 1 : entry.identity.acceptors.push_back(acc);
101 : : }
102 : 2 : return entry;
103 : : }
104 : :
105 : : // ---- Member conversion to/from protobuf ----
106 : :
107 : 0 : void to_pb_member(PbGossipMember* pb, const Member& m) {
108 : 0 : ep_to_pb_endpoint(pb->mutable_endpoint(), m.identity.endpoint);
109 : 0 : pb->set_host(m.identity.host);
110 : 0 : pb->set_uds_path(m.identity.uds_path);
111 : 0 : pb->set_incarnation(m.incarnation);
112 : 0 : pb->set_status(static_cast<uint32_t>(m.status));
113 : 0 : for (const auto& s : m.actor_types)
114 : 0 : pb->add_actor_types(s);
115 : 0 : for (const auto& acc : m.identity.acceptors) {
116 : 0 : auto* a = pb->add_acceptors();
117 : 0 : a->set_port(acc.port);
118 : 0 : a->set_handshake_version(acc.handshake_version);
119 : 0 : a->set_protocol_version(acc.protocol_version);
120 : 0 : a->set_tls_required(acc.tls_required);
121 : : }
122 : 0 : }
123 : :
124 : 0 : Member from_pb_member(const PbGossipMember& pb) {
125 : 0 : Member m;
126 : 0 : m.identity.endpoint = pb_endpoint_to_ep(pb.endpoint());
127 : 0 : m.identity.host = pb.host();
128 : 0 : m.identity.uds_path = pb.uds_path();
129 : 0 : m.incarnation = pb.incarnation();
130 : 0 : m.status = static_cast<MemberStatus>(pb.status());
131 : 0 : for (const auto& s : pb.actor_types())
132 : 0 : m.actor_types.push_back(s);
133 : 0 : for (const auto& a : pb.acceptors()) {
134 : 0 : AcceptorInfo acc;
135 : 0 : acc.port = static_cast<uint16_t>(a.port());
136 : 0 : acc.handshake_version = static_cast<uint8_t>(a.handshake_version());
137 : 0 : acc.protocol_version = static_cast<uint8_t>(a.protocol_version());
138 : 0 : acc.tls_required = a.tls_required();
139 : 0 : m.identity.acceptors.push_back(acc);
140 : : }
141 : 0 : return m;
142 : : }
143 : :
144 : : // ---- Async UDP send via EventLoop ----
145 : : // Uses the EventLoop's async_sendto (non-blocking, completion-driven).
146 : : // Falls back to blocking sendto when no EventLoop is available.
147 : :
148 : 18 : void async_udp_send(EventLoop* loop, int sock, const StreamBuffer& data,
149 : : const EndPoint& dest) {
150 : 18 : if (sock < 0 || data.empty())
151 : 18 : return;
152 : :
153 : 0 : if (loop) {
154 : : struct iovec iov;
155 : 0 : iov.iov_base = const_cast<uint8_t*>(data.data());
156 : 0 : iov.iov_len = data.size();
157 : :
158 : 0 : sockaddr_in addr{};
159 : 0 : addr.sin_family = AF_INET;
160 : 0 : if (auto* ipv4 = std::get_if<Ipv4Endpoint>(&dest)) {
161 : 0 : addr.sin_addr.s_addr = ipv4->addr;
162 : 0 : addr.sin_port = ipv4->port_nw;
163 : : }
164 : :
165 : 0 : loop->backend()->async_sendto(
166 : : sock, &iov, 1, reinterpret_cast<const sockaddr*>(&addr),
167 : : sizeof(addr), ActorId(0), static_cast<uint32_t>(OpType::SendTo));
168 : : } else {
169 : : // Legacy fallback: blocking sendto
170 : 0 : sockaddr_in addr{};
171 : 0 : addr.sin_family = AF_INET;
172 : 0 : if (auto* ipv4 = std::get_if<Ipv4Endpoint>(&dest)) {
173 : 0 : addr.sin_addr.s_addr = ipv4->addr;
174 : 0 : addr.sin_port = ipv4->port_nw;
175 : : }
176 : 0 : sendto(sock, data.data(), data.size(), 0,
177 : : reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
178 : : }
179 : : }
180 : :
181 : : // ---- Per-instance state (RNG, join retry, forwarded pings) ----
182 : :
183 : : struct InstanceExtras {
184 : : std::mt19937 rng;
185 : : bool rng_seeded = false;
186 : : size_t join_seed_index = 0;
187 : : uint64_t join_retry_timer = 0;
188 : : std::unordered_map<EndPoint, EndPoint> forwarded_pings;
189 : : };
190 : :
191 : : static std::unordered_map<const GossipMembership*, InstanceExtras> s_extras;
192 : :
193 : 27 : static InstanceExtras& extras_for(const GossipMembership* self) {
194 : 27 : auto& extras = s_extras[self];
195 : 27 : if (!extras.rng_seeded) {
196 : 25 : std::random_device rd;
197 : 25 : extras.rng.seed(rd());
198 : 25 : extras.rng_seeded = true;
199 : 25 : }
200 : 27 : return extras;
201 : : }
202 : :
203 : 25 : static void cleanup_extras(const GossipMembership* self) {
204 : 25 : s_extras.erase(self);
205 : 25 : }
206 : :
207 : : } // anonymous namespace
208 : :
209 : : // =============================================================================
210 : : // GossipMembership — Constructor / Destructor
211 : : // =============================================================================
212 : :
213 : 20 : GossipMembership::GossipMembership(const GossipConfig& cfg, EventLoop* loop)
214 : 40 : : config_(cfg), loop_(loop), incarnation_(1) // Start at 1 so 0 means "no
215 : : // incarnation"
216 : : ,
217 : 60 : recv_buffer_(kGossipMaxMsgSize) {}
218 : :
219 : 20 : GossipMembership::~GossipMembership() {
220 : 20 : stop();
221 : 20 : }
222 : :
223 : : // =============================================================================
224 : : // IServiceDiscovery — lifecycle
225 : : // =============================================================================
226 : :
227 : 0 : void GossipMembership::start() {
228 : : // Incarnation is a wall-clock timestamp so that restarted nodes
229 : : // automatically have a higher incarnation than any previous run.
230 : 0 : incarnation_ = static_cast<uint64_t>(
231 : 0 : std::chrono::system_clock::now().time_since_epoch().count());
232 : :
233 : 0 : setup_udp_socket();
234 : :
235 : : // Add self to the membership table.
236 : : {
237 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
238 : 0 : Member self = config_.local_state;
239 : 0 : self.incarnation = incarnation_;
240 : 0 : self.status = MemberStatus::Alive;
241 : 0 : self.last_seen = std::chrono::steady_clock::now();
242 : 0 : members_[self.identity.endpoint] = std::move(self);
243 : 0 : }
244 : :
245 : : // Bootstrap: join an existing cluster via a seed, or start a solo cluster.
246 : 0 : if (!config_.seeds.empty()) {
247 : 0 : auto& ext = extras_for(this);
248 : 0 : ext.join_seed_index = 0;
249 : 0 : send_join(config_.seeds[0]);
250 : :
251 : : // Recursive retry chain: try the next seed after 1 s if no SyncRsp
252 : : // arrived.
253 : 0 : auto retry_fn = std::make_shared<std::function<void()>>();
254 : 0 : *retry_fn = [this, retry_fn]() {
255 : 0 : auto& ext_inner = extras_for(this);
256 : 0 : ext_inner.join_seed_index++;
257 : 0 : if (ext_inner.join_seed_index >= config_.seeds.size()) {
258 : 0 : return; // No more seeds to try
259 : : }
260 : : {
261 : 0 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
262 : 0 : if (members_.size() > 1) {
263 : 0 : return; // Already received SyncRsp — joined successfully
264 : : }
265 : 0 : }
266 : 0 : send_join(config_.seeds[ext_inner.join_seed_index]);
267 : 0 : ext_inner.join_retry_timer = loop_->run_after(*retry_fn, 1000);
268 : 0 : };
269 : 0 : ext.join_retry_timer = loop_->run_after(*retry_fn, 1000);
270 : 0 : }
271 : :
272 : : // Schedule the periodic protocol round.
273 : 0 : protocol_timer_ =
274 : 0 : loop_->run_every([this] { protocol_round(); },
275 : 0 : static_cast<int>(config_.protocol_period.count()));
276 : 0 : }
277 : :
278 : 25 : void GossipMembership::stop() {
279 : : // Cancel the periodic protocol timer.
280 : 25 : if (protocol_timer_ != 0 && loop_) {
281 : 0 : loop_->cancel_timer(protocol_timer_);
282 : 0 : protocol_timer_ = 0;
283 : : }
284 : :
285 : : // Cancel the join retry timer if still active.
286 : 25 : auto& ext = extras_for(this);
287 : 25 : if (ext.join_retry_timer != 0 && loop_) {
288 : 0 : loop_->cancel_timer(ext.join_retry_timer);
289 : 0 : ext.join_retry_timer = 0;
290 : : }
291 : :
292 : : // Send Leave to all known Alive + Suspicious members (best-effort).
293 : : {
294 : 25 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
295 : 44 : for (const auto& [ep, member] : members_) {
296 : 19 : if (member.status == MemberStatus::Alive ||
297 : 1 : member.status == MemberStatus::Suspicious) {
298 : 18 : send_leave(ep);
299 : : }
300 : : }
301 : 25 : }
302 : :
303 : : // Tear down the UDP socket.
304 : 25 : teardown_udp_socket();
305 : :
306 : : // Clear local state.
307 : : {
308 : 25 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
309 : 25 : members_.clear();
310 : 25 : pending_pings_.clear();
311 : 25 : }
312 : :
313 : 25 : cleanup_extras(this);
314 : 25 : }
315 : :
316 : : // =============================================================================
317 : : // IServiceDiscovery — query
318 : : // =============================================================================
319 : :
320 : 3 : std::vector<Member> GossipMembership::discover_all() const {
321 : 3 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
322 : 3 : std::vector<Member> result;
323 : 3 : result.reserve(members_.size());
324 : 5 : for (const auto& [ep, member] : members_) {
325 : 2 : result.push_back(member);
326 : : }
327 : 3 : return result;
328 : 3 : }
329 : :
330 : 4 : const Member* GossipMembership::discover(EndPoint ep) const {
331 : 4 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
332 : 4 : auto it = members_.find(ep);
333 : 4 : if (it != members_.end()) {
334 : 2 : return &it->second;
335 : : }
336 : 2 : return nullptr;
337 : 4 : }
338 : :
339 : : // =============================================================================
340 : : // IServiceDiscovery — announce / callback
341 : : // =============================================================================
342 : :
343 : 1 : void GossipMembership::announce(Member local_state) {
344 : 1 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
345 : : // Bump incarnation on re-announce
346 : 1 : ++incarnation_;
347 : 1 : local_state.incarnation = incarnation_;
348 : 1 : local_state.last_seen = std::chrono::steady_clock::now();
349 : 1 : config_.local_state = std::move(local_state);
350 : 1 : needs_dissemination_ = true;
351 : 1 : }
352 : :
353 : 0 : void GossipMembership::on_member_change(MemberChangeCallback cb) {
354 : 0 : member_change_cb_ = std::move(cb);
355 : 0 : }
356 : :
357 : : // =============================================================================
358 : : // Wire protocol — encode_message
359 : : // =============================================================================
360 : : //
361 : : // Binary format:
362 : : // Magic (4B "HPGC") | Version (1B) | Type (1B) | Flags (2B) |
363 : : // Sender Endpoint (len-prefixed, var) |
364 : : // Incarnation (8B BE) | Sequence Number (4B BE) |
365 : : // [Ping Target Endpoint (len-prefixed, var) — PingReq only] |
366 : : // Piggyback Count (2B BE) | Piggyback entries...
367 : :
368 : : StreamBuffer
369 : 20 : GossipMembership::encode_message(GossipMessageType type, uint64_t inc,
370 : : uint32_t seq, EndPoint ping_target,
371 : : const std::vector<PiggybackEntry>& pb) const {
372 : 20 : PbGossipEnvelope env;
373 : 20 : env.set_magic(GossipMagic);
374 : 20 : env.set_version(GossipVersion);
375 : 20 : env.set_type(static_cast<PbGossipMessageType>(type));
376 : 20 : env.set_flags(0);
377 : :
378 : : // Build per-type payload
379 : 20 : switch (type) {
380 : 2 : case GossipMessageType::Ping: {
381 : 2 : auto* ping = env.mutable_ping();
382 : 2 : ep_to_pb_endpoint(ping->mutable_sender_endpoint(),
383 : 2 : config_.local_state.identity.endpoint);
384 : 2 : ping->set_incarnation(inc);
385 : 2 : ping->set_seq_no(seq);
386 : 4 : for (const auto& e : pb)
387 : 2 : to_pb_piggyback(ping->add_piggyback(), e);
388 : 2 : break;
389 : : }
390 : 0 : case GossipMessageType::Ack: {
391 : 0 : auto* ack = env.mutable_ack();
392 : 0 : ep_to_pb_endpoint(ack->mutable_sender_endpoint(),
393 : 0 : config_.local_state.identity.endpoint);
394 : 0 : ack->set_incarnation(inc);
395 : 0 : for (const auto& e : pb)
396 : 0 : to_pb_piggyback(ack->add_piggyback(), e);
397 : 0 : break;
398 : : }
399 : 0 : case GossipMessageType::PingReq: {
400 : 0 : auto* pr = env.mutable_ping_req();
401 : 0 : ep_to_pb_endpoint(pr->mutable_sender_endpoint(),
402 : 0 : config_.local_state.identity.endpoint);
403 : 0 : ep_to_pb_endpoint(pr->mutable_target_endpoint(), ping_target);
404 : 0 : pr->set_incarnation(inc);
405 : 0 : break;
406 : : }
407 : 0 : case GossipMessageType::IndirectAck: {
408 : 0 : auto* ia = env.mutable_indirect_ack();
409 : 0 : ep_to_pb_endpoint(ia->mutable_sender_endpoint(),
410 : 0 : config_.local_state.identity.endpoint);
411 : 0 : ep_to_pb_endpoint(ia->mutable_original_requester(), ping_target);
412 : 0 : ia->set_incarnation(inc);
413 : 0 : break;
414 : : }
415 : 0 : case GossipMessageType::Join: {
416 : 0 : auto* join = env.mutable_join();
417 : 0 : ep_to_pb_endpoint(join->mutable_sender_endpoint(),
418 : 0 : config_.local_state.identity.endpoint);
419 : 0 : join->set_incarnation(inc);
420 : 0 : join->set_host(config_.local_state.identity.host);
421 : 0 : join->set_uds_path(config_.local_state.identity.uds_path);
422 : 0 : for (const auto& s : config_.local_state.actor_types)
423 : 0 : join->add_actor_types(s);
424 : 0 : for (const auto& acc : config_.local_state.identity.acceptors) {
425 : 0 : auto* a = join->add_acceptors();
426 : 0 : a->set_port(acc.port);
427 : 0 : a->set_handshake_version(acc.handshake_version);
428 : 0 : a->set_protocol_version(acc.protocol_version);
429 : 0 : a->set_tls_required(acc.tls_required);
430 : : }
431 : 0 : break;
432 : : }
433 : 18 : case GossipMessageType::Leave: {
434 : 18 : auto* leave = env.mutable_leave();
435 : 18 : ep_to_pb_endpoint(leave->mutable_sender_endpoint(),
436 : 18 : config_.local_state.identity.endpoint);
437 : 18 : leave->set_incarnation(inc);
438 : 18 : break;
439 : : }
440 : 0 : default:
441 : 0 : break;
442 : : }
443 : :
444 : 40 : std::string serialized = env.SerializeAsString();
445 : 20 : return StreamBuffer(serialized.begin(), serialized.end());
446 : 20 : }
447 : :
448 : : // =============================================================================
449 : : // Wire protocol — decode_message
450 : : // =============================================================================
451 : :
452 : 2 : bool GossipMembership::decode_message(const StreamBuffer& data,
453 : : GossipMessageType& type, EndPoint& sender,
454 : : uint64_t& inc, uint32_t& seq,
455 : : EndPoint& ping_target,
456 : : std::vector<PiggybackEntry>& pb) const {
457 : 2 : PbGossipEnvelope env;
458 : 2 : if (!env.ParseFromArray(data.data(), static_cast<int>(data.size())))
459 : 0 : return false;
460 : 2 : if (env.magic() != GossipMagic || env.version() != GossipVersion)
461 : 0 : return false;
462 : :
463 : 2 : type = static_cast<GossipMessageType>(env.type());
464 : 2 : pb.clear();
465 : :
466 : 2 : switch (type) {
467 : 2 : case GossipMessageType::Ping:
468 : 2 : if (!env.has_ping())
469 : 0 : return false;
470 : 2 : sender = pb_endpoint_to_ep(env.ping().sender_endpoint());
471 : 2 : inc = env.ping().incarnation();
472 : 2 : seq = env.ping().seq_no();
473 : 4 : for (const auto& e : env.ping().piggyback())
474 : 2 : pb.push_back(from_pb_piggyback(e));
475 : 2 : break;
476 : 0 : case GossipMessageType::Ack:
477 : 0 : if (!env.has_ack())
478 : 0 : return false;
479 : 0 : sender = pb_endpoint_to_ep(env.ack().sender_endpoint());
480 : 0 : inc = env.ack().incarnation();
481 : 0 : seq = 0;
482 : 0 : for (const auto& e : env.ack().piggyback())
483 : 0 : pb.push_back(from_pb_piggyback(e));
484 : 0 : break;
485 : 0 : case GossipMessageType::PingReq:
486 : 0 : if (!env.has_ping_req())
487 : 0 : return false;
488 : 0 : sender = pb_endpoint_to_ep(env.ping_req().sender_endpoint());
489 : 0 : ping_target = pb_endpoint_to_ep(env.ping_req().target_endpoint());
490 : 0 : inc = env.ping_req().incarnation();
491 : 0 : seq = 0;
492 : 0 : break;
493 : 0 : case GossipMessageType::IndirectAck:
494 : 0 : if (!env.has_indirect_ack())
495 : 0 : return false;
496 : 0 : sender = pb_endpoint_to_ep(env.indirect_ack().sender_endpoint());
497 : 0 : ping_target =
498 : 0 : pb_endpoint_to_ep(env.indirect_ack().original_requester());
499 : 0 : inc = env.indirect_ack().incarnation();
500 : 0 : seq = 0;
501 : 0 : break;
502 : 0 : case GossipMessageType::Join:
503 : 0 : if (!env.has_join())
504 : 0 : return false;
505 : 0 : sender = pb_endpoint_to_ep(env.join().sender_endpoint());
506 : 0 : inc = env.join().incarnation();
507 : 0 : seq = 0;
508 : 0 : break;
509 : 0 : case GossipMessageType::SyncRsp:
510 : : // SyncRsp is handled by decode_sync_rsp(), not this path
511 : 0 : return false;
512 : 0 : case GossipMessageType::Leave:
513 : 0 : if (!env.has_leave())
514 : 0 : return false;
515 : 0 : sender = pb_endpoint_to_ep(env.leave().sender_endpoint());
516 : 0 : inc = env.leave().incarnation();
517 : 0 : seq = 0;
518 : 0 : break;
519 : : }
520 : :
521 : 2 : return true;
522 : 2 : }
523 : :
524 : : // =============================================================================
525 : : // Wire protocol — encode_sync_rsp (protobuf)
526 : : // =============================================================================
527 : :
528 : : StreamBuffer
529 : 0 : GossipMembership::encode_sync_rsp(const std::vector<Member>& members) const {
530 : 0 : PbGossipSyncRsp rsp;
531 : 0 : ep_to_pb_endpoint(rsp.mutable_sender_endpoint(),
532 : 0 : config_.local_state.identity.endpoint);
533 : 0 : rsp.set_incarnation(incarnation_);
534 : 0 : for (const auto& m : members) {
535 : 0 : to_pb_member(rsp.add_members(), m);
536 : : }
537 : 0 : std::string serialized = rsp.SerializeAsString();
538 : 0 : return StreamBuffer(serialized.begin(), serialized.end());
539 : 0 : }
540 : :
541 : : // =============================================================================
542 : : // Wire protocol — decode_sync_rsp (protobuf)
543 : : // =============================================================================
544 : :
545 : 0 : bool GossipMembership::decode_sync_rsp(const StreamBuffer& data,
546 : : std::vector<Member>& members) const {
547 : 0 : PbGossipSyncRsp rsp;
548 : 0 : if (!rsp.ParseFromArray(data.data(), static_cast<int>(data.size())))
549 : 0 : return false;
550 : :
551 : 0 : members.clear();
552 : 0 : members.reserve(static_cast<size_t>(rsp.members_size()));
553 : 0 : for (const auto& pb_m : rsp.members()) {
554 : 0 : Member m = from_pb_member(pb_m);
555 : 0 : m.last_seen = std::chrono::steady_clock::now();
556 : 0 : members.push_back(std::move(m));
557 : 0 : }
558 : 0 : return true;
559 : 0 : }
560 : :
561 : : // =============================================================================
562 : : // Internal piggyback builder (called from member functions)
563 : : // =============================================================================
564 : :
565 : : // This helper is used by protocol_round(), handle_ping(), and send methods to
566 : : // assemble outgoing piggyback entries. It reads members_, config_, and
567 : : // needs_dissemination_ directly. The caller is responsible for clearing
568 : : // needs_dissemination_ after the piggyback is serialised.
569 : :
570 : : static std::vector<PiggybackEntry>
571 : 0 : build_piggyback_impl(const GossipConfig& config, uint64_t incarnation,
572 : : bool& needs_dissemination,
573 : : const std::unordered_map<EndPoint, Member>& members) {
574 : 0 : std::vector<PiggybackEntry> entries;
575 : :
576 : : // Self metadata — disseminate when announce() has been called.
577 : 0 : if (needs_dissemination) {
578 : 0 : PiggybackEntry meta;
579 : 0 : meta.type = PiggybackType::Metadata;
580 : 0 : meta.identity.endpoint = config.local_state.identity.endpoint;
581 : 0 : meta.incarnation = incarnation;
582 : 0 : meta.actor_types = config.local_state.actor_types;
583 : 0 : meta.load = 0;
584 : 0 : meta.identity.acceptors = config.local_state.identity.acceptors;
585 : 0 : entries.push_back(std::move(meta));
586 : 0 : }
587 : :
588 : : // Piggyback all Suspicious and Dead members so the cluster converges
589 : : // quickly.
590 : 0 : for (const auto& [ep, m] : members) {
591 : 0 : if (m.status == MemberStatus::Suspicious) {
592 : 0 : PiggybackEntry e;
593 : 0 : e.type = PiggybackType::Suspicious;
594 : 0 : e.identity.endpoint = ep;
595 : 0 : e.incarnation = m.incarnation;
596 : 0 : entries.push_back(std::move(e));
597 : 0 : } else if (m.status == MemberStatus::Dead) {
598 : 0 : PiggybackEntry e;
599 : 0 : e.type = PiggybackType::Dead;
600 : 0 : e.identity.endpoint = ep;
601 : 0 : e.incarnation = m.incarnation;
602 : 0 : entries.push_back(std::move(e));
603 : 0 : }
604 : : }
605 : :
606 : 0 : return entries;
607 : : }
608 : :
609 : : // =============================================================================
610 : : // Protocol round
611 : : // =============================================================================
612 : :
613 : 0 : void GossipMembership::protocol_round() {
614 : 0 : auto now = std::chrono::steady_clock::now();
615 : :
616 : : // ── 1. Pick peers and send Pings ──────────────────────────────────────
617 : : {
618 : 0 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
619 : :
620 : : // Collect Alive members excluding self.
621 : 0 : std::vector<EndPoint> alive;
622 : 0 : for (const auto& [ep, m] : members_) {
623 : 0 : if (m.status == MemberStatus::Alive &&
624 : 0 : !(ep == config_.local_state.identity.endpoint)) {
625 : 0 : alive.push_back(ep);
626 : : }
627 : : }
628 : :
629 : 0 : if (!alive.empty()) {
630 : 0 : auto targets = pick_random_peers(config_.fanout, {});
631 : : // pick_random_peers already handles the alive filtering; but it
632 : : // uses the shared_lock we already hold. To avoid double-locking
633 : : // we build the targets list above and pass it directly.
634 : :
635 : : // Build piggyback entries for outgoing Pings.
636 : 0 : auto pb = build_piggyback_impl(config_, incarnation_,
637 : 0 : needs_dissemination_, members_);
638 : : // Lock must be held while reading members_ for piggyback.
639 : :
640 : 0 : for (const auto& target : targets) {
641 : : StreamBuffer msg =
642 : 0 : encode_message(GossipMessageType::Ping, incarnation_, seq_no_++,
643 : 0 : config_.local_state.identity.endpoint, pb);
644 : 0 : async_udp_send(loop_, udp_socket_, msg, target);
645 : :
646 : : // Record pending ping.
647 : : // pending_pings_ is protected by members_mutex_ in our design.
648 : 0 : PendingPing pp;
649 : 0 : pp.expires_at = now + config_.ping_timeout;
650 : 0 : pp.indirect_requested = false;
651 : 0 : pending_pings_[target] = pp;
652 : 0 : }
653 : :
654 : : // Clear dissemination flag now that we've piggybacked our metadata.
655 : 0 : if (needs_dissemination_ && !pb.empty()) {
656 : 0 : needs_dissemination_ = false;
657 : : }
658 : 0 : }
659 : 0 : } // shared_lock released
660 : :
661 : : // ── 2. Check pending pings ─────────────────────────────────────────────
662 : : // Re-acquire unique lock because we may mutate pending_pings_ and members_.
663 : : {
664 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
665 : :
666 : 0 : std::vector<EndPoint> expired_pings;
667 : 0 : for (auto& [ep, pp] : pending_pings_) {
668 : 0 : if (now >= pp.expires_at) {
669 : 0 : expired_pings.push_back(ep);
670 : : }
671 : : }
672 : :
673 : 0 : for (const auto& target : expired_pings) {
674 : 0 : auto it = pending_pings_.find(target);
675 : 0 : if (it == pending_pings_.end())
676 : 0 : continue;
677 : 0 : auto& pp = it->second;
678 : :
679 : 0 : if (!pp.indirect_requested) {
680 : : // First expiry — request indirect probes.
681 : 0 : pp.indirect_requested = true;
682 : 0 : pp.indirect_expires_at = now + config_.ping_timeout;
683 : :
684 : : // Pick indirect probe peers: Alive, not self, not the target.
685 : : std::unordered_set<EndPoint> exclude{
686 : 0 : target, config_.local_state.identity.endpoint};
687 : 0 : auto probes = pick_random_peers(config_.indirect_probes, exclude);
688 : :
689 : 0 : if (probes.empty()) {
690 : : // 2-node cluster or no other peers available — skip
691 : : // indirect probes and immediately mark suspicious.
692 : 0 : mark_suspicious(target);
693 : 0 : pending_pings_.erase(target);
694 : : } else {
695 : 0 : for (const auto& proxy : probes) {
696 : 0 : send_ping_req(proxy, target);
697 : : }
698 : : }
699 : 0 : } else {
700 : : // Second expiry — indirect probes did not succeed.
701 : 0 : mark_suspicious(target);
702 : 0 : pending_pings_.erase(it);
703 : : }
704 : : }
705 : 0 : }
706 : :
707 : : // ── 3. Check suspicious members for timeout ────────────────────────────
708 : : {
709 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
710 : 0 : std::vector<EndPoint> to_mark_dead;
711 : 0 : for (auto& [ep, m] : members_) {
712 : 0 : if (m.status == MemberStatus::Suspicious) {
713 : 0 : if (now - m.last_seen > config_.suspicion_timeout) {
714 : 0 : to_mark_dead.push_back(ep);
715 : : }
716 : : }
717 : : }
718 : 0 : for (const auto& ep : to_mark_dead) {
719 : 0 : mark_dead(ep);
720 : : }
721 : 0 : }
722 : :
723 : : // ── 4. Purge old tombstones ────────────────────────────────────────────
724 : : {
725 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
726 : 0 : purge_dead_tombstones();
727 : 0 : }
728 : 0 : }
729 : :
730 : : // =============================================================================
731 : : // Packet handler — non-blocking recvfrom loop
732 : : // =============================================================================
733 : :
734 : 0 : void GossipMembership::handle_packet(const StreamBuffer& data,
735 : : const std::string& from_host,
736 : : uint16_t from_port) {
737 : : GossipMessageType type;
738 : 0 : EndPoint sender;
739 : : uint64_t inc;
740 : : uint32_t seq;
741 : 0 : EndPoint ping_target;
742 : 0 : std::vector<PiggybackEntry> pb;
743 : :
744 : 0 : if (!decode_message(data, type, sender, inc, seq, ping_target, pb)) {
745 : 0 : return; // Malformed or unknown message
746 : : }
747 : :
748 : 0 : switch (type) {
749 : 0 : case GossipMessageType::Ping:
750 : 0 : handle_ping(sender, inc, seq, std::move(pb), from_host, from_port);
751 : 0 : break;
752 : 0 : case GossipMessageType::Ack:
753 : 0 : handle_ack(sender, inc, std::move(pb));
754 : 0 : break;
755 : 0 : case GossipMessageType::PingReq:
756 : 0 : handle_ping_req(sender, ping_target);
757 : 0 : break;
758 : 0 : case GossipMessageType::IndirectAck:
759 : 0 : handle_indirect_ack(sender, ping_target);
760 : 0 : break;
761 : 0 : case GossipMessageType::Join:
762 : 0 : handle_join(sender, inc, std::move(pb));
763 : 0 : break;
764 : 0 : case GossipMessageType::SyncRsp: {
765 : : // SyncRsp uses a different wire format (decode_sync_rsp, not
766 : : // decode_message). The data after the standard header contains the
767 : : // SyncRsp payload. However, handle_sync_rsp expects a
768 : : // vector<Member>, so we delegate to the sender to re-encode.
769 : : // Actually the SyncRsp is carried in a regular message envelope —
770 : : // decode_message parsed the header, and the piggyback entries are
771 : : // not used. We extract the member list from the raw data by
772 : : // skipping the standard header.
773 : : //
774 : : // For SyncRsp, the sender includes the full table after the
775 : : // piggyback count. We re-parse from the raw data, but
776 : : // decode_message already consumed everything. Fortunately, SyncRsp
777 : : // uses a dedicated encode_sync_rsp / decode_sync_rsp code path. The
778 : : // incoming data is the *entire* SyncRsp payload (not the standard
779 : : // message envelope). The handle_packet caller (UDP read handler)
780 : : // passes the raw data, so for SyncRsp we decode differently.
781 : :
782 : : // Actually, looking at the wire design: Join is a standard message;
783 : : // the seed responds with SyncRsp which is a SEPARATE wire format
784 : : // (not through encode_message). But it arrives on the same UDP
785 : : // socket. We need a heuristic to distinguish standard messages from
786 : : // SyncRsp.
787 : :
788 : : // Try SyncRsp format first (no magic prefix, starts with 4B count):
789 : 0 : std::vector<Member> members;
790 : 0 : if (decode_sync_rsp(data, members)) {
791 : 0 : handle_sync_rsp(std::move(members));
792 : : }
793 : 0 : break;
794 : 0 : }
795 : 0 : case GossipMessageType::Leave:
796 : 0 : handle_leave(sender, inc);
797 : 0 : break;
798 : : }
799 : 0 : }
800 : :
801 : : // =============================================================================
802 : : // Message handlers
803 : : // =============================================================================
804 : :
805 : 0 : void GossipMembership::handle_ping(EndPoint sender, uint64_t inc, uint32_t /*seq*/,
806 : : std::vector<PiggybackEntry> pb,
807 : : const std::string& /*host*/, uint16_t /*port*/) {
808 : : // Merge the sender — a Ping proves they are alive.
809 : : {
810 : 0 : Member m;
811 : 0 : m.identity.endpoint = sender;
812 : 0 : m.incarnation = inc;
813 : 0 : m.status = MemberStatus::Alive;
814 : 0 : m.last_seen = std::chrono::steady_clock::now();
815 : 0 : merge_member(m);
816 : 0 : }
817 : :
818 : : // Apply piggyback entries from the ping.
819 : 0 : apply_piggyback(pb);
820 : :
821 : : // If we had a pending ping for the sender, clear it — they just proved
822 : : // they are alive by pinging us.
823 : : {
824 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
825 : 0 : pending_pings_.erase(sender);
826 : 0 : }
827 : :
828 : : // Build piggyback for the Ack response.
829 : 0 : std::vector<PiggybackEntry> ack_pb;
830 : : {
831 : 0 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
832 : 0 : ack_pb = build_piggyback_impl(config_, incarnation_,
833 : 0 : needs_dissemination_, members_);
834 : 0 : }
835 : 0 : if (!ack_pb.empty()) {
836 : 0 : needs_dissemination_ = false;
837 : : }
838 : :
839 : : // Send Ack with piggyback.
840 : : StreamBuffer ack_msg =
841 : 0 : encode_message(GossipMessageType::Ack, incarnation_, seq_no_++,
842 : 0 : config_.local_state.identity.endpoint, ack_pb);
843 : 0 : async_udp_send(loop_, udp_socket_, ack_msg, sender);
844 : 0 : }
845 : :
846 : 0 : void GossipMembership::handle_ack(EndPoint sender, uint64_t inc,
847 : : std::vector<PiggybackEntry> pb) {
848 : : // Merge the sender — an Ack proves they are alive.
849 : : {
850 : 0 : Member m;
851 : 0 : m.identity.endpoint = sender;
852 : 0 : m.incarnation = inc;
853 : 0 : m.status = MemberStatus::Alive;
854 : 0 : m.last_seen = std::chrono::steady_clock::now();
855 : 0 : merge_member(m);
856 : 0 : }
857 : :
858 : : // Check if this ack is for a forwarded PingReq.
859 : 0 : auto& ext = extras_for(this);
860 : 0 : auto fwd_it = ext.forwarded_pings.find(sender);
861 : 0 : if (fwd_it != ext.forwarded_pings.end()) {
862 : : // We forwarded a Ping on behalf of fwd_it->second.
863 : : // The target (sender) responded, so send IndirectAck to the original
864 : : // requester.
865 : 0 : send_indirect_ack(fwd_it->second, sender);
866 : 0 : ext.forwarded_pings.erase(fwd_it);
867 : : }
868 : :
869 : : // Clear any pending ping for this sender — the probe succeeded.
870 : : {
871 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
872 : 0 : pending_pings_.erase(sender);
873 : 0 : }
874 : :
875 : : // Apply piggyback entries.
876 : 0 : apply_piggyback(pb);
877 : 0 : }
878 : :
879 : 0 : void GossipMembership::handle_ping_req(EndPoint sender, EndPoint target) {
880 : : // Merge the requester.
881 : : {
882 : 0 : Member m;
883 : 0 : m.identity.endpoint = sender;
884 : 0 : m.status = MemberStatus::Alive;
885 : 0 : m.last_seen = std::chrono::steady_clock::now();
886 : 0 : merge_member(m);
887 : 0 : }
888 : :
889 : : // Forward the ping on behalf of the requester.
890 : 0 : auto& ext = extras_for(this);
891 : 0 : ext.forwarded_pings[target] = sender;
892 : :
893 : : // Build piggyback and send Ping to target.
894 : 0 : std::vector<PiggybackEntry> pb;
895 : : {
896 : 0 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
897 : 0 : pb = build_piggyback_impl(config_, incarnation_, needs_dissemination_,
898 : 0 : members_);
899 : 0 : }
900 : :
901 : : StreamBuffer msg =
902 : 0 : encode_message(GossipMessageType::Ping, incarnation_, seq_no_++,
903 : 0 : config_.local_state.identity.endpoint, pb);
904 : 0 : async_udp_send(loop_, udp_socket_, msg, target);
905 : 0 : }
906 : :
907 : 0 : void GossipMembership::handle_indirect_ack(EndPoint sender, EndPoint target) {
908 : : // Merge the indirect ack sender.
909 : : {
910 : 0 : Member m;
911 : 0 : m.identity.endpoint = sender;
912 : 0 : m.status = MemberStatus::Alive;
913 : 0 : m.last_seen = std::chrono::steady_clock::now();
914 : 0 : merge_member(m);
915 : 0 : }
916 : :
917 : : // If we have a pending ping for target with indirect_requested=true,
918 : : // the indirect probe succeeded — clear the pending ping.
919 : : {
920 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
921 : 0 : auto it = pending_pings_.find(target);
922 : 0 : if (it != pending_pings_.end() && it->second.indirect_requested) {
923 : 0 : pending_pings_.erase(it);
924 : : }
925 : 0 : }
926 : 0 : }
927 : :
928 : 0 : void GossipMembership::handle_join(EndPoint sender, uint64_t inc,
929 : : std::vector<PiggybackEntry> pb) {
930 : : // Merge the joining node.
931 : : {
932 : 0 : Member m;
933 : 0 : m.identity.endpoint = sender;
934 : 0 : m.incarnation = inc;
935 : 0 : m.status = MemberStatus::Alive;
936 : 0 : m.last_seen = std::chrono::steady_clock::now();
937 : 0 : merge_member(m);
938 : 0 : HPACTOR_LOG_INFO(log::LogCategory::kDiscovery, ActorId{0},
939 : : static_cast<uint32_t>(log::LogEventId::kDiscoveryNodeJoined),
940 : : "discovery node joined");
941 : 0 : }
942 : :
943 : 0 : apply_piggyback(pb);
944 : :
945 : : // Send full membership table (excluding Dead and Left) as SyncRsp.
946 : 0 : send_sync_rsp(sender);
947 : 0 : }
948 : :
949 : 0 : void GossipMembership::handle_sync_rsp(std::vector<Member> members) {
950 : : // Merge all received members into our table.
951 : 0 : for (auto& m : members) {
952 : 0 : merge_member(m);
953 : : }
954 : :
955 : : // Cancel any pending join retry — we have successfully joined.
956 : 0 : auto& ext = extras_for(this);
957 : 0 : if (ext.join_retry_timer != 0 && loop_) {
958 : 0 : loop_->cancel_timer(ext.join_retry_timer);
959 : 0 : ext.join_retry_timer = 0;
960 : : }
961 : :
962 : : // Ensure self is still in the table (should always be, but be defensive).
963 : : {
964 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
965 : 0 : if (members_.find(config_.local_state.identity.endpoint) == members_.end()) {
966 : 0 : Member self = config_.local_state;
967 : 0 : self.incarnation = incarnation_;
968 : 0 : self.status = MemberStatus::Alive;
969 : 0 : self.last_seen = std::chrono::steady_clock::now();
970 : 0 : members_[self.identity.endpoint] = std::move(self);
971 : 0 : }
972 : 0 : }
973 : 0 : }
974 : :
975 : 0 : void GossipMembership::handle_leave(EndPoint sender, uint64_t inc) {
976 : 0 : Member member_to_fire;
977 : 0 : bool should_fire = false;
978 : :
979 : : {
980 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
981 : 0 : auto it = members_.find(sender);
982 : 0 : if (it != members_.end()) {
983 : : // Only accept if incarnation is >= what we have.
984 : 0 : if (inc >= it->second.incarnation) {
985 : 0 : it->second.status = MemberStatus::Left;
986 : 0 : it->second.last_seen = std::chrono::steady_clock::now();
987 : 0 : if (member_change_cb_) {
988 : 0 : member_to_fire = it->second;
989 : 0 : should_fire = true;
990 : : }
991 : : }
992 : : }
993 : 0 : }
994 : :
995 : 0 : if (should_fire) {
996 : 0 : member_change_cb_(member_to_fire, false);
997 : : }
998 : 0 : }
999 : :
1000 : : // =============================================================================
1001 : : // Message sending
1002 : : // =============================================================================
1003 : :
1004 : 0 : void GossipMembership::send_ping(EndPoint target) {
1005 : 0 : std::vector<PiggybackEntry> pb;
1006 : : {
1007 : 0 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
1008 : 0 : pb = build_piggyback_impl(config_, incarnation_, needs_dissemination_,
1009 : 0 : members_);
1010 : 0 : }
1011 : : StreamBuffer msg =
1012 : 0 : encode_message(GossipMessageType::Ping, incarnation_, seq_no_++,
1013 : 0 : config_.local_state.identity.endpoint, pb);
1014 : 0 : async_udp_send(loop_, udp_socket_, msg, target);
1015 : 0 : }
1016 : :
1017 : 0 : void GossipMembership::send_ack(EndPoint target, std::vector<PiggybackEntry> pb) {
1018 : : StreamBuffer msg =
1019 : 0 : encode_message(GossipMessageType::Ack, incarnation_, seq_no_++,
1020 : 0 : config_.local_state.identity.endpoint, pb);
1021 : 0 : async_udp_send(loop_, udp_socket_, msg, target);
1022 : 0 : }
1023 : :
1024 : 0 : void GossipMembership::send_ping_req(EndPoint proxy, EndPoint target) {
1025 : : // Build minimal piggyback for PingReq.
1026 : 0 : std::vector<PiggybackEntry> pb;
1027 : : {
1028 : 0 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
1029 : 0 : pb = build_piggyback_impl(config_, incarnation_, needs_dissemination_,
1030 : 0 : members_);
1031 : 0 : }
1032 : : StreamBuffer msg = encode_message(GossipMessageType::PingReq, incarnation_,
1033 : 0 : seq_no_++, target, pb);
1034 : 0 : async_udp_send(loop_, udp_socket_, msg, proxy);
1035 : 0 : }
1036 : :
1037 : 0 : void GossipMembership::send_indirect_ack(EndPoint target, EndPoint orig_target) {
1038 : : // IndirectAck carries the original target in the ping_target field.
1039 : 0 : std::vector<PiggybackEntry> pb; // empty piggyback for indirect ack
1040 : : StreamBuffer msg = encode_message(GossipMessageType::IndirectAck,
1041 : 0 : incarnation_, seq_no_++, orig_target, pb);
1042 : 0 : async_udp_send(loop_, udp_socket_, msg, target);
1043 : 0 : }
1044 : :
1045 : 0 : void GossipMembership::send_join(EndPoint seed) {
1046 : : // Join includes self metadata as piggyback.
1047 : 0 : std::vector<PiggybackEntry> pb;
1048 : : {
1049 : 0 : PiggybackEntry meta;
1050 : 0 : meta.type = PiggybackType::Metadata;
1051 : 0 : meta.identity.endpoint = config_.local_state.identity.endpoint;
1052 : 0 : meta.incarnation = incarnation_;
1053 : 0 : meta.actor_types = config_.local_state.actor_types;
1054 : 0 : meta.identity.acceptors = config_.local_state.identity.acceptors;
1055 : 0 : pb.push_back(std::move(meta));
1056 : 0 : }
1057 : : StreamBuffer msg =
1058 : 0 : encode_message(GossipMessageType::Join, incarnation_, seq_no_++,
1059 : 0 : config_.local_state.identity.endpoint, pb);
1060 : 0 : async_udp_send(loop_, udp_socket_, msg, seed);
1061 : 0 : }
1062 : :
1063 : 0 : void GossipMembership::send_sync_rsp(EndPoint target) {
1064 : : // Collect non-Dead, non-Left members for the sync response.
1065 : 0 : std::vector<Member> table;
1066 : : {
1067 : 0 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
1068 : 0 : for (const auto& [ep, m] : members_) {
1069 : 0 : if (m.status != MemberStatus::Dead && m.status != MemberStatus::Left) {
1070 : 0 : table.push_back(m);
1071 : : }
1072 : : }
1073 : 0 : }
1074 : 0 : StreamBuffer data = encode_sync_rsp(table);
1075 : 0 : async_udp_send(loop_, udp_socket_, data, target);
1076 : 0 : }
1077 : :
1078 : 18 : void GossipMembership::send_leave(EndPoint target) {
1079 : 18 : std::vector<PiggybackEntry> pb; // empty piggyback for leave
1080 : : StreamBuffer msg =
1081 : 18 : encode_message(GossipMessageType::Leave, incarnation_, seq_no_++,
1082 : 18 : config_.local_state.identity.endpoint, pb);
1083 : 18 : async_udp_send(loop_, udp_socket_, msg, target);
1084 : 18 : }
1085 : :
1086 : : // =============================================================================
1087 : : // State mutation
1088 : : // =============================================================================
1089 : :
1090 : 1 : void GossipMembership::mark_suspicious(EndPoint ep) {
1091 : 1 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
1092 : 1 : auto it = members_.find(ep);
1093 : 1 : if (it != members_.end()) {
1094 : 1 : it->second.status = MemberStatus::Suspicious;
1095 : 1 : it->second.last_seen = std::chrono::steady_clock::now();
1096 : 1 : HPACTOR_LOG_WARNING(log::LogCategory::kDiscovery, ActorId{0}, 0,
1097 : : "discovery node suspected");
1098 : : }
1099 : 1 : }
1100 : :
1101 : 1 : void GossipMembership::mark_dead(EndPoint ep) {
1102 : 1 : Member member_to_fire;
1103 : 1 : bool should_fire = false;
1104 : :
1105 : : {
1106 : 1 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
1107 : 1 : auto it = members_.find(ep);
1108 : 1 : if (it != members_.end()) {
1109 : 1 : it->second.status = MemberStatus::Dead;
1110 : 1 : it->second.last_seen = std::chrono::steady_clock::now();
1111 : 1 : HPACTOR_LOG_ERROR(
1112 : : log::LogCategory::kDiscovery, ActorId{0},
1113 : : static_cast<uint32_t>(log::LogEventId::kDiscoveryNodeDead),
1114 : : "discovery node dead");
1115 : 1 : if (member_change_cb_) {
1116 : 0 : member_to_fire = it->second;
1117 : 0 : should_fire = true;
1118 : : }
1119 : : }
1120 : 1 : }
1121 : :
1122 : 1 : if (should_fire) {
1123 : 0 : member_change_cb_(member_to_fire, false);
1124 : : }
1125 : 1 : }
1126 : :
1127 : 8 : void GossipMembership::merge_member(const Member& remote) {
1128 : 8 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
1129 : :
1130 : 8 : auto it = members_.find(remote.identity.endpoint);
1131 : 8 : if (it == members_.end()) {
1132 : : // First time seeing this endpoint — insert.
1133 : 5 : members_[remote.identity.endpoint] = remote;
1134 : 5 : members_[remote.identity.endpoint].last_seen =
1135 : 5 : std::chrono::steady_clock::now();
1136 : 5 : return;
1137 : : }
1138 : :
1139 : 3 : auto& existing = it->second;
1140 : :
1141 : : // Only accept updates with a strictly higher incarnation.
1142 : 3 : if (remote.incarnation <= existing.incarnation) {
1143 : 1 : return; // Stale information
1144 : : }
1145 : :
1146 : : // Higher incarnation: accept the update.
1147 : : // Special case: Dead member with higher incarnation → reactivate.
1148 : 2 : if (existing.status == MemberStatus::Dead &&
1149 : 1 : remote.incarnation > existing.incarnation) {
1150 : : // The node restarted with a higher incarnation — reactivate to Alive.
1151 : 1 : existing.status = MemberStatus::Alive;
1152 : : }
1153 : :
1154 : : // Update fields from remote that carry meaningful information.
1155 : 2 : existing.incarnation = remote.incarnation;
1156 : 2 : if (remote.status != MemberStatus::Alive ||
1157 : 2 : existing.status == MemberStatus::Dead) {
1158 : : // Accept status from remote unless we already have it as Dead and it's
1159 : : // not a reactivation.
1160 : : }
1161 : 2 : if (remote.status == MemberStatus::Suspicious ||
1162 : 2 : remote.status == MemberStatus::Dead || remote.status == MemberStatus::Left) {
1163 : 0 : existing.status = remote.status;
1164 : : }
1165 : :
1166 : : // Accept metadata if present.
1167 : 2 : if (!remote.actor_types.empty()) {
1168 : 0 : existing.actor_types = remote.actor_types;
1169 : : }
1170 : 2 : if (!remote.identity.host.empty()) {
1171 : 2 : existing.identity.host = remote.identity.host;
1172 : : }
1173 : 2 : if (!remote.identity.acceptors.empty()) {
1174 : 0 : existing.identity.acceptors = remote.identity.acceptors;
1175 : : }
1176 : :
1177 : 2 : existing.last_seen = std::chrono::steady_clock::now();
1178 : 8 : }
1179 : :
1180 : 0 : void GossipMembership::apply_piggyback(const std::vector<PiggybackEntry>& entries) {
1181 : 0 : std::unique_lock<std::shared_mutex> lock(members_mutex_);
1182 : :
1183 : 0 : for (const auto& entry : entries) {
1184 : 0 : auto it = members_.find(entry.identity.endpoint);
1185 : 0 : if (it == members_.end()) {
1186 : : // We don't know this endpoint — insert it with the piggyback info.
1187 : 0 : Member m;
1188 : 0 : m.identity.endpoint = entry.identity.endpoint;
1189 : 0 : m.incarnation = entry.incarnation;
1190 : 0 : m.last_seen = std::chrono::steady_clock::now();
1191 : :
1192 : 0 : switch (entry.type) {
1193 : 0 : case PiggybackType::Alive:
1194 : 0 : m.status = MemberStatus::Alive;
1195 : 0 : break;
1196 : 0 : case PiggybackType::Suspicious:
1197 : 0 : m.status = MemberStatus::Suspicious;
1198 : 0 : break;
1199 : 0 : case PiggybackType::Dead:
1200 : 0 : m.status = MemberStatus::Dead;
1201 : 0 : break;
1202 : 0 : case PiggybackType::Metadata:
1203 : 0 : m.status = MemberStatus::Alive;
1204 : 0 : m.actor_types = entry.actor_types;
1205 : 0 : m.identity.acceptors = entry.identity.acceptors;
1206 : 0 : break;
1207 : : }
1208 : 0 : members_[entry.identity.endpoint] = std::move(m);
1209 : 0 : continue;
1210 : 0 : }
1211 : :
1212 : 0 : auto& existing = it->second;
1213 : :
1214 : : // Only apply if the piggyback incarnation is >= what we have.
1215 : 0 : if (entry.incarnation < existing.incarnation) {
1216 : 0 : continue; // Stale
1217 : : }
1218 : :
1219 : 0 : existing.incarnation = entry.incarnation;
1220 : 0 : existing.last_seen = std::chrono::steady_clock::now();
1221 : :
1222 : 0 : switch (entry.type) {
1223 : 0 : case PiggybackType::Alive:
1224 : 0 : if (existing.status == MemberStatus::Suspicious ||
1225 : 0 : existing.status == MemberStatus::Dead) {
1226 : 0 : existing.status = MemberStatus::Alive;
1227 : : }
1228 : 0 : break;
1229 : 0 : case PiggybackType::Suspicious:
1230 : 0 : existing.status = MemberStatus::Suspicious;
1231 : 0 : break;
1232 : 0 : case PiggybackType::Dead:
1233 : 0 : existing.status = MemberStatus::Dead;
1234 : 0 : break;
1235 : 0 : case PiggybackType::Metadata:
1236 : 0 : if (!entry.actor_types.empty()) {
1237 : 0 : existing.actor_types = entry.actor_types;
1238 : : }
1239 : 0 : if (!entry.identity.acceptors.empty()) {
1240 : 0 : existing.identity.acceptors = entry.identity.acceptors;
1241 : : }
1242 : 0 : break;
1243 : : }
1244 : : }
1245 : 0 : }
1246 : :
1247 : 1 : void GossipMembership::purge_dead_tombstones() {
1248 : 1 : auto now = std::chrono::steady_clock::now();
1249 : :
1250 : 5 : for (auto it = members_.begin(); it != members_.end();) {
1251 : 4 : const auto& m = it->second;
1252 : :
1253 : 6 : if ((m.status == MemberStatus::Dead || m.status == MemberStatus::Left) &&
1254 : 6 : now - m.last_seen > config_.dead_timeout) {
1255 : 2 : it = members_.erase(it);
1256 : 2 : HPACTOR_LOG_DEBUG(log::LogCategory::kDiscovery, ActorId{0}, 0,
1257 : : "discovery cache purged");
1258 : : } else {
1259 : 2 : ++it;
1260 : : }
1261 : : }
1262 : 1 : }
1263 : :
1264 : : std::vector<EndPoint>
1265 : 2 : GossipMembership::pick_random_peers(size_t count,
1266 : : std::unordered_set<EndPoint> exclude) {
1267 : 2 : auto& ext = extras_for(this);
1268 : :
1269 : 2 : std::shared_lock<std::shared_mutex> lock(members_mutex_);
1270 : :
1271 : : // Collect Alive peers excluding self and explicitly excluded endpoints.
1272 : 2 : std::vector<EndPoint> candidates;
1273 : 7 : for (const auto& [ep, m] : members_) {
1274 : 5 : if (m.status != MemberStatus::Alive)
1275 : 0 : continue;
1276 : 5 : if (ep == config_.local_state.identity.endpoint)
1277 : 2 : continue; // exclude self
1278 : 3 : if (exclude.find(ep) != exclude.end())
1279 : 0 : continue;
1280 : 3 : candidates.push_back(ep);
1281 : : }
1282 : :
1283 : 2 : if (candidates.empty())
1284 : 1 : return {};
1285 : 1 : if (candidates.size() <= count)
1286 : 1 : return candidates;
1287 : :
1288 : : // Shuffle and pick the first 'count' elements.
1289 : 0 : std::shuffle(candidates.begin(), candidates.end(), ext.rng);
1290 : 0 : candidates.resize(count);
1291 : 0 : return candidates;
1292 : 2 : }
1293 : :
1294 : : // =============================================================================
1295 : : // Socket setup/teardown
1296 : : // =============================================================================
1297 : :
1298 : 0 : void GossipMembership::setup_udp_socket() {
1299 : 0 : udp_socket_ = socket(AF_INET, SOCK_DGRAM, 0);
1300 : 0 : if (udp_socket_ < 0)
1301 : 0 : return;
1302 : :
1303 : 0 : int reuse = 1;
1304 : 0 : setsockopt(udp_socket_, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
1305 : :
1306 : : struct sockaddr_in addr;
1307 : 0 : memset(&addr, 0, sizeof(addr));
1308 : 0 : addr.sin_family = AF_INET;
1309 : 0 : addr.sin_addr.s_addr = INADDR_ANY;
1310 : 0 : addr.sin_port = htons(config_.gossip_port);
1311 : :
1312 : 0 : if (bind(udp_socket_, reinterpret_cast<struct sockaddr*>(&addr),
1313 : 0 : sizeof(addr)) < 0) {
1314 : 0 : close(udp_socket_);
1315 : 0 : udp_socket_ = -1;
1316 : 0 : return;
1317 : : }
1318 : :
1319 : 0 : if (loop_) {
1320 : 0 : loop_->add_fd(udp_socket_, EventLoop::Event::Read);
1321 : 0 : loop_->set_read_handler(udp_socket_, [this](int /*fd*/) {
1322 : : // Non-blocking recvfrom loop (edge-triggered).
1323 : : struct sockaddr_in src_addr;
1324 : 0 : socklen_t src_addr_len = sizeof(src_addr);
1325 : :
1326 : : while (true) {
1327 : 0 : ssize_t n = recvfrom(udp_socket_, recv_buffer_.data(),
1328 : 0 : recv_buffer_.size(), MSG_DONTWAIT,
1329 : : reinterpret_cast<struct sockaddr*>(&src_addr),
1330 : : &src_addr_len);
1331 : 0 : if (n <= 0)
1332 : 0 : break;
1333 : :
1334 : 0 : StreamBuffer data(recv_buffer_.data(),
1335 : 0 : recv_buffer_.data() + static_cast<size_t>(n));
1336 : :
1337 : 0 : std::string from_host;
1338 : 0 : uint16_t from_port = 0;
1339 : : char ip_str[INET_ADDRSTRLEN];
1340 : 0 : if (inet_ntop(AF_INET, &src_addr.sin_addr, ip_str, sizeof(ip_str))) {
1341 : 0 : from_host = ip_str;
1342 : : }
1343 : 0 : from_port = ntohs(src_addr.sin_port);
1344 : :
1345 : 0 : handle_packet(data, from_host, from_port);
1346 : 0 : }
1347 : 0 : });
1348 : : }
1349 : : }
1350 : :
1351 : 25 : void GossipMembership::teardown_udp_socket() {
1352 : 25 : if (udp_socket_ >= 0) {
1353 : 0 : if (loop_) {
1354 : 0 : loop_->clear_read_handler(udp_socket_);
1355 : 0 : loop_->remove_fd(udp_socket_);
1356 : : }
1357 : 0 : close(udp_socket_);
1358 : 0 : udp_socket_ = -1;
1359 : : }
1360 : 25 : }
1361 : :
1362 : : } // namespace hpactor::net
|