Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/adt/node_identity.hpp>
18 : : #include <hpactor/net/event_loop.hpp>
19 : : #include <hpactor/net/service_discovery.hpp>
20 : :
21 : : #include <chrono>
22 : : #include <cstdint>
23 : : #include <functional>
24 : : #include <shared_mutex>
25 : : #include <string>
26 : : #include <unordered_map>
27 : : #include <unordered_set>
28 : : #include <vector>
29 : :
30 : : namespace hpactor::net {
31 : :
32 : : // ── Gossip message types ────────────────────────────────────────
33 : : enum class GossipMessageType : uint8_t {
34 : : Ping = 0x01,
35 : : Ack = 0x02,
36 : : PingReq = 0x03,
37 : : IndirectAck = 0x04,
38 : : Join = 0x05,
39 : : SyncRsp = 0x06,
40 : : Leave = 0x07,
41 : : };
42 : :
43 : : enum class PiggybackType : uint8_t {
44 : : Alive = 0x01,
45 : : Suspicious = 0x02,
46 : : Dead = 0x03,
47 : : Metadata = 0x04,
48 : : };
49 : :
50 : : struct PiggybackEntry {
51 : : PiggybackType type;
52 : : NodeIdentity identity;
53 : : uint64_t incarnation;
54 : : // Metadata-only fields:
55 : : std::vector<std::string> actor_types;
56 : : uint32_t load = 0;
57 : : };
58 : :
59 : : // ── Configuration ───────────────────────────────────────────────
60 : : struct GossipConfig {
61 : : uint16_t gossip_port = 5354;
62 : 12 : std::chrono::milliseconds protocol_period{1000};
63 : 12 : std::chrono::milliseconds ping_timeout{200};
64 : 12 : std::chrono::milliseconds suspicion_timeout{3000};
65 : 12 : std::chrono::milliseconds dead_timeout{30000};
66 : : uint32_t fanout = 3;
67 : : uint32_t indirect_probes = 3;
68 : : std::vector<EndPoint> seeds;
69 : : // Only endpoint, host, tcp_port, uds_path, acceptors, actor_types are
70 : : // used as config. incarnation, status, last_seen are set at startup.
71 : : Member local_state;
72 : : };
73 : :
74 : : constexpr uint32_t GossipMagic = 0x48504743; // "HPGC"
75 : : constexpr uint8_t GossipVersion = 0x01;
76 : : constexpr size_t kGossipMaxMsgSize = 1400;
77 : :
78 : : struct PendingPing {
79 : : std::chrono::steady_clock::time_point expires_at;
80 : : bool indirect_requested = false;
81 : : std::chrono::steady_clock::time_point indirect_expires_at;
82 : : // Note: indirect timeout = direct timeout + ping_timeout (same timeout).
83 : : };
84 : :
85 : : // ── GossipMembership ────────────────────────────────────────────
86 : : class GossipMembership : public IServiceDiscovery {
87 : : public:
88 : : GossipMembership(const GossipConfig& cfg, EventLoop* loop);
89 : : ~GossipMembership() override;
90 : :
91 : : void start() override;
92 : : void stop() override;
93 : : std::vector<Member> discover_all() const override;
94 : : const Member* discover(EndPoint) const override;
95 : : void announce(Member) override;
96 : : void on_member_change(MemberChangeCallback) override;
97 : 2 : std::string backend_name() const override {
98 : 4 : return "gossip";
99 : : }
100 : 0 : const std::unordered_map<EndPoint, Member>* raw_members() const override {
101 : 0 : return &members_;
102 : : }
103 : :
104 : : private:
105 : : void protocol_round();
106 : : void handle_packet(const StreamBuffer& data, const std::string& from_host,
107 : : uint16_t from_port);
108 : :
109 : : // Message handlers
110 : : void handle_ping(EndPoint sender, uint64_t inc, uint32_t seq,
111 : : std::vector<PiggybackEntry> pb, const std::string& host,
112 : : uint16_t port);
113 : : void handle_ack(EndPoint sender, uint64_t inc, std::vector<PiggybackEntry> pb);
114 : : void handle_ping_req(EndPoint sender, EndPoint target);
115 : : void handle_indirect_ack(EndPoint sender, EndPoint target);
116 : : void handle_join(EndPoint sender, uint64_t inc, std::vector<PiggybackEntry> pb);
117 : : void handle_sync_rsp(std::vector<Member> members);
118 : : void handle_leave(EndPoint sender, uint64_t inc);
119 : :
120 : : // Message sending
121 : : void send_ping(EndPoint target);
122 : : void send_ack(EndPoint target, std::vector<PiggybackEntry> pb);
123 : : void send_ping_req(EndPoint proxy, EndPoint target);
124 : : void send_indirect_ack(EndPoint target, EndPoint orig_target);
125 : : void send_join(EndPoint seed);
126 : : void send_sync_rsp(EndPoint target);
127 : : void send_leave(EndPoint target);
128 : :
129 : : // Wire protocol encode/decode
130 : : StreamBuffer encode_message(GossipMessageType type, uint64_t inc,
131 : : uint32_t seq, EndPoint ping_target,
132 : : const std::vector<PiggybackEntry>& pb) const;
133 : : bool
134 : : decode_message(const StreamBuffer& data, GossipMessageType& type,
135 : : EndPoint& sender, uint64_t& inc, uint32_t& seq,
136 : : EndPoint& ping_target, std::vector<PiggybackEntry>& pb) const;
137 : : StreamBuffer encode_sync_rsp(const std::vector<Member>& members) const;
138 : : bool
139 : : decode_sync_rsp(const StreamBuffer& data, std::vector<Member>& members) const;
140 : :
141 : : // State mutations
142 : : void mark_suspicious(EndPoint ep);
143 : : void mark_dead(EndPoint ep);
144 : : void merge_member(const Member& remote);
145 : : void apply_piggyback(const std::vector<PiggybackEntry>& entries);
146 : : void purge_dead_tombstones();
147 : : std::vector<EndPoint>
148 : : pick_random_peers(size_t count, std::unordered_set<EndPoint> exclude = {});
149 : :
150 : : void setup_udp_socket();
151 : : void teardown_udp_socket();
152 : :
153 : : GossipConfig config_;
154 : : EventLoop* loop_ = nullptr;
155 : : int udp_socket_ = -1;
156 : : uint64_t incarnation_ = 0;
157 : : uint32_t seq_no_ = 0;
158 : :
159 : : std::unordered_map<EndPoint, Member> members_;
160 : : std::unordered_map<EndPoint, PendingPing> pending_pings_;
161 : : MemberChangeCallback member_change_cb_;
162 : : uint64_t protocol_timer_ = 0;
163 : : bool needs_dissemination_ = false;
164 : : std::vector<uint8_t> recv_buffer_;
165 : : mutable std::shared_mutex members_mutex_;
166 : : };
167 : :
168 : : } // namespace hpactor::net
|