LCOV - code coverage report
Current view: top level - include/hpactor/net - gossip_membership.hpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 75.0 % 8 6
Test Date: 2026-05-20 02:24:49 Functions: 50.0 % 2 1
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #pragma once
      16                 :             : 
      17                 :             : #include <hpactor/adt/node_identity.hpp>
      18                 :             : #include <hpactor/net/event_loop.hpp>
      19                 :             : #include <hpactor/net/service_discovery.hpp>
      20                 :             : 
      21                 :             : #include <chrono>
      22                 :             : #include <cstdint>
      23                 :             : #include <functional>
      24                 :             : #include <shared_mutex>
      25                 :             : #include <string>
      26                 :             : #include <unordered_map>
      27                 :             : #include <unordered_set>
      28                 :             : #include <vector>
      29                 :             : 
      30                 :             : namespace hpactor::net {
      31                 :             : 
      32                 :             : // ── Gossip message types ────────────────────────────────────────
      33                 :             : enum class GossipMessageType : uint8_t {
      34                 :             :     Ping = 0x01,
      35                 :             :     Ack = 0x02,
      36                 :             :     PingReq = 0x03,
      37                 :             :     IndirectAck = 0x04,
      38                 :             :     Join = 0x05,
      39                 :             :     SyncRsp = 0x06,
      40                 :             :     Leave = 0x07,
      41                 :             : };
      42                 :             : 
      43                 :             : enum class PiggybackType : uint8_t {
      44                 :             :     Alive = 0x01,
      45                 :             :     Suspicious = 0x02,
      46                 :             :     Dead = 0x03,
      47                 :             :     Metadata = 0x04,
      48                 :             : };
      49                 :             : 
      50                 :             : struct PiggybackEntry {
      51                 :             :     PiggybackType type;
      52                 :             :     NodeIdentity identity;
      53                 :             :     uint64_t incarnation;
      54                 :             :     // Metadata-only fields:
      55                 :             :     std::vector<std::string> actor_types;
      56                 :             :     uint32_t load = 0;
      57                 :             : };
      58                 :             : 
      59                 :             : // ── Configuration ───────────────────────────────────────────────
      60                 :             : struct GossipConfig {
      61                 :             :     uint16_t gossip_port = 5354;
      62                 :          12 :     std::chrono::milliseconds protocol_period{1000};
      63                 :          12 :     std::chrono::milliseconds ping_timeout{200};
      64                 :          12 :     std::chrono::milliseconds suspicion_timeout{3000};
      65                 :          12 :     std::chrono::milliseconds dead_timeout{30000};
      66                 :             :     uint32_t fanout = 3;
      67                 :             :     uint32_t indirect_probes = 3;
      68                 :             :     std::vector<EndPoint> seeds;
      69                 :             :     // Only endpoint, host, tcp_port, uds_path, acceptors, actor_types are
      70                 :             :     // used as config. incarnation, status, last_seen are set at startup.
      71                 :             :     Member local_state;
      72                 :             : };
      73                 :             : 
      74                 :             : constexpr uint32_t GossipMagic = 0x48504743; // "HPGC"
      75                 :             : constexpr uint8_t GossipVersion = 0x01;
      76                 :             : constexpr size_t kGossipMaxMsgSize = 1400;
      77                 :             : 
      78                 :             : struct PendingPing {
      79                 :             :     std::chrono::steady_clock::time_point expires_at;
      80                 :             :     bool indirect_requested = false;
      81                 :             :     std::chrono::steady_clock::time_point indirect_expires_at;
      82                 :             :     // Note: indirect timeout = direct timeout + ping_timeout (same timeout).
      83                 :             : };
      84                 :             : 
      85                 :             : // ── GossipMembership ────────────────────────────────────────────
      86                 :             : class GossipMembership : public IServiceDiscovery {
      87                 :             :   public:
      88                 :             :     GossipMembership(const GossipConfig& cfg, EventLoop* loop);
      89                 :             :     ~GossipMembership() override;
      90                 :             : 
      91                 :             :     void start() override;
      92                 :             :     void stop() override;
      93                 :             :     std::vector<Member> discover_all() const override;
      94                 :             :     const Member* discover(EndPoint) const override;
      95                 :             :     void announce(Member) override;
      96                 :             :     void on_member_change(MemberChangeCallback) override;
      97                 :           2 :     std::string backend_name() const override {
      98                 :           4 :         return "gossip";
      99                 :             :     }
     100                 :           0 :     const std::unordered_map<EndPoint, Member>* raw_members() const override {
     101                 :           0 :         return &members_;
     102                 :             :     }
     103                 :             : 
     104                 :             :   private:
     105                 :             :     void protocol_round();
     106                 :             :     void handle_packet(const StreamBuffer& data, const std::string& from_host,
     107                 :             :                        uint16_t from_port);
     108                 :             : 
     109                 :             :     // Message handlers
     110                 :             :     void handle_ping(EndPoint sender, uint64_t inc, uint32_t seq,
     111                 :             :                      std::vector<PiggybackEntry> pb, const std::string& host,
     112                 :             :                      uint16_t port);
     113                 :             :     void handle_ack(EndPoint sender, uint64_t inc, std::vector<PiggybackEntry> pb);
     114                 :             :     void handle_ping_req(EndPoint sender, EndPoint target);
     115                 :             :     void handle_indirect_ack(EndPoint sender, EndPoint target);
     116                 :             :     void handle_join(EndPoint sender, uint64_t inc, std::vector<PiggybackEntry> pb);
     117                 :             :     void handle_sync_rsp(std::vector<Member> members);
     118                 :             :     void handle_leave(EndPoint sender, uint64_t inc);
     119                 :             : 
     120                 :             :     // Message sending
     121                 :             :     void send_ping(EndPoint target);
     122                 :             :     void send_ack(EndPoint target, std::vector<PiggybackEntry> pb);
     123                 :             :     void send_ping_req(EndPoint proxy, EndPoint target);
     124                 :             :     void send_indirect_ack(EndPoint target, EndPoint orig_target);
     125                 :             :     void send_join(EndPoint seed);
     126                 :             :     void send_sync_rsp(EndPoint target);
     127                 :             :     void send_leave(EndPoint target);
     128                 :             : 
     129                 :             :     // Wire protocol encode/decode
     130                 :             :     StreamBuffer encode_message(GossipMessageType type, uint64_t inc,
     131                 :             :                                 uint32_t seq, EndPoint ping_target,
     132                 :             :                                 const std::vector<PiggybackEntry>& pb) const;
     133                 :             :     bool
     134                 :             :     decode_message(const StreamBuffer& data, GossipMessageType& type,
     135                 :             :                    EndPoint& sender, uint64_t& inc, uint32_t& seq,
     136                 :             :                    EndPoint& ping_target, std::vector<PiggybackEntry>& pb) const;
     137                 :             :     StreamBuffer encode_sync_rsp(const std::vector<Member>& members) const;
     138                 :             :     bool
     139                 :             :     decode_sync_rsp(const StreamBuffer& data, std::vector<Member>& members) const;
     140                 :             : 
     141                 :             :     // State mutations
     142                 :             :     void mark_suspicious(EndPoint ep);
     143                 :             :     void mark_dead(EndPoint ep);
     144                 :             :     void merge_member(const Member& remote);
     145                 :             :     void apply_piggyback(const std::vector<PiggybackEntry>& entries);
     146                 :             :     void purge_dead_tombstones();
     147                 :             :     std::vector<EndPoint>
     148                 :             :     pick_random_peers(size_t count, std::unordered_set<EndPoint> exclude = {});
     149                 :             : 
     150                 :             :     void setup_udp_socket();
     151                 :             :     void teardown_udp_socket();
     152                 :             : 
     153                 :             :     GossipConfig config_;
     154                 :             :     EventLoop* loop_ = nullptr;
     155                 :             :     int udp_socket_ = -1;
     156                 :             :     uint64_t incarnation_ = 0;
     157                 :             :     uint32_t seq_no_ = 0;
     158                 :             : 
     159                 :             :     std::unordered_map<EndPoint, Member> members_;
     160                 :             :     std::unordered_map<EndPoint, PendingPing> pending_pings_;
     161                 :             :     MemberChangeCallback member_change_cb_;
     162                 :             :     uint64_t protocol_timer_ = 0;
     163                 :             :     bool needs_dissemination_ = false;
     164                 :             :     std::vector<uint8_t> recv_buffer_;
     165                 :             :     mutable std::shared_mutex members_mutex_;
     166                 :             : };
     167                 :             : 
     168                 :             : } // namespace hpactor::net
        

Generated by: LCOV version 2.0-1