Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/supervision/all_for_one_supervisor.hpp>
16 : : #include <hpactor/supervision/one_for_one_supervisor.hpp>
17 : : #include <hpactor/supervision/supervision.hpp>
18 : :
19 : : #include <hpactor/actor/lifecycle_actor.hpp>
20 : : #include <hpactor/messages.pb.h>
21 : : #include <hpactor/metrics/metrics_event.hpp>
22 : :
23 : : namespace hpactor {
24 : :
25 : 0 : void Supervisor::on_child_stopped(ActorId /*child_id*/) {}
26 : :
27 : 4 : OneForOneSupervisor::OneForOneSupervisor(SupervisionPolicy policy)
28 : 4 : : policy_(policy) {}
29 : :
30 : : SupervisionDirective
31 : 5 : OneForOneSupervisor::on_child_failure(const ChildFailure& failure) {
32 : 5 : return failure.directive;
33 : : }
34 : :
35 : 2 : AllForOneSupervisor::AllForOneSupervisor(SupervisionPolicy policy)
36 : 2 : : policy_(policy) {}
37 : :
38 : : SupervisionDirective
39 : 2 : AllForOneSupervisor::on_child_failure(const ChildFailure& /*failure*/) {
40 : 2 : return SupervisionDirective::Restart;
41 : : }
42 : :
43 : 1 : SupervisorActor::SupervisorActor(ActorContext* ctx, ActorSystem& sys,
44 : 1 : Supervisor& strategy, std::vector<Actor> children)
45 : 1 : : EventBasedActor(ctx, sys), strategy_(strategy),
46 : 1 : children_(mem::MemStdAllocator<Actor>(id_ptr(), mem::RegionType::kActor)),
47 : 2 : first_failure_time_(std::chrono::steady_clock::time_point::min()) {
48 : 1 : children_.reserve(children.size());
49 : 1 : for (auto& child : children) {
50 : 0 : children_.push_back(std::move(child));
51 : : }
52 : 1 : become(make_behavior());
53 : 1 : }
54 : :
55 : 1 : Behavior SupervisorActor::make_behavior() {
56 : 2 : return Behavior{[this](TypedMessage& msg) {
57 : 0 : if (msg.type_id() == TypeTag::DownMsg) {
58 : 0 : handle_child_down(msg.type_id(), msg.payload());
59 : : }
60 : 1 : }};
61 : : }
62 : :
63 : 0 : void SupervisorActor::handle_child_down(TypeTag /*tag*/,
64 : : const StreamBuffer& payload) {
65 : : auto pb =
66 : 0 : mem::allocate_shared<::hpactor::DownMessage>(id(), mem::RegionType::kActor);
67 : 0 : if (!pb->ParseFromArray(payload.data(), static_cast<int>(payload.size()))) {
68 : 0 : return;
69 : : }
70 : :
71 : 0 : ActorId child_id(pb->actor_id());
72 : 0 : error reason(pb->reason_code());
73 : :
74 : 0 : ChildFailure failure{child_id, reason, SupervisionDirective::Restart};
75 : 0 : auto directive = strategy_.on_child_failure(failure);
76 : :
77 : 0 : switch (directive) {
78 : 0 : case SupervisionDirective::Restart:
79 : 0 : restart_child(child_id, reason);
80 : 0 : break;
81 : 0 : case SupervisionDirective::Stop:
82 : 0 : children_.erase(std::remove_if(children_.begin(), children_.end(),
83 : 0 : [&child_id](const Actor& a) {
84 : 0 : return a.id() == child_id;
85 : : }),
86 : 0 : children_.end());
87 : 0 : break;
88 : 0 : case SupervisionDirective::Escalate:
89 : 0 : break;
90 : : }
91 : 0 : }
92 : :
93 : 0 : void SupervisorActor::restart_child(ActorId child_id, const error& reason) {
94 : 0 : auto now = std::chrono::steady_clock::now();
95 : 0 : auto& count = restart_counts_[child_id];
96 : :
97 : 0 : if (now - first_failure_time_ > std::chrono::milliseconds(5000)) {
98 : 0 : count = 0;
99 : 0 : first_failure_time_ = now;
100 : : }
101 : :
102 : 0 : if (count >= 10) {
103 : 0 : children_.erase(std::remove_if(children_.begin(), children_.end(),
104 : 0 : [&child_id](const Actor& a) {
105 : 0 : return a.id() == child_id;
106 : : }),
107 : 0 : children_.end());
108 : 0 : restart_counts_.erase(child_id);
109 : 0 : return;
110 : : }
111 : :
112 : 0 : ++count;
113 : :
114 : : // Drive lifecycle for the failing child
115 : 0 : if (auto actor = system().get_actor(child_id)) {
116 : 0 : if (auto* lc = actor->as_lifecycle()) {
117 : 0 : lc->set_failure_reason(reason);
118 : 0 : lc->transition(LifecycleState::kFailed);
119 : 0 : lc->bump_incarnation();
120 : 0 : lc->transition(LifecycleState::kStarting);
121 : : }
122 : 0 : }
123 : :
124 : 0 : if (metrics_ring_buffer_) [[unlikely]] {
125 : 0 : metrics::MetricEvent evt{};
126 : 0 : evt.actor_id = id();
127 : 0 : evt.event_type = metrics::MetricEventType::kSupervisorRestart;
128 : 0 : evt.value_hi = static_cast<uint32_t>(child_id.value());
129 : 0 : metrics_ring_buffer_->try_push(evt);
130 : : }
131 : : }
132 : :
133 : 0 : void SupervisorActor::restart_all_children() {
134 : 0 : for (auto& child : children_) {
135 : 0 : restart_child(child.id(), error(0));
136 : : }
137 : 0 : }
138 : :
139 : 1 : SelfSupervisingActor::SelfSupervisingActor(ActorContext* ctx, ActorSystem& sys,
140 : 1 : SupervisionPolicy policy)
141 : 1 : : EventBasedActor(ctx, sys), policy_(policy),
142 : 1 : first_failure_time_(std::chrono::steady_clock::time_point::min()) {}
143 : :
144 : 0 : void SelfSupervisingActor::add_child(Actor child) {
145 : 0 : children_.push_back(std::move(child));
146 : 0 : }
147 : :
148 : 0 : void SelfSupervisingActor::remove_child(Actor child) {
149 : 0 : children_.erase(std::remove_if(children_.begin(), children_.end(),
150 : 0 : [&child](const Actor& a) {
151 : 0 : return a.address() == child.address();
152 : : }),
153 : 0 : children_.end());
154 : 0 : }
155 : :
156 : 0 : void SelfSupervisingActor::add_remote_child(ActorRef child) {
157 : 0 : remote_children_.push_back(child);
158 : 0 : remote_child_addresses_.push_back(child.address());
159 : 0 : }
160 : :
161 : 0 : bool SelfSupervisingActor::has_remote_child(const ActorAddress& addr) const {
162 : 0 : for (const auto& child_addr : remote_child_addresses_) {
163 : 0 : if (child_addr == addr)
164 : 0 : return true;
165 : : }
166 : 0 : return false;
167 : : }
168 : :
169 : 0 : ActorRef SelfSupervisingActor::get_remote_child(const ActorAddress& addr) const {
170 : 0 : for (size_t i = 0; i < remote_child_addresses_.size(); ++i) {
171 : 0 : if (remote_child_addresses_[i] == addr) {
172 : 0 : return remote_children_[i];
173 : : }
174 : : }
175 : 0 : return ActorRef{};
176 : : }
177 : :
178 : 0 : void SelfSupervisingActor::remove_remote_child(const ActorAddress& addr) {
179 : 0 : for (size_t i = 0; i < remote_child_addresses_.size(); ++i) {
180 : 0 : if (remote_child_addresses_[i] == addr) {
181 : 0 : remote_children_.erase(remote_children_.begin() +
182 : 0 : static_cast<std::ptrdiff_t>(i));
183 : 0 : remote_child_addresses_.erase(remote_child_addresses_.begin() +
184 : 0 : static_cast<std::ptrdiff_t>(i));
185 : 0 : return;
186 : : }
187 : : }
188 : : }
189 : :
190 : : SupervisionDirective
191 : 0 : SelfSupervisingActor::on_failure(ActorId child_id, const error& err) {
192 : 0 : return decide_restart(child_id, err);
193 : : }
194 : :
195 : 0 : void SelfSupervisingActor::handle_child_down(TypeTag /*tag*/,
196 : : const StreamBuffer& payload) {
197 : : auto pb =
198 : 0 : mem::allocate_shared<::hpactor::DownMessage>(id(), mem::RegionType::kActor);
199 : 0 : if (!pb->ParseFromArray(payload.data(), static_cast<int>(payload.size()))) {
200 : 0 : return;
201 : : }
202 : 0 : decide_restart(ActorId(pb->actor_id()), error(pb->reason_code()));
203 : 0 : }
204 : :
205 : : SupervisionDirective
206 : 0 : SelfSupervisingActor::decide_restart(ActorId child_id, const error& err) {
207 : 0 : auto now = std::chrono::steady_clock::now();
208 : 0 : auto& count = restart_counts_[child_id];
209 : :
210 : 0 : if (now - first_failure_time_ > policy_.restart_interval) {
211 : 0 : count = 0;
212 : 0 : first_failure_time_ = now;
213 : : }
214 : :
215 : 0 : if (count >= policy_.max_restarts) {
216 : 0 : return SupervisionDirective::Stop;
217 : : }
218 : :
219 : 0 : ++count;
220 : 0 : return on_failure(child_id, err);
221 : : }
222 : :
223 : : } // namespace hpactor
|