Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/actor/event_based_actor.hpp>
16 : : #include <hpactor/actor_context.hpp>
17 : : #include <hpactor/core/actor_system.hpp>
18 : : #include <hpactor/hpactor_config.hpp>
19 : : #include <hpactor/metrics/metrics_event.hpp>
20 : : #include <hpactor/ref/actor_proxy.hpp>
21 : : #include <hpactor/tracing/trace_manager.hpp>
22 : :
23 : : #include <google/protobuf/message.h>
24 : :
25 : : #include <chrono>
26 : : #include <thread>
27 : :
28 : : namespace hpactor {
29 : :
30 : 152 : ActorContext::ActorContext(Actor owner, ActorSystem* system)
31 : 152 : : owner_(std::move(owner)), system_(system) {}
32 : :
33 : 152 : ActorContext::~ActorContext() = default;
34 : :
35 : 2 : ActorContext::TraceScope::TraceScope(ActorContext* ctx, const TraceContext& next) noexcept
36 : 2 : : ctx_(ctx) {
37 : 2 : if (ctx_ == nullptr) {
38 : 0 : return;
39 : : }
40 : 2 : previous_ = ctx_->current_trace_context_;
41 : 2 : previous_valid_ = ctx_->has_current_trace_context_;
42 : 2 : ctx_->set_current_trace_context(next);
43 : : }
44 : :
45 : 4 : ActorContext::TraceScope::~TraceScope() {
46 : 2 : if (ctx_ == nullptr) {
47 : 0 : return;
48 : : }
49 : 2 : if (previous_valid_) {
50 : 0 : ctx_->set_current_trace_context(previous_);
51 : : } else {
52 : 2 : ctx_->clear_current_trace_context();
53 : : }
54 : 2 : }
55 : :
56 : 332 : ActorRef ActorContext::resolve(const ActorAddress& target) {
57 : : // 1. Check cache (hot path)
58 : 332 : auto cached = ref_cache_.get(target.id);
59 : 332 : if (cached.has_value()) {
60 : 301 : return *cached;
61 : : }
62 : :
63 : : // 2. Resolve system pointer
64 : : // NOLINTNEXTLINE(readability-avoid-nested-conditional-operator)
65 : 31 : auto* system = system_ != nullptr
66 : 31 : ? system_
67 : 31 : : (owner_ ? &owner_.get()->system() : nullptr);
68 : 31 : if (system == nullptr) {
69 : 0 : return ActorRef{};
70 : : }
71 : :
72 : : // 3. If target endpoint differs from our own, it's a remote actor.
73 : : // Skip local lookup — actor IDs are only unique within a system.
74 : 31 : if (!(target.endpoint == system->endpoint())) {
75 : 1 : ActorProxy proxy(target, system);
76 : 1 : ActorRef ref(proxy);
77 : : // Only cache if transport was resolved successfully
78 : 1 : if (ref.get_proxy() != nullptr && ref.get_proxy()->transport() != nullptr) {
79 : 0 : ref_cache_.put(target.id, ref);
80 : : }
81 : 1 : return ref;
82 : 1 : }
83 : :
84 : : // 4. Local endpoint: check local actors
85 : 30 : auto actor = system->get_actor(target.id);
86 : 30 : if (actor != nullptr) {
87 : 29 : ActorRef ref{Actor(actor)};
88 : 29 : ref_cache_.put(target.id, ref);
89 : 29 : return ref;
90 : 29 : }
91 : :
92 : 1 : return ActorRef{};
93 : 332 : }
94 : :
95 : 4 : void ActorContext::send(ActorRef& target, TypedMessage msg) {
96 : : // Stamp sender identity for reply tracking
97 : 4 : if (owner_) {
98 : 4 : msg.set_sender_address(owner_.address());
99 : : }
100 : :
101 : 4 : auto* system = system_ != nullptr
102 : 4 : ? system_
103 : 4 : : (owner_ ? &owner_.get()->system() : nullptr);
104 : 4 : if (system != nullptr && system->trace_manager() != nullptr) {
105 : 0 : system->trace_manager()->inject_message_context(
106 : : msg, this,
107 : 0 : system->trace_manager()->config().create_roots_for_actor_context_sends);
108 : : }
109 : :
110 : 4 : target.send(target.address(), std::move(msg));
111 : 4 : }
112 : :
113 : 318 : void ActorContext::send(const ActorAddress& target, TypedMessage msg) {
114 : 318 : (void)try_send(target, std::move(msg));
115 : 318 : }
116 : :
117 : 0 : void ActorContext::send(const ActorAddress& target, TypeTag tag,
118 : : const google::protobuf::Message& proto_msg) {
119 : 0 : TypedMessage msg(tag, proto_msg);
120 : 0 : send(target, std::move(msg));
121 : 0 : }
122 : :
123 : 0 : void ActorContext::send_with_priority(const ActorAddress& target, TypedMessage msg,
124 : : uint8_t priority, int64_t deadline_ns) {
125 : 0 : (void)try_send_with_priority(target, std::move(msg), priority, deadline_ns);
126 : 0 : }
127 : :
128 : : mailbox::EnqueueResult
129 : 329 : ActorContext::try_send(const ActorAddress& target, TypedMessage msg,
130 : : mailbox::DeliveryOptions options) {
131 : 329 : auto ref = resolve(target);
132 : 329 : if (!ref) {
133 : 1 : return {mailbox::EnqueueResultCode::ActorNotFound, target.id};
134 : : }
135 : :
136 : 328 : if (owner_) {
137 : 328 : msg.set_sender_address(owner_.address());
138 : : }
139 : :
140 : 328 : auto* system = owner_ ? &owner_.get()->system() : system_;
141 : 328 : if (system != nullptr && system->trace_manager() != nullptr) {
142 : 1 : system->trace_manager()->inject_message_context(
143 : : msg, this,
144 : 1 : system->trace_manager()->config().create_roots_for_actor_context_sends);
145 : : }
146 : :
147 : 328 : return ref.try_send(ref.address(), std::move(msg), options);
148 : 329 : }
149 : :
150 : : mailbox::EnqueueResult
151 : 1 : ActorContext::try_send_with_priority(const ActorAddress& target, TypedMessage msg,
152 : : uint8_t priority, int64_t deadline_ns,
153 : : mailbox::DeliveryOptions options) {
154 : 1 : auto ref = resolve(target);
155 : 1 : if (!ref) {
156 : 0 : return {mailbox::EnqueueResultCode::ActorNotFound, target.id};
157 : : }
158 : :
159 : 1 : if (owner_) {
160 : 1 : msg.set_sender_address(owner_.address());
161 : : }
162 : :
163 : 1 : auto* system = owner_ ? &owner_.get()->system() : system_;
164 : 1 : if (system != nullptr && system->trace_manager() != nullptr) {
165 : 0 : system->trace_manager()->inject_message_context(
166 : : msg, this,
167 : 0 : system->trace_manager()->config().create_roots_for_actor_context_sends);
168 : : }
169 : :
170 : 1 : if (ref.is_local()) {
171 : 1 : if (system != nullptr) {
172 : 2 : return system->try_deliver_local(target.id, std::move(msg),
173 : 1 : priority, deadline_ns, options);
174 : : }
175 : 0 : return {mailbox::EnqueueResultCode::ActorNotFound, target.id};
176 : : }
177 : :
178 : 0 : return ref.try_send(ref.address(), std::move(msg), options);
179 : 1 : }
180 : :
181 : 4 : void ActorContext::reply(TypedMessage msg) {
182 : 4 : if (current_sender_.id != ActorId{0}) {
183 : 3 : send(current_sender_, std::move(msg));
184 : : }
185 : 4 : }
186 : :
187 : 0 : void ActorContext::reply(TypeTag tag, const google::protobuf::Message& proto_msg) {
188 : 0 : TypedMessage msg(tag, proto_msg);
189 : 0 : reply(std::move(msg));
190 : 0 : }
191 : :
192 : 1 : void ActorContext::reply_with_error(const error& err) {
193 : 1 : if (current_sender_.id == ActorId{0}) {
194 : 0 : return;
195 : : }
196 : :
197 : : // Wire format: [4 bytes: error code BE][error message string]
198 : : // A protobuf error message can replace this payload later without
199 : : // changing the TypeTag or dispatch path.
200 : 1 : StreamBuffer payload;
201 : 1 : const uint32_t code = err.code();
202 : 1 : payload.push_back(static_cast<uint8_t>((code >> 24) & 0xFF));
203 : 1 : payload.push_back(static_cast<uint8_t>((code >> 16) & 0xFF));
204 : 1 : payload.push_back(static_cast<uint8_t>((code >> 8) & 0xFF));
205 : 1 : payload.push_back(static_cast<uint8_t>(code & 0xFF));
206 : 1 : const auto& msg = err.message();
207 : 1 : payload.insert(payload.end(), msg.begin(), msg.end());
208 : :
209 : 1 : TypedMessage error_msg(TypeTag::ErrorMsg, std::move(payload));
210 : 1 : send(current_sender_, std::move(error_msg));
211 : 1 : }
212 : :
213 : : AlarmHandle
214 : 2 : ActorContext::schedule(std::chrono::milliseconds delay, TypedMessage msg) {
215 : 2 : auto* sched = system_->scheduler();
216 : 2 : if (!sched)
217 : 0 : return AlarmHandle{};
218 : :
219 : 2 : msg.set_sender_address(owner_.address());
220 : :
221 : 2 : ActorId self_id = owner_.id();
222 : 2 : ActorSystem* sys = system_;
223 : :
224 : : // Wrap in shared_ptr because TypedMessage is move-only and std::function
225 : : // requires a copyable callable.
226 : 2 : auto msg_ptr = std::make_shared<TypedMessage>(std::move(msg));
227 : 0 : auto callback = [sys, self_id, msg_ptr]() {
228 : 0 : sys->deliver_local(self_id, std::move(*msg_ptr));
229 : 2 : };
230 : :
231 : : int64_t delay_ns =
232 : 2 : std::chrono::duration_cast<std::chrono::nanoseconds>(delay).count();
233 : 2 : auto handle = sched->schedule_after(std::move(callback), delay_ns);
234 : 2 : return AlarmHandle{handle.value()};
235 : 2 : }
236 : :
237 : 5 : void ActorContext::cancel_schedule(AlarmHandle handle) {
238 : 5 : if (handle.value() == 0)
239 : 2 : return;
240 : 3 : auto* sched = system_->scheduler();
241 : 3 : if (!sched)
242 : 0 : return;
243 : 3 : sched->cancel_timer(sched::TimerHandle{handle.value()});
244 : : }
245 : :
246 : 3 : std::vector<Actor> ActorContext::children() const {
247 : 3 : return children_;
248 : : }
249 : :
250 : 1 : void ActorContext::add_child(Actor child) {
251 : 1 : children_.push_back(std::move(child));
252 : 1 : }
253 : :
254 : 1 : void ActorContext::remove_child(Actor child) {
255 : 1 : for (auto it = children_.begin(); it != children_.end(); ++it) {
256 : 1 : if (it->address() == child.address()) {
257 : 1 : children_.erase(it);
258 : 1 : return;
259 : : }
260 : : }
261 : : }
262 : :
263 : 2 : std::vector<ActorRef> ActorContext::remote_children() const {
264 : 2 : return remote_children_;
265 : : }
266 : :
267 : 1 : void ActorContext::add_remote_child(ActorRef child) {
268 : 1 : remote_children_.push_back(std::move(child));
269 : 1 : }
270 : :
271 : 46 : std::vector<ActorAddress> ActorContext::linked_actors() const {
272 : 46 : return linked_;
273 : : }
274 : :
275 : 1 : void ActorContext::monitor(const ActorAddress& target) {
276 : 1 : add_monitored(target);
277 : 1 : }
278 : :
279 : 0 : RpcFuture<StreamBuffer> ActorContext::rpc(const ActorAddress& target,
280 : : const StreamBuffer& encoded_request,
281 : : std::chrono::milliseconds timeout_ms) {
282 : : const TraceContext* trace =
283 : 0 : has_current_trace_context() ? ¤t_trace_context() : nullptr;
284 : 0 : return system_->rpc_channel().call_raw(target, encoded_request, timeout_ms,
285 : 0 : trace);
286 : : }
287 : :
288 : : RpcFuture<StreamBuffer>
289 : 0 : ActorContext::http_get(const std::string& url, std::vector<net::HttpHeader> headers) {
290 : 0 : return system_->http_client().get(url, std::move(headers));
291 : : }
292 : :
293 : : RpcFuture<StreamBuffer>
294 : 0 : ActorContext::http_post(const std::string& url, StreamBuffer body,
295 : : std::vector<net::HttpHeader> headers) {
296 : 0 : return system_->http_client().post(url, std::move(body), std::move(headers));
297 : : }
298 : :
299 : : RpcFuture<StreamBuffer>
300 : 0 : ActorContext::http_put(const std::string& url, StreamBuffer body,
301 : : std::vector<net::HttpHeader> headers) {
302 : 0 : return system_->http_client().put(url, std::move(body), std::move(headers));
303 : : }
304 : :
305 : : RpcFuture<StreamBuffer>
306 : 0 : ActorContext::http_delete(const std::string& url,
307 : : std::vector<net::HttpHeader> headers) {
308 : 0 : return system_->http_client().del(url, std::move(headers));
309 : : }
310 : :
311 : : RpcFuture<StreamBuffer>
312 : 0 : ActorContext::http_request(net::HttpMethod method, const std::string& url,
313 : : std::vector<net::HttpHeader> headers, StreamBuffer body) {
314 : 0 : return system_->http_client().request(method, url, std::move(headers),
315 : 0 : std::move(body));
316 : : }
317 : :
318 : 3 : void ActorContext::on_backpressure(BackpressureHandler handler) {
319 : 3 : backpressure_handler_ = std::move(handler);
320 : 3 : }
321 : :
322 : 2 : void ActorContext::handle_backpressure(const mailbox::BackpressureSignal& signal) {
323 : 2 : if (backpressure_handler_) {
324 : 1 : backpressure_handler_(signal);
325 : : }
326 : 2 : }
327 : :
328 : 4 : void ActorContext::stop(ActorId target) {
329 : : // Resolve system pointer
330 : 4 : auto* system = system_ != nullptr
331 : 4 : ? system_
332 : 4 : : (owner_ ? &owner_.get()->system() : nullptr);
333 : 4 : if (system == nullptr) {
334 : 1 : return;
335 : : }
336 : :
337 : : // Resolve target actor
338 : 4 : auto actor = system->get_actor(target);
339 : 4 : if (actor == nullptr) {
340 : 0 : return;
341 : : }
342 : :
343 : : // Emit drain start metric event
344 : 4 : if (auto* ring_buf = system->metrics_ring_buffer()) {
345 : 4 : metrics::MetricEvent evt{};
346 : 4 : evt.actor_id = target;
347 : 4 : evt.event_type = metrics::MetricEventType::kActorDrainStart;
348 : 4 : evt.value_hi = 1;
349 : 4 : ring_buf->try_push(evt);
350 : : }
351 : :
352 : : // Check for lifecycle support
353 : 4 : auto* lc = actor->as_lifecycle();
354 : 4 : if (lc == nullptr) {
355 : : // No lifecycle: call on_exit directly if EventBasedActor
356 : 1 : if (actor->is_event_based_actor()) {
357 : 1 : auto* eba = static_cast<EventBasedActor*>(actor.get());
358 : 1 : eba->on_exit();
359 : : }
360 : 1 : return;
361 : : }
362 : :
363 : 3 : auto policy = lc->drain_config().policy;
364 : :
365 : 3 : if (policy == DrainPolicy::ImmediateStop) {
366 : : // Drain mailbox directly (dead-letter all messages)
367 : 2 : if (actor->is_event_based_actor()) {
368 : 2 : auto* eba = static_cast<EventBasedActor*>(actor.get());
369 : 2 : eba->drain_all_immediate();
370 : : }
371 : : // Drive lifecycle: kActive -> kStopping -> kStopped
372 : 2 : lc->transition(LifecycleState::kStopping);
373 : 2 : lc->transition(LifecycleState::kStopped);
374 : : // Notify linked/monitored actors
375 : 2 : if (actor->is_event_based_actor()) {
376 : 2 : auto* eba = static_cast<EventBasedActor*>(actor.get());
377 : 2 : eba->on_exit();
378 : : }
379 : : } else {
380 : : // Transition to kDraining (invokes on_drain() hook)
381 : 1 : lc->transition(LifecycleState::kDraining);
382 : : // Start drain timer (completes drain on timeout or when mailbox
383 : : // empties)
384 : 1 : if (actor->is_event_based_actor()) {
385 : 1 : auto* eba = static_cast<EventBasedActor*>(actor.get());
386 : 1 : eba->start_drain_timer();
387 : : } else {
388 : : // Non-EventBasedActor with lifecycle but no drain timer support:
389 : : // transition directly to stopped.
390 : 0 : lc->transition(LifecycleState::kStopping);
391 : 0 : lc->transition(LifecycleState::kStopped);
392 : : }
393 : : }
394 : 4 : }
395 : :
396 : : result<void>
397 : 2 : ActorContext::stop_sync(ActorId target, std::chrono::milliseconds timeout) {
398 : 2 : stop(target);
399 : :
400 : : // Resolve system pointer for polling
401 : 2 : auto* system = system_ != nullptr
402 : 2 : ? system_
403 : 2 : : (owner_ ? &owner_.get()->system() : nullptr);
404 : 2 : if (system == nullptr) {
405 : 0 : return result<void>::make(error(errors::actor_not_found, "no actor "
406 : : "system "
407 : 0 : "available"));
408 : : }
409 : :
410 : 2 : auto deadline = std::chrono::steady_clock::now() + timeout;
411 : :
412 : 3 : while (std::chrono::steady_clock::now() < deadline) {
413 : 2 : auto actor = system->get_actor(target);
414 : 2 : if (actor == nullptr) {
415 : : // Actor removed from registry (already fully stopped / cleaned up)
416 : 0 : return result<void>::make();
417 : : }
418 : :
419 : 2 : auto* lc = actor->as_lifecycle();
420 : 2 : if (lc != nullptr && lc->state() == LifecycleState::kStopped) {
421 : 1 : return result<void>::make();
422 : : }
423 : 1 : if (lc == nullptr) {
424 : : // Non-lifecycle actors: on_exit was called synchronously in stop(),
425 : : // consider it done.
426 : 0 : return result<void>::make();
427 : : }
428 : :
429 : 1 : std::this_thread::sleep_for(std::chrono::milliseconds(10));
430 : 2 : }
431 : :
432 : 2 : return result<void>::make(error(errors::timeout, "stop_sync timed out"));
433 : : }
434 : :
435 : : } // namespace hpactor
|