LCOV - code coverage report
Current view: top level - src/net - gossip_membership.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 27.0 % 821 222
Test Date: 2026-05-20 02:24:49 Functions: 44.0 % 50 22
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/net/gossip_membership.hpp>
      16                 :             : 
      17                 :             : #include <hpactor/gossip.pb.h>
      18                 :             : #include <hpactor/log/logger.hpp>
      19                 :             : #include <hpactor/net/registrar.hpp> // for endpoint_ops, HostResolver
      20                 :             : 
      21                 :             : #include <arpa/inet.h>
      22                 :             : #include <netinet/in.h>
      23                 :             : #include <sys/socket.h>
      24                 :             : #include <unistd.h>
      25                 :             : 
      26                 :             : #include <algorithm>
      27                 :             : #include <cstring>
      28                 :             : #include <random>
      29                 :             : 
      30                 :             : namespace hpactor::net {
      31                 :             : 
      32                 :             : // =============================================================================
      33                 :             : // Internal helpers (anonymous namespace)
      34                 :             : // =============================================================================
      35                 :             : namespace {
      36                 :             : 
      37                 :             : // ---- Endpoint conversion to/from PbEndpoint ----
      38                 :             : 
      39                 :          22 : void ep_to_pb_endpoint(PbEndpoint* pb, const EndPoint& ep) {
      40                 :          22 :     if (auto* ipv4 = std::get_if<Ipv4Endpoint>(&ep)) {
      41                 :          22 :         auto* pb4 = pb->mutable_ipv4();
      42                 :          22 :         pb4->set_addr(ipv4->addr);
      43                 :          22 :         pb4->set_port(ipv4->port());
      44                 :           0 :     } else if (auto* ipv6 = std::get_if<Ipv6Endpoint>(&ep)) {
      45                 :           0 :         auto* pb6 = pb->mutable_ipv6();
      46                 :           0 :         pb6->set_addr(
      47                 :           0 :             std::string(reinterpret_cast<const char*>(ipv6->addr.data()), 16));
      48                 :           0 :         pb6->set_port(ipv6->port());
      49                 :             :     }
      50                 :          22 : }
      51                 :             : 
      52                 :           4 : EndPoint pb_endpoint_to_ep(const PbEndpoint& pb) {
      53                 :           4 :     if (pb.has_ipv4()) {
      54                 :           4 :         const auto& pb4 = pb.ipv4();
      55                 :           4 :         return Ipv4Endpoint{pb4.addr(), htons(static_cast<uint16_t>(pb4.port()))};
      56                 :             :     }
      57                 :           0 :     if (pb.has_ipv6()) {
      58                 :           0 :         const auto& pb6 = pb.ipv6();
      59                 :           0 :         std::array<uint8_t, 16> addr{};
      60                 :           0 :         if (pb6.addr().size() == 16) {
      61                 :           0 :             std::memcpy(addr.data(), pb6.addr().data(), 16);
      62                 :             :         }
      63                 :           0 :         return Ipv6Endpoint{addr, htons(static_cast<uint16_t>(pb6.port()))};
      64                 :             :     }
      65                 :           0 :     return Ipv4Endpoint{0, 0};
      66                 :             : }
      67                 :             : 
      68                 :             : // ---- Piggyback entry conversion to/from protobuf ----
      69                 :             : 
      70                 :           2 : void to_pb_piggyback(PbPiggybackEntry* pb, const PiggybackEntry& entry) {
      71                 :           2 :     pb->set_type(static_cast<PbPiggybackType>(entry.type));
      72                 :           2 :     ep_to_pb_endpoint(pb->mutable_endpoint(), entry.identity.endpoint);
      73                 :           2 :     pb->set_incarnation(entry.incarnation);
      74                 :           4 :     for (const auto& s : entry.actor_types)
      75                 :           2 :         pb->add_actor_types(s);
      76                 :           2 :     pb->set_load(entry.load);
      77                 :           3 :     for (const auto& acc : entry.identity.acceptors) {
      78                 :           1 :         auto* a = pb->add_acceptors();
      79                 :           1 :         a->set_port(acc.port);
      80                 :           1 :         a->set_handshake_version(acc.handshake_version);
      81                 :           1 :         a->set_protocol_version(acc.protocol_version);
      82                 :           1 :         a->set_tls_required(acc.tls_required);
      83                 :             :     }
      84                 :           2 : }
      85                 :             : 
      86                 :           2 : PiggybackEntry from_pb_piggyback(const PbPiggybackEntry& pb) {
      87                 :           2 :     PiggybackEntry entry;
      88                 :           2 :     entry.type = static_cast<PiggybackType>(pb.type());
      89                 :           2 :     entry.identity.endpoint = pb_endpoint_to_ep(pb.endpoint());
      90                 :           2 :     entry.incarnation = pb.incarnation();
      91                 :           4 :     for (const auto& s : pb.actor_types())
      92                 :           2 :         entry.actor_types.push_back(s);
      93                 :           2 :     entry.load = pb.load();
      94                 :           3 :     for (const auto& a : pb.acceptors()) {
      95                 :           1 :         AcceptorInfo acc;
      96                 :           1 :         acc.port = static_cast<uint16_t>(a.port());
      97                 :           1 :         acc.handshake_version = static_cast<uint8_t>(a.handshake_version());
      98                 :           1 :         acc.protocol_version = static_cast<uint8_t>(a.protocol_version());
      99                 :           1 :         acc.tls_required = a.tls_required();
     100                 :           1 :         entry.identity.acceptors.push_back(acc);
     101                 :             :     }
     102                 :           2 :     return entry;
     103                 :             : }
     104                 :             : 
     105                 :             : // ---- Member conversion to/from protobuf ----
     106                 :             : 
     107                 :           0 : void to_pb_member(PbGossipMember* pb, const Member& m) {
     108                 :           0 :     ep_to_pb_endpoint(pb->mutable_endpoint(), m.identity.endpoint);
     109                 :           0 :     pb->set_host(m.identity.host);
     110                 :           0 :     pb->set_uds_path(m.identity.uds_path);
     111                 :           0 :     pb->set_incarnation(m.incarnation);
     112                 :           0 :     pb->set_status(static_cast<uint32_t>(m.status));
     113                 :           0 :     for (const auto& s : m.actor_types)
     114                 :           0 :         pb->add_actor_types(s);
     115                 :           0 :     for (const auto& acc : m.identity.acceptors) {
     116                 :           0 :         auto* a = pb->add_acceptors();
     117                 :           0 :         a->set_port(acc.port);
     118                 :           0 :         a->set_handshake_version(acc.handshake_version);
     119                 :           0 :         a->set_protocol_version(acc.protocol_version);
     120                 :           0 :         a->set_tls_required(acc.tls_required);
     121                 :             :     }
     122                 :           0 : }
     123                 :             : 
     124                 :           0 : Member from_pb_member(const PbGossipMember& pb) {
     125                 :           0 :     Member m;
     126                 :           0 :     m.identity.endpoint = pb_endpoint_to_ep(pb.endpoint());
     127                 :           0 :     m.identity.host = pb.host();
     128                 :           0 :     m.identity.uds_path = pb.uds_path();
     129                 :           0 :     m.incarnation = pb.incarnation();
     130                 :           0 :     m.status = static_cast<MemberStatus>(pb.status());
     131                 :           0 :     for (const auto& s : pb.actor_types())
     132                 :           0 :         m.actor_types.push_back(s);
     133                 :           0 :     for (const auto& a : pb.acceptors()) {
     134                 :           0 :         AcceptorInfo acc;
     135                 :           0 :         acc.port = static_cast<uint16_t>(a.port());
     136                 :           0 :         acc.handshake_version = static_cast<uint8_t>(a.handshake_version());
     137                 :           0 :         acc.protocol_version = static_cast<uint8_t>(a.protocol_version());
     138                 :           0 :         acc.tls_required = a.tls_required();
     139                 :           0 :         m.identity.acceptors.push_back(acc);
     140                 :             :     }
     141                 :           0 :     return m;
     142                 :             : }
     143                 :             : 
     144                 :             : // ---- Async UDP send via EventLoop ----
     145                 :             : // Uses the EventLoop's async_sendto (non-blocking, completion-driven).
     146                 :             : // Falls back to blocking sendto when no EventLoop is available.
     147                 :             : 
     148                 :          18 : void async_udp_send(EventLoop* loop, int sock, const StreamBuffer& data,
     149                 :             :                     const EndPoint& dest) {
     150                 :          18 :     if (sock < 0 || data.empty())
     151                 :          18 :         return;
     152                 :             : 
     153                 :           0 :     if (loop) {
     154                 :             :         struct iovec iov;
     155                 :           0 :         iov.iov_base = const_cast<uint8_t*>(data.data());
     156                 :           0 :         iov.iov_len = data.size();
     157                 :             : 
     158                 :           0 :         sockaddr_in addr{};
     159                 :           0 :         addr.sin_family = AF_INET;
     160                 :           0 :         if (auto* ipv4 = std::get_if<Ipv4Endpoint>(&dest)) {
     161                 :           0 :             addr.sin_addr.s_addr = ipv4->addr;
     162                 :           0 :             addr.sin_port = ipv4->port_nw;
     163                 :             :         }
     164                 :             : 
     165                 :           0 :         loop->backend()->async_sendto(
     166                 :             :             sock, &iov, 1, reinterpret_cast<const sockaddr*>(&addr),
     167                 :             :             sizeof(addr), ActorId(0), static_cast<uint32_t>(OpType::SendTo));
     168                 :             :     } else {
     169                 :             :         // Legacy fallback: blocking sendto
     170                 :           0 :         sockaddr_in addr{};
     171                 :           0 :         addr.sin_family = AF_INET;
     172                 :           0 :         if (auto* ipv4 = std::get_if<Ipv4Endpoint>(&dest)) {
     173                 :           0 :             addr.sin_addr.s_addr = ipv4->addr;
     174                 :           0 :             addr.sin_port = ipv4->port_nw;
     175                 :             :         }
     176                 :           0 :         sendto(sock, data.data(), data.size(), 0,
     177                 :             :                reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
     178                 :             :     }
     179                 :             : }
     180                 :             : 
     181                 :             : // ---- Per-instance state (RNG, join retry, forwarded pings) ----
     182                 :             : 
     183                 :             : struct InstanceExtras {
     184                 :             :     std::mt19937 rng;
     185                 :             :     bool rng_seeded = false;
     186                 :             :     size_t join_seed_index = 0;
     187                 :             :     uint64_t join_retry_timer = 0;
     188                 :             :     std::unordered_map<EndPoint, EndPoint> forwarded_pings;
     189                 :             : };
     190                 :             : 
     191                 :             : static std::unordered_map<const GossipMembership*, InstanceExtras> s_extras;
     192                 :             : 
     193                 :          27 : static InstanceExtras& extras_for(const GossipMembership* self) {
     194                 :          27 :     auto& extras = s_extras[self];
     195                 :          27 :     if (!extras.rng_seeded) {
     196                 :          25 :         std::random_device rd;
     197                 :          25 :         extras.rng.seed(rd());
     198                 :          25 :         extras.rng_seeded = true;
     199                 :          25 :     }
     200                 :          27 :     return extras;
     201                 :             : }
     202                 :             : 
     203                 :          25 : static void cleanup_extras(const GossipMembership* self) {
     204                 :          25 :     s_extras.erase(self);
     205                 :          25 : }
     206                 :             : 
     207                 :             : } // anonymous namespace
     208                 :             : 
     209                 :             : // =============================================================================
     210                 :             : // GossipMembership — Constructor / Destructor
     211                 :             : // =============================================================================
     212                 :             : 
     213                 :          20 : GossipMembership::GossipMembership(const GossipConfig& cfg, EventLoop* loop)
     214                 :          40 :     : config_(cfg), loop_(loop), incarnation_(1) // Start at 1 so 0 means "no
     215                 :             :                                                  // incarnation"
     216                 :             :       ,
     217                 :          60 :       recv_buffer_(kGossipMaxMsgSize) {}
     218                 :             : 
     219                 :          20 : GossipMembership::~GossipMembership() {
     220                 :          20 :     stop();
     221                 :          20 : }
     222                 :             : 
     223                 :             : // =============================================================================
     224                 :             : // IServiceDiscovery — lifecycle
     225                 :             : // =============================================================================
     226                 :             : 
     227                 :           0 : void GossipMembership::start() {
     228                 :             :     // Incarnation is a wall-clock timestamp so that restarted nodes
     229                 :             :     // automatically have a higher incarnation than any previous run.
     230                 :           0 :     incarnation_ = static_cast<uint64_t>(
     231                 :           0 :         std::chrono::system_clock::now().time_since_epoch().count());
     232                 :             : 
     233                 :           0 :     setup_udp_socket();
     234                 :             : 
     235                 :             :     // Add self to the membership table.
     236                 :             :     {
     237                 :           0 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     238                 :           0 :         Member self = config_.local_state;
     239                 :           0 :         self.incarnation = incarnation_;
     240                 :           0 :         self.status = MemberStatus::Alive;
     241                 :           0 :         self.last_seen = std::chrono::steady_clock::now();
     242                 :           0 :         members_[self.identity.endpoint] = std::move(self);
     243                 :           0 :     }
     244                 :             : 
     245                 :             :     // Bootstrap: join an existing cluster via a seed, or start a solo cluster.
     246                 :           0 :     if (!config_.seeds.empty()) {
     247                 :           0 :         auto& ext = extras_for(this);
     248                 :           0 :         ext.join_seed_index = 0;
     249                 :           0 :         send_join(config_.seeds[0]);
     250                 :             : 
     251                 :             :         // Recursive retry chain: try the next seed after 1 s if no SyncRsp
     252                 :             :         // arrived.
     253                 :           0 :         auto retry_fn = std::make_shared<std::function<void()>>();
     254                 :           0 :         *retry_fn = [this, retry_fn]() {
     255                 :           0 :             auto& ext_inner = extras_for(this);
     256                 :           0 :             ext_inner.join_seed_index++;
     257                 :           0 :             if (ext_inner.join_seed_index >= config_.seeds.size()) {
     258                 :           0 :                 return; // No more seeds to try
     259                 :             :             }
     260                 :             :             {
     261                 :           0 :                 std::shared_lock<std::shared_mutex> lock(members_mutex_);
     262                 :           0 :                 if (members_.size() > 1) {
     263                 :           0 :                     return; // Already received SyncRsp — joined successfully
     264                 :             :                 }
     265                 :           0 :             }
     266                 :           0 :             send_join(config_.seeds[ext_inner.join_seed_index]);
     267                 :           0 :             ext_inner.join_retry_timer = loop_->run_after(*retry_fn, 1000);
     268                 :           0 :         };
     269                 :           0 :         ext.join_retry_timer = loop_->run_after(*retry_fn, 1000);
     270                 :           0 :     }
     271                 :             : 
     272                 :             :     // Schedule the periodic protocol round.
     273                 :           0 :     protocol_timer_ =
     274                 :           0 :         loop_->run_every([this] { protocol_round(); },
     275                 :           0 :                          static_cast<int>(config_.protocol_period.count()));
     276                 :           0 : }
     277                 :             : 
     278                 :          25 : void GossipMembership::stop() {
     279                 :             :     // Cancel the periodic protocol timer.
     280                 :          25 :     if (protocol_timer_ != 0 && loop_) {
     281                 :           0 :         loop_->cancel_timer(protocol_timer_);
     282                 :           0 :         protocol_timer_ = 0;
     283                 :             :     }
     284                 :             : 
     285                 :             :     // Cancel the join retry timer if still active.
     286                 :          25 :     auto& ext = extras_for(this);
     287                 :          25 :     if (ext.join_retry_timer != 0 && loop_) {
     288                 :           0 :         loop_->cancel_timer(ext.join_retry_timer);
     289                 :           0 :         ext.join_retry_timer = 0;
     290                 :             :     }
     291                 :             : 
     292                 :             :     // Send Leave to all known Alive + Suspicious members (best-effort).
     293                 :             :     {
     294                 :          25 :         std::shared_lock<std::shared_mutex> lock(members_mutex_);
     295                 :          44 :         for (const auto& [ep, member] : members_) {
     296                 :          19 :             if (member.status == MemberStatus::Alive ||
     297                 :           1 :                 member.status == MemberStatus::Suspicious) {
     298                 :          18 :                 send_leave(ep);
     299                 :             :             }
     300                 :             :         }
     301                 :          25 :     }
     302                 :             : 
     303                 :             :     // Tear down the UDP socket.
     304                 :          25 :     teardown_udp_socket();
     305                 :             : 
     306                 :             :     // Clear local state.
     307                 :             :     {
     308                 :          25 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     309                 :          25 :         members_.clear();
     310                 :          25 :         pending_pings_.clear();
     311                 :          25 :     }
     312                 :             : 
     313                 :          25 :     cleanup_extras(this);
     314                 :          25 : }
     315                 :             : 
     316                 :             : // =============================================================================
     317                 :             : // IServiceDiscovery — query
     318                 :             : // =============================================================================
     319                 :             : 
     320                 :           3 : std::vector<Member> GossipMembership::discover_all() const {
     321                 :           3 :     std::shared_lock<std::shared_mutex> lock(members_mutex_);
     322                 :           3 :     std::vector<Member> result;
     323                 :           3 :     result.reserve(members_.size());
     324                 :           5 :     for (const auto& [ep, member] : members_) {
     325                 :           2 :         result.push_back(member);
     326                 :             :     }
     327                 :           3 :     return result;
     328                 :           3 : }
     329                 :             : 
     330                 :           4 : const Member* GossipMembership::discover(EndPoint ep) const {
     331                 :           4 :     std::shared_lock<std::shared_mutex> lock(members_mutex_);
     332                 :           4 :     auto it = members_.find(ep);
     333                 :           4 :     if (it != members_.end()) {
     334                 :           2 :         return &it->second;
     335                 :             :     }
     336                 :           2 :     return nullptr;
     337                 :           4 : }
     338                 :             : 
     339                 :             : // =============================================================================
     340                 :             : // IServiceDiscovery — announce / callback
     341                 :             : // =============================================================================
     342                 :             : 
     343                 :           1 : void GossipMembership::announce(Member local_state) {
     344                 :           1 :     std::unique_lock<std::shared_mutex> lock(members_mutex_);
     345                 :             :     // Bump incarnation on re-announce
     346                 :           1 :     ++incarnation_;
     347                 :           1 :     local_state.incarnation = incarnation_;
     348                 :           1 :     local_state.last_seen = std::chrono::steady_clock::now();
     349                 :           1 :     config_.local_state = std::move(local_state);
     350                 :           1 :     needs_dissemination_ = true;
     351                 :           1 : }
     352                 :             : 
     353                 :           0 : void GossipMembership::on_member_change(MemberChangeCallback cb) {
     354                 :           0 :     member_change_cb_ = std::move(cb);
     355                 :           0 : }
     356                 :             : 
     357                 :             : // =============================================================================
     358                 :             : // Wire protocol — encode_message
     359                 :             : // =============================================================================
     360                 :             : //
     361                 :             : // Binary format:
     362                 :             : //   Magic (4B "HPGC") | Version (1B) | Type (1B) | Flags (2B) |
     363                 :             : //   Sender Endpoint (len-prefixed, var) |
     364                 :             : //   Incarnation (8B BE) | Sequence Number (4B BE) |
     365                 :             : //   [Ping Target Endpoint (len-prefixed, var) — PingReq only] |
     366                 :             : //   Piggyback Count (2B BE) | Piggyback entries...
     367                 :             : 
     368                 :             : StreamBuffer
     369                 :          20 : GossipMembership::encode_message(GossipMessageType type, uint64_t inc,
     370                 :             :                                  uint32_t seq, EndPoint ping_target,
     371                 :             :                                  const std::vector<PiggybackEntry>& pb) const {
     372                 :          20 :     PbGossipEnvelope env;
     373                 :          20 :     env.set_magic(GossipMagic);
     374                 :          20 :     env.set_version(GossipVersion);
     375                 :          20 :     env.set_type(static_cast<PbGossipMessageType>(type));
     376                 :          20 :     env.set_flags(0);
     377                 :             : 
     378                 :             :     // Build per-type payload
     379                 :          20 :     switch (type) {
     380                 :           2 :         case GossipMessageType::Ping: {
     381                 :           2 :             auto* ping = env.mutable_ping();
     382                 :           2 :             ep_to_pb_endpoint(ping->mutable_sender_endpoint(),
     383                 :           2 :                               config_.local_state.identity.endpoint);
     384                 :           2 :             ping->set_incarnation(inc);
     385                 :           2 :             ping->set_seq_no(seq);
     386                 :           4 :             for (const auto& e : pb)
     387                 :           2 :                 to_pb_piggyback(ping->add_piggyback(), e);
     388                 :           2 :             break;
     389                 :             :         }
     390                 :           0 :         case GossipMessageType::Ack: {
     391                 :           0 :             auto* ack = env.mutable_ack();
     392                 :           0 :             ep_to_pb_endpoint(ack->mutable_sender_endpoint(),
     393                 :           0 :                               config_.local_state.identity.endpoint);
     394                 :           0 :             ack->set_incarnation(inc);
     395                 :           0 :             for (const auto& e : pb)
     396                 :           0 :                 to_pb_piggyback(ack->add_piggyback(), e);
     397                 :           0 :             break;
     398                 :             :         }
     399                 :           0 :         case GossipMessageType::PingReq: {
     400                 :           0 :             auto* pr = env.mutable_ping_req();
     401                 :           0 :             ep_to_pb_endpoint(pr->mutable_sender_endpoint(),
     402                 :           0 :                               config_.local_state.identity.endpoint);
     403                 :           0 :             ep_to_pb_endpoint(pr->mutable_target_endpoint(), ping_target);
     404                 :           0 :             pr->set_incarnation(inc);
     405                 :           0 :             break;
     406                 :             :         }
     407                 :           0 :         case GossipMessageType::IndirectAck: {
     408                 :           0 :             auto* ia = env.mutable_indirect_ack();
     409                 :           0 :             ep_to_pb_endpoint(ia->mutable_sender_endpoint(),
     410                 :           0 :                               config_.local_state.identity.endpoint);
     411                 :           0 :             ep_to_pb_endpoint(ia->mutable_original_requester(), ping_target);
     412                 :           0 :             ia->set_incarnation(inc);
     413                 :           0 :             break;
     414                 :             :         }
     415                 :           0 :         case GossipMessageType::Join: {
     416                 :           0 :             auto* join = env.mutable_join();
     417                 :           0 :             ep_to_pb_endpoint(join->mutable_sender_endpoint(),
     418                 :           0 :                               config_.local_state.identity.endpoint);
     419                 :           0 :             join->set_incarnation(inc);
     420                 :           0 :             join->set_host(config_.local_state.identity.host);
     421                 :           0 :             join->set_uds_path(config_.local_state.identity.uds_path);
     422                 :           0 :             for (const auto& s : config_.local_state.actor_types)
     423                 :           0 :                 join->add_actor_types(s);
     424                 :           0 :             for (const auto& acc : config_.local_state.identity.acceptors) {
     425                 :           0 :                 auto* a = join->add_acceptors();
     426                 :           0 :                 a->set_port(acc.port);
     427                 :           0 :                 a->set_handshake_version(acc.handshake_version);
     428                 :           0 :                 a->set_protocol_version(acc.protocol_version);
     429                 :           0 :                 a->set_tls_required(acc.tls_required);
     430                 :             :             }
     431                 :           0 :             break;
     432                 :             :         }
     433                 :          18 :         case GossipMessageType::Leave: {
     434                 :          18 :             auto* leave = env.mutable_leave();
     435                 :          18 :             ep_to_pb_endpoint(leave->mutable_sender_endpoint(),
     436                 :          18 :                               config_.local_state.identity.endpoint);
     437                 :          18 :             leave->set_incarnation(inc);
     438                 :          18 :             break;
     439                 :             :         }
     440                 :           0 :         default:
     441                 :           0 :             break;
     442                 :             :     }
     443                 :             : 
     444                 :          40 :     std::string serialized = env.SerializeAsString();
     445                 :          20 :     return StreamBuffer(serialized.begin(), serialized.end());
     446                 :          20 : }
     447                 :             : 
     448                 :             : // =============================================================================
     449                 :             : // Wire protocol — decode_message
     450                 :             : // =============================================================================
     451                 :             : 
     452                 :           2 : bool GossipMembership::decode_message(const StreamBuffer& data,
     453                 :             :                                       GossipMessageType& type, EndPoint& sender,
     454                 :             :                                       uint64_t& inc, uint32_t& seq,
     455                 :             :                                       EndPoint& ping_target,
     456                 :             :                                       std::vector<PiggybackEntry>& pb) const {
     457                 :           2 :     PbGossipEnvelope env;
     458                 :           2 :     if (!env.ParseFromArray(data.data(), static_cast<int>(data.size())))
     459                 :           0 :         return false;
     460                 :           2 :     if (env.magic() != GossipMagic || env.version() != GossipVersion)
     461                 :           0 :         return false;
     462                 :             : 
     463                 :           2 :     type = static_cast<GossipMessageType>(env.type());
     464                 :           2 :     pb.clear();
     465                 :             : 
     466                 :           2 :     switch (type) {
     467                 :           2 :         case GossipMessageType::Ping:
     468                 :           2 :             if (!env.has_ping())
     469                 :           0 :                 return false;
     470                 :           2 :             sender = pb_endpoint_to_ep(env.ping().sender_endpoint());
     471                 :           2 :             inc = env.ping().incarnation();
     472                 :           2 :             seq = env.ping().seq_no();
     473                 :           4 :             for (const auto& e : env.ping().piggyback())
     474                 :           2 :                 pb.push_back(from_pb_piggyback(e));
     475                 :           2 :             break;
     476                 :           0 :         case GossipMessageType::Ack:
     477                 :           0 :             if (!env.has_ack())
     478                 :           0 :                 return false;
     479                 :           0 :             sender = pb_endpoint_to_ep(env.ack().sender_endpoint());
     480                 :           0 :             inc = env.ack().incarnation();
     481                 :           0 :             seq = 0;
     482                 :           0 :             for (const auto& e : env.ack().piggyback())
     483                 :           0 :                 pb.push_back(from_pb_piggyback(e));
     484                 :           0 :             break;
     485                 :           0 :         case GossipMessageType::PingReq:
     486                 :           0 :             if (!env.has_ping_req())
     487                 :           0 :                 return false;
     488                 :           0 :             sender = pb_endpoint_to_ep(env.ping_req().sender_endpoint());
     489                 :           0 :             ping_target = pb_endpoint_to_ep(env.ping_req().target_endpoint());
     490                 :           0 :             inc = env.ping_req().incarnation();
     491                 :           0 :             seq = 0;
     492                 :           0 :             break;
     493                 :           0 :         case GossipMessageType::IndirectAck:
     494                 :           0 :             if (!env.has_indirect_ack())
     495                 :           0 :                 return false;
     496                 :           0 :             sender = pb_endpoint_to_ep(env.indirect_ack().sender_endpoint());
     497                 :           0 :             ping_target =
     498                 :           0 :                 pb_endpoint_to_ep(env.indirect_ack().original_requester());
     499                 :           0 :             inc = env.indirect_ack().incarnation();
     500                 :           0 :             seq = 0;
     501                 :           0 :             break;
     502                 :           0 :         case GossipMessageType::Join:
     503                 :           0 :             if (!env.has_join())
     504                 :           0 :                 return false;
     505                 :           0 :             sender = pb_endpoint_to_ep(env.join().sender_endpoint());
     506                 :           0 :             inc = env.join().incarnation();
     507                 :           0 :             seq = 0;
     508                 :           0 :             break;
     509                 :           0 :         case GossipMessageType::SyncRsp:
     510                 :             :             // SyncRsp is handled by decode_sync_rsp(), not this path
     511                 :           0 :             return false;
     512                 :           0 :         case GossipMessageType::Leave:
     513                 :           0 :             if (!env.has_leave())
     514                 :           0 :                 return false;
     515                 :           0 :             sender = pb_endpoint_to_ep(env.leave().sender_endpoint());
     516                 :           0 :             inc = env.leave().incarnation();
     517                 :           0 :             seq = 0;
     518                 :           0 :             break;
     519                 :             :     }
     520                 :             : 
     521                 :           2 :     return true;
     522                 :           2 : }
     523                 :             : 
     524                 :             : // =============================================================================
     525                 :             : // Wire protocol — encode_sync_rsp (protobuf)
     526                 :             : // =============================================================================
     527                 :             : 
     528                 :             : StreamBuffer
     529                 :           0 : GossipMembership::encode_sync_rsp(const std::vector<Member>& members) const {
     530                 :           0 :     PbGossipSyncRsp rsp;
     531                 :           0 :     ep_to_pb_endpoint(rsp.mutable_sender_endpoint(),
     532                 :           0 :                       config_.local_state.identity.endpoint);
     533                 :           0 :     rsp.set_incarnation(incarnation_);
     534                 :           0 :     for (const auto& m : members) {
     535                 :           0 :         to_pb_member(rsp.add_members(), m);
     536                 :             :     }
     537                 :           0 :     std::string serialized = rsp.SerializeAsString();
     538                 :           0 :     return StreamBuffer(serialized.begin(), serialized.end());
     539                 :           0 : }
     540                 :             : 
     541                 :             : // =============================================================================
     542                 :             : // Wire protocol — decode_sync_rsp (protobuf)
     543                 :             : // =============================================================================
     544                 :             : 
     545                 :           0 : bool GossipMembership::decode_sync_rsp(const StreamBuffer& data,
     546                 :             :                                        std::vector<Member>& members) const {
     547                 :           0 :     PbGossipSyncRsp rsp;
     548                 :           0 :     if (!rsp.ParseFromArray(data.data(), static_cast<int>(data.size())))
     549                 :           0 :         return false;
     550                 :             : 
     551                 :           0 :     members.clear();
     552                 :           0 :     members.reserve(static_cast<size_t>(rsp.members_size()));
     553                 :           0 :     for (const auto& pb_m : rsp.members()) {
     554                 :           0 :         Member m = from_pb_member(pb_m);
     555                 :           0 :         m.last_seen = std::chrono::steady_clock::now();
     556                 :           0 :         members.push_back(std::move(m));
     557                 :           0 :     }
     558                 :           0 :     return true;
     559                 :           0 : }
     560                 :             : 
     561                 :             : // =============================================================================
     562                 :             : // Internal piggyback builder (called from member functions)
     563                 :             : // =============================================================================
     564                 :             : 
     565                 :             : // This helper is used by protocol_round(), handle_ping(), and send methods to
     566                 :             : // assemble outgoing piggyback entries.  It reads members_, config_, and
     567                 :             : // needs_dissemination_ directly.  The caller is responsible for clearing
     568                 :             : // needs_dissemination_ after the piggyback is serialised.
     569                 :             : 
     570                 :             : static std::vector<PiggybackEntry>
     571                 :           0 : build_piggyback_impl(const GossipConfig& config, uint64_t incarnation,
     572                 :             :                      bool& needs_dissemination,
     573                 :             :                      const std::unordered_map<EndPoint, Member>& members) {
     574                 :           0 :     std::vector<PiggybackEntry> entries;
     575                 :             : 
     576                 :             :     // Self metadata — disseminate when announce() has been called.
     577                 :           0 :     if (needs_dissemination) {
     578                 :           0 :         PiggybackEntry meta;
     579                 :           0 :         meta.type = PiggybackType::Metadata;
     580                 :           0 :         meta.identity.endpoint = config.local_state.identity.endpoint;
     581                 :           0 :         meta.incarnation = incarnation;
     582                 :           0 :         meta.actor_types = config.local_state.actor_types;
     583                 :           0 :         meta.load = 0;
     584                 :           0 :         meta.identity.acceptors = config.local_state.identity.acceptors;
     585                 :           0 :         entries.push_back(std::move(meta));
     586                 :           0 :     }
     587                 :             : 
     588                 :             :     // Piggyback all Suspicious and Dead members so the cluster converges
     589                 :             :     // quickly.
     590                 :           0 :     for (const auto& [ep, m] : members) {
     591                 :           0 :         if (m.status == MemberStatus::Suspicious) {
     592                 :           0 :             PiggybackEntry e;
     593                 :           0 :             e.type = PiggybackType::Suspicious;
     594                 :           0 :             e.identity.endpoint = ep;
     595                 :           0 :             e.incarnation = m.incarnation;
     596                 :           0 :             entries.push_back(std::move(e));
     597                 :           0 :         } else if (m.status == MemberStatus::Dead) {
     598                 :           0 :             PiggybackEntry e;
     599                 :           0 :             e.type = PiggybackType::Dead;
     600                 :           0 :             e.identity.endpoint = ep;
     601                 :           0 :             e.incarnation = m.incarnation;
     602                 :           0 :             entries.push_back(std::move(e));
     603                 :           0 :         }
     604                 :             :     }
     605                 :             : 
     606                 :           0 :     return entries;
     607                 :             : }
     608                 :             : 
     609                 :             : // =============================================================================
     610                 :             : // Protocol round
     611                 :             : // =============================================================================
     612                 :             : 
     613                 :           0 : void GossipMembership::protocol_round() {
     614                 :           0 :     auto now = std::chrono::steady_clock::now();
     615                 :             : 
     616                 :             :     // ── 1. Pick peers and send Pings ──────────────────────────────────────
     617                 :             :     {
     618                 :           0 :         std::shared_lock<std::shared_mutex> lock(members_mutex_);
     619                 :             : 
     620                 :             :         // Collect Alive members excluding self.
     621                 :           0 :         std::vector<EndPoint> alive;
     622                 :           0 :         for (const auto& [ep, m] : members_) {
     623                 :           0 :             if (m.status == MemberStatus::Alive &&
     624                 :           0 :                 !(ep == config_.local_state.identity.endpoint)) {
     625                 :           0 :                 alive.push_back(ep);
     626                 :             :             }
     627                 :             :         }
     628                 :             : 
     629                 :           0 :         if (!alive.empty()) {
     630                 :           0 :             auto targets = pick_random_peers(config_.fanout, {});
     631                 :             :             // pick_random_peers already handles the alive filtering; but it
     632                 :             :             // uses the shared_lock we already hold.  To avoid double-locking
     633                 :             :             // we build the targets list above and pass it directly.
     634                 :             : 
     635                 :             :             // Build piggyback entries for outgoing Pings.
     636                 :           0 :             auto pb = build_piggyback_impl(config_, incarnation_,
     637                 :           0 :                                            needs_dissemination_, members_);
     638                 :             :             // Lock must be held while reading members_ for piggyback.
     639                 :             : 
     640                 :           0 :             for (const auto& target : targets) {
     641                 :             :                 StreamBuffer msg =
     642                 :           0 :                     encode_message(GossipMessageType::Ping, incarnation_, seq_no_++,
     643                 :           0 :                                    config_.local_state.identity.endpoint, pb);
     644                 :           0 :                 async_udp_send(loop_, udp_socket_, msg, target);
     645                 :             : 
     646                 :             :                 // Record pending ping.
     647                 :             :                 // pending_pings_ is protected by members_mutex_ in our design.
     648                 :           0 :                 PendingPing pp;
     649                 :           0 :                 pp.expires_at = now + config_.ping_timeout;
     650                 :           0 :                 pp.indirect_requested = false;
     651                 :           0 :                 pending_pings_[target] = pp;
     652                 :           0 :             }
     653                 :             : 
     654                 :             :             // Clear dissemination flag now that we've piggybacked our metadata.
     655                 :           0 :             if (needs_dissemination_ && !pb.empty()) {
     656                 :           0 :                 needs_dissemination_ = false;
     657                 :             :             }
     658                 :           0 :         }
     659                 :           0 :     } // shared_lock released
     660                 :             : 
     661                 :             :     // ── 2. Check pending pings ─────────────────────────────────────────────
     662                 :             :     // Re-acquire unique lock because we may mutate pending_pings_ and members_.
     663                 :             :     {
     664                 :           0 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     665                 :             : 
     666                 :           0 :         std::vector<EndPoint> expired_pings;
     667                 :           0 :         for (auto& [ep, pp] : pending_pings_) {
     668                 :           0 :             if (now >= pp.expires_at) {
     669                 :           0 :                 expired_pings.push_back(ep);
     670                 :             :             }
     671                 :             :         }
     672                 :             : 
     673                 :           0 :         for (const auto& target : expired_pings) {
     674                 :           0 :             auto it = pending_pings_.find(target);
     675                 :           0 :             if (it == pending_pings_.end())
     676                 :           0 :                 continue;
     677                 :           0 :             auto& pp = it->second;
     678                 :             : 
     679                 :           0 :             if (!pp.indirect_requested) {
     680                 :             :                 // First expiry — request indirect probes.
     681                 :           0 :                 pp.indirect_requested = true;
     682                 :           0 :                 pp.indirect_expires_at = now + config_.ping_timeout;
     683                 :             : 
     684                 :             :                 // Pick indirect probe peers: Alive, not self, not the target.
     685                 :             :                 std::unordered_set<EndPoint> exclude{
     686                 :           0 :                     target, config_.local_state.identity.endpoint};
     687                 :           0 :                 auto probes = pick_random_peers(config_.indirect_probes, exclude);
     688                 :             : 
     689                 :           0 :                 if (probes.empty()) {
     690                 :             :                     // 2-node cluster or no other peers available — skip
     691                 :             :                     // indirect probes and immediately mark suspicious.
     692                 :           0 :                     mark_suspicious(target);
     693                 :           0 :                     pending_pings_.erase(target);
     694                 :             :                 } else {
     695                 :           0 :                     for (const auto& proxy : probes) {
     696                 :           0 :                         send_ping_req(proxy, target);
     697                 :             :                     }
     698                 :             :                 }
     699                 :           0 :             } else {
     700                 :             :                 // Second expiry — indirect probes did not succeed.
     701                 :           0 :                 mark_suspicious(target);
     702                 :           0 :                 pending_pings_.erase(it);
     703                 :             :             }
     704                 :             :         }
     705                 :           0 :     }
     706                 :             : 
     707                 :             :     // ── 3. Check suspicious members for timeout ────────────────────────────
     708                 :             :     {
     709                 :           0 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     710                 :           0 :         std::vector<EndPoint> to_mark_dead;
     711                 :           0 :         for (auto& [ep, m] : members_) {
     712                 :           0 :             if (m.status == MemberStatus::Suspicious) {
     713                 :           0 :                 if (now - m.last_seen > config_.suspicion_timeout) {
     714                 :           0 :                     to_mark_dead.push_back(ep);
     715                 :             :                 }
     716                 :             :             }
     717                 :             :         }
     718                 :           0 :         for (const auto& ep : to_mark_dead) {
     719                 :           0 :             mark_dead(ep);
     720                 :             :         }
     721                 :           0 :     }
     722                 :             : 
     723                 :             :     // ── 4. Purge old tombstones ────────────────────────────────────────────
     724                 :             :     {
     725                 :           0 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     726                 :           0 :         purge_dead_tombstones();
     727                 :           0 :     }
     728                 :           0 : }
     729                 :             : 
     730                 :             : // =============================================================================
     731                 :             : // Packet handler — non-blocking recvfrom loop
     732                 :             : // =============================================================================
     733                 :             : 
     734                 :           0 : void GossipMembership::handle_packet(const StreamBuffer& data,
     735                 :             :                                      const std::string& from_host,
     736                 :             :                                      uint16_t from_port) {
     737                 :             :     GossipMessageType type;
     738                 :           0 :     EndPoint sender;
     739                 :             :     uint64_t inc;
     740                 :             :     uint32_t seq;
     741                 :           0 :     EndPoint ping_target;
     742                 :           0 :     std::vector<PiggybackEntry> pb;
     743                 :             : 
     744                 :           0 :     if (!decode_message(data, type, sender, inc, seq, ping_target, pb)) {
     745                 :           0 :         return; // Malformed or unknown message
     746                 :             :     }
     747                 :             : 
     748                 :           0 :     switch (type) {
     749                 :           0 :         case GossipMessageType::Ping:
     750                 :           0 :             handle_ping(sender, inc, seq, std::move(pb), from_host, from_port);
     751                 :           0 :             break;
     752                 :           0 :         case GossipMessageType::Ack:
     753                 :           0 :             handle_ack(sender, inc, std::move(pb));
     754                 :           0 :             break;
     755                 :           0 :         case GossipMessageType::PingReq:
     756                 :           0 :             handle_ping_req(sender, ping_target);
     757                 :           0 :             break;
     758                 :           0 :         case GossipMessageType::IndirectAck:
     759                 :           0 :             handle_indirect_ack(sender, ping_target);
     760                 :           0 :             break;
     761                 :           0 :         case GossipMessageType::Join:
     762                 :           0 :             handle_join(sender, inc, std::move(pb));
     763                 :           0 :             break;
     764                 :           0 :         case GossipMessageType::SyncRsp: {
     765                 :             :             // SyncRsp uses a different wire format (decode_sync_rsp, not
     766                 :             :             // decode_message). The data after the standard header contains the
     767                 :             :             // SyncRsp payload. However, handle_sync_rsp expects a
     768                 :             :             // vector<Member>, so we delegate to the sender to re-encode.
     769                 :             :             // Actually the SyncRsp is carried in a regular message envelope —
     770                 :             :             // decode_message parsed the header, and the piggyback entries are
     771                 :             :             // not used.  We extract the member list from the raw data by
     772                 :             :             // skipping the standard header.
     773                 :             :             //
     774                 :             :             // For SyncRsp, the sender includes the full table after the
     775                 :             :             // piggyback count.  We re-parse from the raw data, but
     776                 :             :             // decode_message already consumed everything.  Fortunately, SyncRsp
     777                 :             :             // uses a dedicated encode_sync_rsp / decode_sync_rsp code path. The
     778                 :             :             // incoming data is the *entire* SyncRsp payload (not the standard
     779                 :             :             // message envelope). The handle_packet caller (UDP read handler)
     780                 :             :             // passes the raw data, so for SyncRsp we decode differently.
     781                 :             : 
     782                 :             :             // Actually, looking at the wire design: Join is a standard message;
     783                 :             :             // the seed responds with SyncRsp which is a SEPARATE wire format
     784                 :             :             // (not through encode_message).  But it arrives on the same UDP
     785                 :             :             // socket. We need a heuristic to distinguish standard messages from
     786                 :             :             // SyncRsp.
     787                 :             : 
     788                 :             :             // Try SyncRsp format first (no magic prefix, starts with 4B count):
     789                 :           0 :             std::vector<Member> members;
     790                 :           0 :             if (decode_sync_rsp(data, members)) {
     791                 :           0 :                 handle_sync_rsp(std::move(members));
     792                 :             :             }
     793                 :           0 :             break;
     794                 :           0 :         }
     795                 :           0 :         case GossipMessageType::Leave:
     796                 :           0 :             handle_leave(sender, inc);
     797                 :           0 :             break;
     798                 :             :     }
     799                 :           0 : }
     800                 :             : 
     801                 :             : // =============================================================================
     802                 :             : // Message handlers
     803                 :             : // =============================================================================
     804                 :             : 
     805                 :           0 : void GossipMembership::handle_ping(EndPoint sender, uint64_t inc, uint32_t /*seq*/,
     806                 :             :                                    std::vector<PiggybackEntry> pb,
     807                 :             :                                    const std::string& /*host*/, uint16_t /*port*/) {
     808                 :             :     // Merge the sender — a Ping proves they are alive.
     809                 :             :     {
     810                 :           0 :         Member m;
     811                 :           0 :         m.identity.endpoint = sender;
     812                 :           0 :         m.incarnation = inc;
     813                 :           0 :         m.status = MemberStatus::Alive;
     814                 :           0 :         m.last_seen = std::chrono::steady_clock::now();
     815                 :           0 :         merge_member(m);
     816                 :           0 :     }
     817                 :             : 
     818                 :             :     // Apply piggyback entries from the ping.
     819                 :           0 :     apply_piggyback(pb);
     820                 :             : 
     821                 :             :     // If we had a pending ping for the sender, clear it — they just proved
     822                 :             :     // they are alive by pinging us.
     823                 :             :     {
     824                 :           0 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     825                 :           0 :         pending_pings_.erase(sender);
     826                 :           0 :     }
     827                 :             : 
     828                 :             :     // Build piggyback for the Ack response.
     829                 :           0 :     std::vector<PiggybackEntry> ack_pb;
     830                 :             :     {
     831                 :           0 :         std::shared_lock<std::shared_mutex> lock(members_mutex_);
     832                 :           0 :         ack_pb = build_piggyback_impl(config_, incarnation_,
     833                 :           0 :                                       needs_dissemination_, members_);
     834                 :           0 :     }
     835                 :           0 :     if (!ack_pb.empty()) {
     836                 :           0 :         needs_dissemination_ = false;
     837                 :             :     }
     838                 :             : 
     839                 :             :     // Send Ack with piggyback.
     840                 :             :     StreamBuffer ack_msg =
     841                 :           0 :         encode_message(GossipMessageType::Ack, incarnation_, seq_no_++,
     842                 :           0 :                        config_.local_state.identity.endpoint, ack_pb);
     843                 :           0 :     async_udp_send(loop_, udp_socket_, ack_msg, sender);
     844                 :           0 : }
     845                 :             : 
     846                 :           0 : void GossipMembership::handle_ack(EndPoint sender, uint64_t inc,
     847                 :             :                                   std::vector<PiggybackEntry> pb) {
     848                 :             :     // Merge the sender — an Ack proves they are alive.
     849                 :             :     {
     850                 :           0 :         Member m;
     851                 :           0 :         m.identity.endpoint = sender;
     852                 :           0 :         m.incarnation = inc;
     853                 :           0 :         m.status = MemberStatus::Alive;
     854                 :           0 :         m.last_seen = std::chrono::steady_clock::now();
     855                 :           0 :         merge_member(m);
     856                 :           0 :     }
     857                 :             : 
     858                 :             :     // Check if this ack is for a forwarded PingReq.
     859                 :           0 :     auto& ext = extras_for(this);
     860                 :           0 :     auto fwd_it = ext.forwarded_pings.find(sender);
     861                 :           0 :     if (fwd_it != ext.forwarded_pings.end()) {
     862                 :             :         // We forwarded a Ping on behalf of fwd_it->second.
     863                 :             :         // The target (sender) responded, so send IndirectAck to the original
     864                 :             :         // requester.
     865                 :           0 :         send_indirect_ack(fwd_it->second, sender);
     866                 :           0 :         ext.forwarded_pings.erase(fwd_it);
     867                 :             :     }
     868                 :             : 
     869                 :             :     // Clear any pending ping for this sender — the probe succeeded.
     870                 :             :     {
     871                 :           0 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     872                 :           0 :         pending_pings_.erase(sender);
     873                 :           0 :     }
     874                 :             : 
     875                 :             :     // Apply piggyback entries.
     876                 :           0 :     apply_piggyback(pb);
     877                 :           0 : }
     878                 :             : 
     879                 :           0 : void GossipMembership::handle_ping_req(EndPoint sender, EndPoint target) {
     880                 :             :     // Merge the requester.
     881                 :             :     {
     882                 :           0 :         Member m;
     883                 :           0 :         m.identity.endpoint = sender;
     884                 :           0 :         m.status = MemberStatus::Alive;
     885                 :           0 :         m.last_seen = std::chrono::steady_clock::now();
     886                 :           0 :         merge_member(m);
     887                 :           0 :     }
     888                 :             : 
     889                 :             :     // Forward the ping on behalf of the requester.
     890                 :           0 :     auto& ext = extras_for(this);
     891                 :           0 :     ext.forwarded_pings[target] = sender;
     892                 :             : 
     893                 :             :     // Build piggyback and send Ping to target.
     894                 :           0 :     std::vector<PiggybackEntry> pb;
     895                 :             :     {
     896                 :           0 :         std::shared_lock<std::shared_mutex> lock(members_mutex_);
     897                 :           0 :         pb = build_piggyback_impl(config_, incarnation_, needs_dissemination_,
     898                 :           0 :                                   members_);
     899                 :           0 :     }
     900                 :             : 
     901                 :             :     StreamBuffer msg =
     902                 :           0 :         encode_message(GossipMessageType::Ping, incarnation_, seq_no_++,
     903                 :           0 :                        config_.local_state.identity.endpoint, pb);
     904                 :           0 :     async_udp_send(loop_, udp_socket_, msg, target);
     905                 :           0 : }
     906                 :             : 
     907                 :           0 : void GossipMembership::handle_indirect_ack(EndPoint sender, EndPoint target) {
     908                 :             :     // Merge the indirect ack sender.
     909                 :             :     {
     910                 :           0 :         Member m;
     911                 :           0 :         m.identity.endpoint = sender;
     912                 :           0 :         m.status = MemberStatus::Alive;
     913                 :           0 :         m.last_seen = std::chrono::steady_clock::now();
     914                 :           0 :         merge_member(m);
     915                 :           0 :     }
     916                 :             : 
     917                 :             :     // If we have a pending ping for target with indirect_requested=true,
     918                 :             :     // the indirect probe succeeded — clear the pending ping.
     919                 :             :     {
     920                 :           0 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     921                 :           0 :         auto it = pending_pings_.find(target);
     922                 :           0 :         if (it != pending_pings_.end() && it->second.indirect_requested) {
     923                 :           0 :             pending_pings_.erase(it);
     924                 :             :         }
     925                 :           0 :     }
     926                 :           0 : }
     927                 :             : 
     928                 :           0 : void GossipMembership::handle_join(EndPoint sender, uint64_t inc,
     929                 :             :                                    std::vector<PiggybackEntry> pb) {
     930                 :             :     // Merge the joining node.
     931                 :             :     {
     932                 :           0 :         Member m;
     933                 :           0 :         m.identity.endpoint = sender;
     934                 :           0 :         m.incarnation = inc;
     935                 :           0 :         m.status = MemberStatus::Alive;
     936                 :           0 :         m.last_seen = std::chrono::steady_clock::now();
     937                 :           0 :         merge_member(m);
     938                 :           0 :         HPACTOR_LOG_INFO(log::LogCategory::kDiscovery, ActorId{0},
     939                 :             :                          static_cast<uint32_t>(log::LogEventId::kDiscoveryNodeJoined),
     940                 :             :                          "discovery node joined");
     941                 :           0 :     }
     942                 :             : 
     943                 :           0 :     apply_piggyback(pb);
     944                 :             : 
     945                 :             :     // Send full membership table (excluding Dead and Left) as SyncRsp.
     946                 :           0 :     send_sync_rsp(sender);
     947                 :           0 : }
     948                 :             : 
     949                 :           0 : void GossipMembership::handle_sync_rsp(std::vector<Member> members) {
     950                 :             :     // Merge all received members into our table.
     951                 :           0 :     for (auto& m : members) {
     952                 :           0 :         merge_member(m);
     953                 :             :     }
     954                 :             : 
     955                 :             :     // Cancel any pending join retry — we have successfully joined.
     956                 :           0 :     auto& ext = extras_for(this);
     957                 :           0 :     if (ext.join_retry_timer != 0 && loop_) {
     958                 :           0 :         loop_->cancel_timer(ext.join_retry_timer);
     959                 :           0 :         ext.join_retry_timer = 0;
     960                 :             :     }
     961                 :             : 
     962                 :             :     // Ensure self is still in the table (should always be, but be defensive).
     963                 :             :     {
     964                 :           0 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     965                 :           0 :         if (members_.find(config_.local_state.identity.endpoint) == members_.end()) {
     966                 :           0 :             Member self = config_.local_state;
     967                 :           0 :             self.incarnation = incarnation_;
     968                 :           0 :             self.status = MemberStatus::Alive;
     969                 :           0 :             self.last_seen = std::chrono::steady_clock::now();
     970                 :           0 :             members_[self.identity.endpoint] = std::move(self);
     971                 :           0 :         }
     972                 :           0 :     }
     973                 :           0 : }
     974                 :             : 
     975                 :           0 : void GossipMembership::handle_leave(EndPoint sender, uint64_t inc) {
     976                 :           0 :     Member member_to_fire;
     977                 :           0 :     bool should_fire = false;
     978                 :             : 
     979                 :             :     {
     980                 :           0 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
     981                 :           0 :         auto it = members_.find(sender);
     982                 :           0 :         if (it != members_.end()) {
     983                 :             :             // Only accept if incarnation is >= what we have.
     984                 :           0 :             if (inc >= it->second.incarnation) {
     985                 :           0 :                 it->second.status = MemberStatus::Left;
     986                 :           0 :                 it->second.last_seen = std::chrono::steady_clock::now();
     987                 :           0 :                 if (member_change_cb_) {
     988                 :           0 :                     member_to_fire = it->second;
     989                 :           0 :                     should_fire = true;
     990                 :             :                 }
     991                 :             :             }
     992                 :             :         }
     993                 :           0 :     }
     994                 :             : 
     995                 :           0 :     if (should_fire) {
     996                 :           0 :         member_change_cb_(member_to_fire, false);
     997                 :             :     }
     998                 :           0 : }
     999                 :             : 
    1000                 :             : // =============================================================================
    1001                 :             : // Message sending
    1002                 :             : // =============================================================================
    1003                 :             : 
    1004                 :           0 : void GossipMembership::send_ping(EndPoint target) {
    1005                 :           0 :     std::vector<PiggybackEntry> pb;
    1006                 :             :     {
    1007                 :           0 :         std::shared_lock<std::shared_mutex> lock(members_mutex_);
    1008                 :           0 :         pb = build_piggyback_impl(config_, incarnation_, needs_dissemination_,
    1009                 :           0 :                                   members_);
    1010                 :           0 :     }
    1011                 :             :     StreamBuffer msg =
    1012                 :           0 :         encode_message(GossipMessageType::Ping, incarnation_, seq_no_++,
    1013                 :           0 :                        config_.local_state.identity.endpoint, pb);
    1014                 :           0 :     async_udp_send(loop_, udp_socket_, msg, target);
    1015                 :           0 : }
    1016                 :             : 
    1017                 :           0 : void GossipMembership::send_ack(EndPoint target, std::vector<PiggybackEntry> pb) {
    1018                 :             :     StreamBuffer msg =
    1019                 :           0 :         encode_message(GossipMessageType::Ack, incarnation_, seq_no_++,
    1020                 :           0 :                        config_.local_state.identity.endpoint, pb);
    1021                 :           0 :     async_udp_send(loop_, udp_socket_, msg, target);
    1022                 :           0 : }
    1023                 :             : 
    1024                 :           0 : void GossipMembership::send_ping_req(EndPoint proxy, EndPoint target) {
    1025                 :             :     // Build minimal piggyback for PingReq.
    1026                 :           0 :     std::vector<PiggybackEntry> pb;
    1027                 :             :     {
    1028                 :           0 :         std::shared_lock<std::shared_mutex> lock(members_mutex_);
    1029                 :           0 :         pb = build_piggyback_impl(config_, incarnation_, needs_dissemination_,
    1030                 :           0 :                                   members_);
    1031                 :           0 :     }
    1032                 :             :     StreamBuffer msg = encode_message(GossipMessageType::PingReq, incarnation_,
    1033                 :           0 :                                       seq_no_++, target, pb);
    1034                 :           0 :     async_udp_send(loop_, udp_socket_, msg, proxy);
    1035                 :           0 : }
    1036                 :             : 
    1037                 :           0 : void GossipMembership::send_indirect_ack(EndPoint target, EndPoint orig_target) {
    1038                 :             :     // IndirectAck carries the original target in the ping_target field.
    1039                 :           0 :     std::vector<PiggybackEntry> pb; // empty piggyback for indirect ack
    1040                 :             :     StreamBuffer msg = encode_message(GossipMessageType::IndirectAck,
    1041                 :           0 :                                       incarnation_, seq_no_++, orig_target, pb);
    1042                 :           0 :     async_udp_send(loop_, udp_socket_, msg, target);
    1043                 :           0 : }
    1044                 :             : 
    1045                 :           0 : void GossipMembership::send_join(EndPoint seed) {
    1046                 :             :     // Join includes self metadata as piggyback.
    1047                 :           0 :     std::vector<PiggybackEntry> pb;
    1048                 :             :     {
    1049                 :           0 :         PiggybackEntry meta;
    1050                 :           0 :         meta.type = PiggybackType::Metadata;
    1051                 :           0 :         meta.identity.endpoint = config_.local_state.identity.endpoint;
    1052                 :           0 :         meta.incarnation = incarnation_;
    1053                 :           0 :         meta.actor_types = config_.local_state.actor_types;
    1054                 :           0 :         meta.identity.acceptors = config_.local_state.identity.acceptors;
    1055                 :           0 :         pb.push_back(std::move(meta));
    1056                 :           0 :     }
    1057                 :             :     StreamBuffer msg =
    1058                 :           0 :         encode_message(GossipMessageType::Join, incarnation_, seq_no_++,
    1059                 :           0 :                        config_.local_state.identity.endpoint, pb);
    1060                 :           0 :     async_udp_send(loop_, udp_socket_, msg, seed);
    1061                 :           0 : }
    1062                 :             : 
    1063                 :           0 : void GossipMembership::send_sync_rsp(EndPoint target) {
    1064                 :             :     // Collect non-Dead, non-Left members for the sync response.
    1065                 :           0 :     std::vector<Member> table;
    1066                 :             :     {
    1067                 :           0 :         std::shared_lock<std::shared_mutex> lock(members_mutex_);
    1068                 :           0 :         for (const auto& [ep, m] : members_) {
    1069                 :           0 :             if (m.status != MemberStatus::Dead && m.status != MemberStatus::Left) {
    1070                 :           0 :                 table.push_back(m);
    1071                 :             :             }
    1072                 :             :         }
    1073                 :           0 :     }
    1074                 :           0 :     StreamBuffer data = encode_sync_rsp(table);
    1075                 :           0 :     async_udp_send(loop_, udp_socket_, data, target);
    1076                 :           0 : }
    1077                 :             : 
    1078                 :          18 : void GossipMembership::send_leave(EndPoint target) {
    1079                 :          18 :     std::vector<PiggybackEntry> pb; // empty piggyback for leave
    1080                 :             :     StreamBuffer msg =
    1081                 :          18 :         encode_message(GossipMessageType::Leave, incarnation_, seq_no_++,
    1082                 :          18 :                        config_.local_state.identity.endpoint, pb);
    1083                 :          18 :     async_udp_send(loop_, udp_socket_, msg, target);
    1084                 :          18 : }
    1085                 :             : 
    1086                 :             : // =============================================================================
    1087                 :             : // State mutation
    1088                 :             : // =============================================================================
    1089                 :             : 
    1090                 :           1 : void GossipMembership::mark_suspicious(EndPoint ep) {
    1091                 :           1 :     std::unique_lock<std::shared_mutex> lock(members_mutex_);
    1092                 :           1 :     auto it = members_.find(ep);
    1093                 :           1 :     if (it != members_.end()) {
    1094                 :           1 :         it->second.status = MemberStatus::Suspicious;
    1095                 :           1 :         it->second.last_seen = std::chrono::steady_clock::now();
    1096                 :           1 :         HPACTOR_LOG_WARNING(log::LogCategory::kDiscovery, ActorId{0}, 0,
    1097                 :             :                             "discovery node suspected");
    1098                 :             :     }
    1099                 :           1 : }
    1100                 :             : 
    1101                 :           1 : void GossipMembership::mark_dead(EndPoint ep) {
    1102                 :           1 :     Member member_to_fire;
    1103                 :           1 :     bool should_fire = false;
    1104                 :             : 
    1105                 :             :     {
    1106                 :           1 :         std::unique_lock<std::shared_mutex> lock(members_mutex_);
    1107                 :           1 :         auto it = members_.find(ep);
    1108                 :           1 :         if (it != members_.end()) {
    1109                 :           1 :             it->second.status = MemberStatus::Dead;
    1110                 :           1 :             it->second.last_seen = std::chrono::steady_clock::now();
    1111                 :           1 :             HPACTOR_LOG_ERROR(
    1112                 :             :                 log::LogCategory::kDiscovery, ActorId{0},
    1113                 :             :                 static_cast<uint32_t>(log::LogEventId::kDiscoveryNodeDead),
    1114                 :             :                 "discovery node dead");
    1115                 :           1 :             if (member_change_cb_) {
    1116                 :           0 :                 member_to_fire = it->second;
    1117                 :           0 :                 should_fire = true;
    1118                 :             :             }
    1119                 :             :         }
    1120                 :           1 :     }
    1121                 :             : 
    1122                 :           1 :     if (should_fire) {
    1123                 :           0 :         member_change_cb_(member_to_fire, false);
    1124                 :             :     }
    1125                 :           1 : }
    1126                 :             : 
    1127                 :           8 : void GossipMembership::merge_member(const Member& remote) {
    1128                 :           8 :     std::unique_lock<std::shared_mutex> lock(members_mutex_);
    1129                 :             : 
    1130                 :           8 :     auto it = members_.find(remote.identity.endpoint);
    1131                 :           8 :     if (it == members_.end()) {
    1132                 :             :         // First time seeing this endpoint — insert.
    1133                 :           5 :         members_[remote.identity.endpoint] = remote;
    1134                 :           5 :         members_[remote.identity.endpoint].last_seen =
    1135                 :           5 :             std::chrono::steady_clock::now();
    1136                 :           5 :         return;
    1137                 :             :     }
    1138                 :             : 
    1139                 :           3 :     auto& existing = it->second;
    1140                 :             : 
    1141                 :             :     // Only accept updates with a strictly higher incarnation.
    1142                 :           3 :     if (remote.incarnation <= existing.incarnation) {
    1143                 :           1 :         return; // Stale information
    1144                 :             :     }
    1145                 :             : 
    1146                 :             :     // Higher incarnation: accept the update.
    1147                 :             :     // Special case: Dead member with higher incarnation → reactivate.
    1148                 :           2 :     if (existing.status == MemberStatus::Dead &&
    1149                 :           1 :         remote.incarnation > existing.incarnation) {
    1150                 :             :         // The node restarted with a higher incarnation — reactivate to Alive.
    1151                 :           1 :         existing.status = MemberStatus::Alive;
    1152                 :             :     }
    1153                 :             : 
    1154                 :             :     // Update fields from remote that carry meaningful information.
    1155                 :           2 :     existing.incarnation = remote.incarnation;
    1156                 :           2 :     if (remote.status != MemberStatus::Alive ||
    1157                 :           2 :         existing.status == MemberStatus::Dead) {
    1158                 :             :         // Accept status from remote unless we already have it as Dead and it's
    1159                 :             :         // not a reactivation.
    1160                 :             :     }
    1161                 :           2 :     if (remote.status == MemberStatus::Suspicious ||
    1162                 :           2 :         remote.status == MemberStatus::Dead || remote.status == MemberStatus::Left) {
    1163                 :           0 :         existing.status = remote.status;
    1164                 :             :     }
    1165                 :             : 
    1166                 :             :     // Accept metadata if present.
    1167                 :           2 :     if (!remote.actor_types.empty()) {
    1168                 :           0 :         existing.actor_types = remote.actor_types;
    1169                 :             :     }
    1170                 :           2 :     if (!remote.identity.host.empty()) {
    1171                 :           2 :         existing.identity.host = remote.identity.host;
    1172                 :             :     }
    1173                 :           2 :     if (!remote.identity.acceptors.empty()) {
    1174                 :           0 :         existing.identity.acceptors = remote.identity.acceptors;
    1175                 :             :     }
    1176                 :             : 
    1177                 :           2 :     existing.last_seen = std::chrono::steady_clock::now();
    1178                 :           8 : }
    1179                 :             : 
    1180                 :           0 : void GossipMembership::apply_piggyback(const std::vector<PiggybackEntry>& entries) {
    1181                 :           0 :     std::unique_lock<std::shared_mutex> lock(members_mutex_);
    1182                 :             : 
    1183                 :           0 :     for (const auto& entry : entries) {
    1184                 :           0 :         auto it = members_.find(entry.identity.endpoint);
    1185                 :           0 :         if (it == members_.end()) {
    1186                 :             :             // We don't know this endpoint — insert it with the piggyback info.
    1187                 :           0 :             Member m;
    1188                 :           0 :             m.identity.endpoint = entry.identity.endpoint;
    1189                 :           0 :             m.incarnation = entry.incarnation;
    1190                 :           0 :             m.last_seen = std::chrono::steady_clock::now();
    1191                 :             : 
    1192                 :           0 :             switch (entry.type) {
    1193                 :           0 :                 case PiggybackType::Alive:
    1194                 :           0 :                     m.status = MemberStatus::Alive;
    1195                 :           0 :                     break;
    1196                 :           0 :                 case PiggybackType::Suspicious:
    1197                 :           0 :                     m.status = MemberStatus::Suspicious;
    1198                 :           0 :                     break;
    1199                 :           0 :                 case PiggybackType::Dead:
    1200                 :           0 :                     m.status = MemberStatus::Dead;
    1201                 :           0 :                     break;
    1202                 :           0 :                 case PiggybackType::Metadata:
    1203                 :           0 :                     m.status = MemberStatus::Alive;
    1204                 :           0 :                     m.actor_types = entry.actor_types;
    1205                 :           0 :                     m.identity.acceptors = entry.identity.acceptors;
    1206                 :           0 :                     break;
    1207                 :             :             }
    1208                 :           0 :             members_[entry.identity.endpoint] = std::move(m);
    1209                 :           0 :             continue;
    1210                 :           0 :         }
    1211                 :             : 
    1212                 :           0 :         auto& existing = it->second;
    1213                 :             : 
    1214                 :             :         // Only apply if the piggyback incarnation is >= what we have.
    1215                 :           0 :         if (entry.incarnation < existing.incarnation) {
    1216                 :           0 :             continue; // Stale
    1217                 :             :         }
    1218                 :             : 
    1219                 :           0 :         existing.incarnation = entry.incarnation;
    1220                 :           0 :         existing.last_seen = std::chrono::steady_clock::now();
    1221                 :             : 
    1222                 :           0 :         switch (entry.type) {
    1223                 :           0 :             case PiggybackType::Alive:
    1224                 :           0 :                 if (existing.status == MemberStatus::Suspicious ||
    1225                 :           0 :                     existing.status == MemberStatus::Dead) {
    1226                 :           0 :                     existing.status = MemberStatus::Alive;
    1227                 :             :                 }
    1228                 :           0 :                 break;
    1229                 :           0 :             case PiggybackType::Suspicious:
    1230                 :           0 :                 existing.status = MemberStatus::Suspicious;
    1231                 :           0 :                 break;
    1232                 :           0 :             case PiggybackType::Dead:
    1233                 :           0 :                 existing.status = MemberStatus::Dead;
    1234                 :           0 :                 break;
    1235                 :           0 :             case PiggybackType::Metadata:
    1236                 :           0 :                 if (!entry.actor_types.empty()) {
    1237                 :           0 :                     existing.actor_types = entry.actor_types;
    1238                 :             :                 }
    1239                 :           0 :                 if (!entry.identity.acceptors.empty()) {
    1240                 :           0 :                     existing.identity.acceptors = entry.identity.acceptors;
    1241                 :             :                 }
    1242                 :           0 :                 break;
    1243                 :             :         }
    1244                 :             :     }
    1245                 :           0 : }
    1246                 :             : 
    1247                 :           1 : void GossipMembership::purge_dead_tombstones() {
    1248                 :           1 :     auto now = std::chrono::steady_clock::now();
    1249                 :             : 
    1250                 :           5 :     for (auto it = members_.begin(); it != members_.end();) {
    1251                 :           4 :         const auto& m = it->second;
    1252                 :             : 
    1253                 :           6 :         if ((m.status == MemberStatus::Dead || m.status == MemberStatus::Left) &&
    1254                 :           6 :             now - m.last_seen > config_.dead_timeout) {
    1255                 :           2 :             it = members_.erase(it);
    1256                 :           2 :             HPACTOR_LOG_DEBUG(log::LogCategory::kDiscovery, ActorId{0}, 0,
    1257                 :             :                               "discovery cache purged");
    1258                 :             :         } else {
    1259                 :           2 :             ++it;
    1260                 :             :         }
    1261                 :             :     }
    1262                 :           1 : }
    1263                 :             : 
    1264                 :             : std::vector<EndPoint>
    1265                 :           2 : GossipMembership::pick_random_peers(size_t count,
    1266                 :             :                                     std::unordered_set<EndPoint> exclude) {
    1267                 :           2 :     auto& ext = extras_for(this);
    1268                 :             : 
    1269                 :           2 :     std::shared_lock<std::shared_mutex> lock(members_mutex_);
    1270                 :             : 
    1271                 :             :     // Collect Alive peers excluding self and explicitly excluded endpoints.
    1272                 :           2 :     std::vector<EndPoint> candidates;
    1273                 :           7 :     for (const auto& [ep, m] : members_) {
    1274                 :           5 :         if (m.status != MemberStatus::Alive)
    1275                 :           0 :             continue;
    1276                 :           5 :         if (ep == config_.local_state.identity.endpoint)
    1277                 :           2 :             continue; // exclude self
    1278                 :           3 :         if (exclude.find(ep) != exclude.end())
    1279                 :           0 :             continue;
    1280                 :           3 :         candidates.push_back(ep);
    1281                 :             :     }
    1282                 :             : 
    1283                 :           2 :     if (candidates.empty())
    1284                 :           1 :         return {};
    1285                 :           1 :     if (candidates.size() <= count)
    1286                 :           1 :         return candidates;
    1287                 :             : 
    1288                 :             :     // Shuffle and pick the first 'count' elements.
    1289                 :           0 :     std::shuffle(candidates.begin(), candidates.end(), ext.rng);
    1290                 :           0 :     candidates.resize(count);
    1291                 :           0 :     return candidates;
    1292                 :           2 : }
    1293                 :             : 
    1294                 :             : // =============================================================================
    1295                 :             : // Socket setup/teardown
    1296                 :             : // =============================================================================
    1297                 :             : 
    1298                 :           0 : void GossipMembership::setup_udp_socket() {
    1299                 :           0 :     udp_socket_ = socket(AF_INET, SOCK_DGRAM, 0);
    1300                 :           0 :     if (udp_socket_ < 0)
    1301                 :           0 :         return;
    1302                 :             : 
    1303                 :           0 :     int reuse = 1;
    1304                 :           0 :     setsockopt(udp_socket_, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
    1305                 :             : 
    1306                 :             :     struct sockaddr_in addr;
    1307                 :           0 :     memset(&addr, 0, sizeof(addr));
    1308                 :           0 :     addr.sin_family = AF_INET;
    1309                 :           0 :     addr.sin_addr.s_addr = INADDR_ANY;
    1310                 :           0 :     addr.sin_port = htons(config_.gossip_port);
    1311                 :             : 
    1312                 :           0 :     if (bind(udp_socket_, reinterpret_cast<struct sockaddr*>(&addr),
    1313                 :           0 :              sizeof(addr)) < 0) {
    1314                 :           0 :         close(udp_socket_);
    1315                 :           0 :         udp_socket_ = -1;
    1316                 :           0 :         return;
    1317                 :             :     }
    1318                 :             : 
    1319                 :           0 :     if (loop_) {
    1320                 :           0 :         loop_->add_fd(udp_socket_, EventLoop::Event::Read);
    1321                 :           0 :         loop_->set_read_handler(udp_socket_, [this](int /*fd*/) {
    1322                 :             :             // Non-blocking recvfrom loop (edge-triggered).
    1323                 :             :             struct sockaddr_in src_addr;
    1324                 :           0 :             socklen_t src_addr_len = sizeof(src_addr);
    1325                 :             : 
    1326                 :             :             while (true) {
    1327                 :           0 :                 ssize_t n = recvfrom(udp_socket_, recv_buffer_.data(),
    1328                 :           0 :                                      recv_buffer_.size(), MSG_DONTWAIT,
    1329                 :             :                                      reinterpret_cast<struct sockaddr*>(&src_addr),
    1330                 :             :                                      &src_addr_len);
    1331                 :           0 :                 if (n <= 0)
    1332                 :           0 :                     break;
    1333                 :             : 
    1334                 :           0 :                 StreamBuffer data(recv_buffer_.data(),
    1335                 :           0 :                                   recv_buffer_.data() + static_cast<size_t>(n));
    1336                 :             : 
    1337                 :           0 :                 std::string from_host;
    1338                 :           0 :                 uint16_t from_port = 0;
    1339                 :             :                 char ip_str[INET_ADDRSTRLEN];
    1340                 :           0 :                 if (inet_ntop(AF_INET, &src_addr.sin_addr, ip_str, sizeof(ip_str))) {
    1341                 :           0 :                     from_host = ip_str;
    1342                 :             :                 }
    1343                 :           0 :                 from_port = ntohs(src_addr.sin_port);
    1344                 :             : 
    1345                 :           0 :                 handle_packet(data, from_host, from_port);
    1346                 :           0 :             }
    1347                 :           0 :         });
    1348                 :             :     }
    1349                 :             : }
    1350                 :             : 
    1351                 :          25 : void GossipMembership::teardown_udp_socket() {
    1352                 :          25 :     if (udp_socket_ >= 0) {
    1353                 :           0 :         if (loop_) {
    1354                 :           0 :             loop_->clear_read_handler(udp_socket_);
    1355                 :           0 :             loop_->remove_fd(udp_socket_);
    1356                 :             :         }
    1357                 :           0 :         close(udp_socket_);
    1358                 :           0 :         udp_socket_ = -1;
    1359                 :             :     }
    1360                 :          25 : }
    1361                 :             : 
    1362                 :             : } // namespace hpactor::net
        

Generated by: LCOV version 2.0-1