Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/actor/event_based_actor.hpp>
16 : : #include <hpactor/actor/http_gateway_actor.hpp>
17 : : #include <hpactor/actor/local_actor.hpp>
18 : : #include <hpactor/actor/spawn_receiver.hpp>
19 : : #include <hpactor/actor_type_registry.hpp>
20 : : #include <hpactor/config/actor_factory_registry.hpp>
21 : : #include <hpactor/config/toml_parser.hpp>
22 : : #include <hpactor/core/actor_system.hpp>
23 : : #include <hpactor/hpactor_config.hpp>
24 : :
25 : : #include <hpactor/cli/cli_actor.hpp>
26 : : #include <hpactor/core/actor_system_ids.hpp>
27 : : #include <hpactor/log/log_manager.hpp>
28 : : #include <hpactor/log/logger.hpp>
29 : : #include <hpactor/mem/std_allocator.hpp>
30 : : #include <hpactor/net/async_io_fwd.hpp>
31 : : #include <hpactor/net/frame.hpp>
32 : : #include <hpactor/net/tcp_transport.hpp>
33 : : #include <hpactor/sched/scheduler.hpp>
34 : : #include <hpactor/spawn.hpp>
35 : :
36 : : // Protobuf message types for spawn serialization
37 : : #include <hpactor/common.pb.h>
38 : : #include <hpactor/messages.pb.h>
39 : :
40 : : namespace hpactor {
41 : :
42 : : // -----------------------------------------------------------------------------
43 : : // actor_registry implementation
44 : : // -----------------------------------------------------------------------------
45 : 106 : actor_registry::actor_registry(EndPoint endpoint) : endpoint_(endpoint) {}
46 : :
47 : 9 : void actor_registry::put(const std::string& name, ActorAddress addr) {
48 : 9 : actors_[name] = addr;
49 : 9 : }
50 : :
51 : 9 : ActorAddress actor_registry::get(const std::string& name) const {
52 : 9 : auto it = actors_.find(name);
53 : 9 : if (it != actors_.end()) {
54 : 9 : return it->second;
55 : : }
56 : 0 : return invalid_actor_addr;
57 : : }
58 : :
59 : 0 : void actor_registry::erase(const std::string& name) {
60 : 0 : actors_.erase(name);
61 : 0 : }
62 : :
63 : : // -----------------------------------------------------------------------------
64 : : // ActorSystem implementation
65 : : // -----------------------------------------------------------------------------
66 : 106 : ActorSystem::ActorSystem(const Config& config)
67 : 106 : : config_(config), endpoint_(config.endpoint), registry_(endpoint_),
68 : 106 : scheduler_(std::make_unique<sched::HybridScheduler>(
69 : 212 : *this, config.scheduler_threads, 4, config.timer_backend,
70 : 106 : config.scheduler_start_paused)),
71 : 212 : actor_type_registry_(std::make_unique<ActorTypeRegistry>()) {
72 : : // Register system protobuf types
73 : 106 : proto_registry_.register_system_types();
74 : :
75 : : // Initialize dead-letter queue
76 : : dead_letters_ =
77 : 106 : std::make_unique<mailbox::DeadLetterQueue>(config_.dead_letters);
78 : :
79 : : // Initialize metrics subsystem (before scheduler so instrumentation is
80 : : // ready)
81 : 106 : if (metrics_config_.enabled) {
82 : : metrics_ring_buffer_ =
83 : 106 : std::make_shared<metrics::MpscRingBuffer<metrics::MetricEvent>>();
84 : 106 : scheduler_->set_metrics_ring_buffer(metrics_ring_buffer_.get());
85 : : }
86 : :
87 : : // Initialize logging subsystem before starting the scheduler so that
88 : : // worker threads see a valid global logger, not a dangling pointer
89 : : // left over from a previous ActorSystem instance.
90 : 106 : if (logging_config_.enabled) {
91 : 106 : log_manager_ = std::make_unique<log::LogManager>(logging_config_);
92 : 106 : log_manager_->start();
93 : 106 : logger_ = &log_manager_->logger();
94 : : }
95 : :
96 : : // Wire logger to scheduler for scheduler-event logs
97 : 106 : if (logger_) [[unlikely]] {
98 : 106 : scheduler_->set_logger(logger_);
99 : : }
100 : :
101 : 106 : scheduler_->start();
102 : :
103 : 106 : apply_tracing_config(config_.tracing);
104 : :
105 : 106 : if (config.enable_network) {
106 : 0 : network_loop_ = std::make_unique<net::EventLoop>();
107 : 0 : network_loop_->set_actor_system(this);
108 : :
109 : : // ── Service discovery backend ────────────────────────────
110 : 0 : if (config.service_discovery) {
111 : 0 : discovery_ = config.service_discovery;
112 : 0 : } else if (config.registrar.udp_port > 0) {
113 : : auto reg = std::make_shared<net::UdpRegistrar>(
114 : 0 : config.registrar, endpoint_, network_loop_.get());
115 : 0 : discovery_ = reg;
116 : 0 : registrar_ = reg; // shared ownership for registrar() accessor
117 : 0 : } else {
118 : : discovery_ =
119 : 0 : std::make_shared<net::StaticDiscovery>(std::vector<net::Member>{});
120 : : }
121 : :
122 : 0 : discovery_->start();
123 : :
124 : 0 : discovery_->on_member_change([this](const net::Member& m, bool joined) {
125 : 0 : if (!joined) {
126 : 0 : on_node_dead(m.identity.endpoint);
127 : : }
128 : : // Note: proactive connection pool warming (prewarm_pool) will be
129 : : // integrated in a follow-up task when ConnectionPool is updated.
130 : 0 : });
131 : :
132 : 0 : location_cache_ = std::make_shared<net::ActorLocationCache>();
133 : 0 : if (network_loop_) {
134 : 0 : cache_purge_timer_ = network_loop_->run_every(
135 : 0 : [this]() {
136 : 0 : if (location_cache_)
137 : 0 : location_cache_->purge_expired();
138 : 0 : },
139 : : 60000);
140 : : }
141 : :
142 : 0 : transport_ = std::make_unique<net::TcpTransport>(endpoint_, config.tls,
143 : 0 : config.pool, nullptr);
144 : :
145 : : rpc_channel_ =
146 : 0 : std::make_unique<RpcChannel>(transport_.get(), scheduler_.get());
147 : :
148 : 0 : if (config_.enable_http_client) {
149 : 0 : http_client_ = std::make_unique<net::HttpClient>(network_loop_.get());
150 : : }
151 : :
152 : 0 : if (config_.enable_http_gateway) {
153 : 0 : http_gateway_actor_ = spawn<net::HTTPGatewayActor>(
154 : 0 : config_.http_bind_host, config_.http_port);
155 : : }
156 : :
157 : 0 : if (config.tcp_port > 0) {
158 : 0 : transport_->set_rpc_handler(
159 : 0 : [this](const hpactor::RpcResponseFrame& response) {
160 : 0 : rpc_channel_->on_response(response);
161 : 0 : });
162 : :
163 : 0 : transport_->set_actor_message_handler([this](const net::WireFrame& frame) {
164 : 0 : this->deliver_remote(frame);
165 : 0 : });
166 : :
167 : 0 : transport_->listen(config.tcp_port);
168 : : }
169 : :
170 : 0 : network_thread_ = std::thread([this]() {
171 : 0 : while (network_loop_->wait(100) >= 0) {
172 : 0 : network_loop_->process_completions();
173 : 0 : if (!is_running())
174 : 0 : break;
175 : : }
176 : 0 : });
177 : :
178 : : auto spawn_receiver = std::make_shared<SpawnReceiver>(
179 : 0 : *this, *actor_type_registry_, transport_.get());
180 : 0 : spawn_receiver->set_address(
181 : : ActorAddress{endpoint_, SystemActorType, SpawnReceiverId, 0});
182 : :
183 : : {
184 : 0 : std::lock_guard<std::mutex> lock(actors_mutex_);
185 : 0 : actors_.emplace(SpawnReceiverId, spawn_receiver);
186 : 0 : }
187 : :
188 : : {
189 : 0 : std::lock_guard<std::mutex> lock(mailboxes_mutex_);
190 : 0 : mailboxes_.emplace(
191 : : SpawnReceiverId,
192 : 0 : std::make_unique<mailbox::MPSCActorMailbox<TypedMessage>>(
193 : 0 : SpawnReceiverId, scheduler_.get(), mailbox_config_for_spawn()));
194 : 0 : }
195 : 0 : }
196 : :
197 : : // Spawn CLI actor (runtime opt-in via config_.cli.enabled)
198 : 106 : if (config_.cli.enabled) {
199 : 0 : auto spawned = spawn<cli::CliActor>(config_.cli);
200 : 0 : cli_actor_ = std::static_pointer_cast<cli::CliActor>(spawned.get());
201 : 0 : }
202 : 106 : }
203 : :
204 : 106 : ActorSystem::~ActorSystem() {
205 : 106 : running_.store(false);
206 : 106 : if (config_.enable_network) {
207 : 0 : if (network_loop_) {
208 : 0 : network_loop_->stop();
209 : : }
210 : 0 : if (network_thread_.joinable()) {
211 : 0 : network_thread_.join();
212 : : }
213 : 0 : if (transport_) {
214 : 0 : transport_->stop_listening();
215 : : }
216 : 0 : if (discovery_) {
217 : 0 : discovery_->stop();
218 : : }
219 : : }
220 : 106 : if (log_manager_) {
221 : 106 : log_manager_->stop();
222 : : }
223 : 106 : if (trace_manager_) {
224 : 2 : trace_manager_->stop();
225 : : }
226 : 106 : scheduler_->stop();
227 : 106 : }
228 : :
229 : 113 : void ActorSystem::apply_tracing_config(const tracing::TraceConfig& config) {
230 : 113 : tracing_config_ = config;
231 : 113 : if (!tracing_config_.enabled) {
232 : 111 : if (trace_manager_) {
233 : 0 : trace_manager_->stop();
234 : 0 : trace_manager_.reset();
235 : : }
236 : 111 : return;
237 : : }
238 : 2 : trace_manager_ = std::make_unique<tracing::TraceManager>(tracing_config_, this);
239 : 2 : trace_manager_->start();
240 : : }
241 : :
242 : 0 : void ActorSystem::on_node_dead(EndPoint dead_ep) {
243 : : // Find all actors linked to or monitoring actors on the dead endpoint.
244 : : // Uses the internal actor_contexts_ map directly (actor_context() is
245 : : // a protected member of AbstractActor, not accessible from ActorSystem).
246 : 0 : std::lock_guard<std::mutex> lock(actor_contexts_mutex_);
247 : 0 : for (const auto& [id, ctx] : actor_contexts_) {
248 : 0 : if (!ctx)
249 : 0 : continue;
250 : 0 : for (const auto& addr : ctx->linked_actors()) {
251 : 0 : if (addr.endpoint == dead_ep) {
252 : 0 : TypedMessage down(TypeTag::DownMsg, StreamBuffer{});
253 : 0 : down.set_sender_address(ActorAddress{dead_ep, 0, ActorId(0), 0});
254 : 0 : deliver_local(id, std::move(down));
255 : 0 : break;
256 : 0 : }
257 : 0 : }
258 : : }
259 : 0 : if (location_cache_)
260 : 0 : location_cache_->evict_node(dead_ep);
261 : 0 : }
262 : :
263 : 3 : void ActorSystem::signal_backpressure(const mailbox::BackpressureSignal& signal) {
264 : 3 : if (signal.sender.id == ActorId{0})
265 : 1 : return; // no sender
266 : :
267 : 2 : std::lock_guard<std::mutex> lock(actor_contexts_mutex_);
268 : 2 : auto it = actor_contexts_.find(signal.sender.id);
269 : 2 : if (it != actor_contexts_.end() && it->second) {
270 : 2 : it->second->handle_backpressure(signal);
271 : : }
272 : 2 : }
273 : :
274 : 0 : void ActorSystem::register_actor(const std::string& name, Actor actor) {
275 : 0 : registry_.put(name, actor.address());
276 : 0 : }
277 : :
278 : 0 : Actor ActorSystem::resolve_actor(const std::string& name) {
279 : 0 : ActorAddress addr = registry_.get(name);
280 : 0 : if (!addr) {
281 : 0 : return Actor{};
282 : : }
283 : 0 : return Actor{};
284 : : }
285 : :
286 : 0 : void ActorSystem::unregister_actor(const std::string& name) {
287 : 0 : registry_.erase(name);
288 : 0 : }
289 : :
290 : 0 : void ActorSystem::register_actor_type(const ActorTypeDef& def) {
291 : 0 : actor_types_[def.id] = def;
292 : 0 : }
293 : :
294 : 0 : ActorTypeDef ActorSystem::get_actor_type(ActorType type) const {
295 : 0 : auto it = actor_types_.find(type);
296 : 0 : if (it != actor_types_.end()) {
297 : 0 : return it->second;
298 : : }
299 : 0 : return ActorTypeDef{};
300 : : }
301 : :
302 : 602 : std::shared_ptr<AbstractActor> ActorSystem::get_actor(ActorId id) {
303 : 602 : std::lock_guard<std::mutex> lock(actors_mutex_);
304 : 602 : auto it = actors_.find(id);
305 : 602 : if (it != actors_.end()) {
306 : 580 : return it->second;
307 : : }
308 : 22 : return nullptr;
309 : 602 : }
310 : :
311 : 723 : mailbox::MPSCActorMailbox<TypedMessage>* ActorSystem::get_mailbox(ActorId id) {
312 : 723 : std::lock_guard<std::mutex> lock(mailboxes_mutex_);
313 : 723 : auto it = mailboxes_.find(id);
314 : 723 : if (it != mailboxes_.end()) {
315 : 722 : return it->second.get();
316 : : }
317 : 1 : return nullptr;
318 : 723 : }
319 : :
320 : 0 : size_t ActorSystem::actor_count() const {
321 : 0 : std::lock_guard<std::mutex> lock(actors_mutex_);
322 : 0 : return actors_.size();
323 : 0 : }
324 : :
325 : 6 : void ActorSystem::for_each_actor(
326 : : std::function<void(ActorId, AbstractActor&)> callback) const {
327 : 6 : std::lock_guard<std::mutex> lock(actors_mutex_);
328 : 19 : for (auto& [id, actor] : actors_) {
329 : 13 : callback(id, *actor);
330 : : }
331 : 6 : }
332 : :
333 : 0 : cli::CliActor* ActorSystem::cli_actor() const {
334 : 0 : return cli_actor_.get();
335 : : }
336 : :
337 : : // -----------------------------------------------------------------------------
338 : : // Dead-letter queue
339 : : // -----------------------------------------------------------------------------
340 : 34 : bool ActorSystem::dead_letter(mailbox::DeadLetterRecord record) noexcept {
341 : 34 : if (!dead_letters_) {
342 : 0 : return false;
343 : : }
344 : 34 : return dead_letters_->try_push(std::move(record));
345 : : }
346 : :
347 : 5 : mailbox::DeadLetterQueueSnapshot ActorSystem::dead_letter_snapshot() const noexcept {
348 : 5 : if (!dead_letters_) {
349 : 0 : return {};
350 : : }
351 : 5 : return dead_letters_->snapshot();
352 : : }
353 : :
354 : 23 : bool ActorSystem::pop_dead_letter(mailbox::DeadLetterRecord& out) noexcept {
355 : 23 : if (!dead_letters_) {
356 : 0 : return false;
357 : : }
358 : 23 : return dead_letters_->try_pop(out);
359 : : }
360 : :
361 : : // -----------------------------------------------------------------------------
362 : : // Mailbox config helpers
363 : : // -----------------------------------------------------------------------------
364 : 118 : mailbox::MailboxConfig ActorSystem::mailbox_config_for_spawn() const {
365 : 118 : mailbox::MailboxConfig cfg;
366 : 118 : cfg.capacity.max_messages = config_.mailbox.default_capacity;
367 : 118 : cfg.capacity.max_bytes = config_.mailbox.default_byte_capacity;
368 : 118 : cfg.overflow_policy = config_.mailbox.default_policy;
369 : 118 : cfg.high_watermark = config_.mailbox.high_watermark;
370 : 118 : cfg.low_watermark = config_.mailbox.low_watermark;
371 : 118 : cfg.protected_system_messages = config_.mailbox.protected_system_messages;
372 : 118 : cfg.backpressure_mode = config_.mailbox.backpressure_mode;
373 : 118 : return cfg;
374 : : }
375 : :
376 : : mailbox::MailboxConfig
377 : 10 : ActorSystem::mailbox_config_for_actor_def(const config::ActorDef& def) const {
378 : 10 : auto cfg = mailbox_config_for_spawn();
379 : 10 : if (def.mailbox_capacity != 0) {
380 : 1 : cfg.capacity.max_messages = def.mailbox_capacity;
381 : : }
382 : 10 : if (def.mailbox.policy != mailbox::OverflowPolicy::RejectNewest) {
383 : 0 : cfg.overflow_policy = def.mailbox.policy;
384 : : }
385 : 10 : cfg.priority_aware = def.mailbox.priority_aware;
386 : 10 : cfg.max_overflow_depth = def.mailbox.max_overflow_depth;
387 : 10 : return cfg;
388 : : }
389 : :
390 : : // -----------------------------------------------------------------------------
391 : : // try_deliver_local — bounded admission boundary
392 : : // -----------------------------------------------------------------------------
393 : : mailbox::EnqueueResult
394 : 365 : ActorSystem::try_deliver_local(ActorId target, TypedMessage msg,
395 : : uint8_t priority, int64_t deadline_ns,
396 : : mailbox::DeliveryOptions options) {
397 : 365 : auto* mailbox = get_mailbox(target);
398 : 365 : if (mailbox == nullptr) {
399 : : // Capture dead letter for missing actor
400 : 1 : if (dead_letters_) {
401 : 1 : mailbox::DeadLetterRecord dl;
402 : 1 : dl.reason = mailbox::DeadLetterReason::ActorNotFound;
403 : 1 : dl.source = mailbox::DeadLetterSource::LocalDelivery;
404 : 1 : dl.sender = msg.sender_address();
405 : 1 : dl.target = ActorAddress{endpoint_, ActorType{0}, target, 0};
406 : 1 : dl.type_tag = msg.type_id();
407 : 1 : dl.message_id = options.message_id;
408 : 1 : dl.frame_flags = options.flags;
409 : 1 : dl.priority = priority;
410 : 1 : dl.deadline_ns = deadline_ns;
411 : 1 : dl.payload_sample = msg.payload();
412 : 1 : (void)dead_letter(std::move(dl));
413 : 1 : }
414 : :
415 : 1 : mailbox::EnqueueResult r;
416 : 1 : r.code = mailbox::EnqueueResultCode::ActorNotFound;
417 : 1 : r.target = target;
418 : 1 : return r;
419 : : }
420 : :
421 : 364 : mailbox::MailboxEnvelopeMeta meta;
422 : 364 : meta.sender = msg.sender_address();
423 : 364 : meta.type_tag = msg.type_id();
424 : 364 : meta.priority = priority;
425 : 364 : meta.deadline_ns = deadline_ns;
426 : 364 : meta.flags = options.flags;
427 : 364 : meta.message_id = options.message_id;
428 : 364 : if (options.no_drop) {
429 : 0 : meta.flags |= net::WireFrame::NoDrop;
430 : : }
431 : :
432 : 364 : auto result = mailbox->try_push(std::move(msg), meta);
433 : :
434 : : // Capture dead letter when mailbox rejects and policy is DeadLetter
435 : 364 : if (!result.accepted() && mailbox->config().overflow_policy ==
436 : : mailbox::OverflowPolicy::DeadLetter) {
437 : 0 : if (dead_letters_) {
438 : 0 : mailbox::DeadLetterRecord dl;
439 : 0 : dl.reason = mailbox::DeadLetterReason::OverflowPolicy;
440 : 0 : dl.source = mailbox::DeadLetterSource::MailboxAdmission;
441 : 0 : dl.sender = meta.sender;
442 : 0 : dl.target = ActorAddress{endpoint_, ActorType{0}, target, 0};
443 : 0 : dl.type_tag = meta.type_tag;
444 : 0 : dl.message_id = meta.message_id;
445 : 0 : dl.frame_flags = meta.flags;
446 : 0 : dl.priority = meta.priority;
447 : 0 : dl.deadline_ns = meta.deadline_ns;
448 : 0 : dl.mailbox_depth = result.depth;
449 : 0 : dl.mailbox_capacity = result.capacity;
450 : 0 : (void)dead_letter(std::move(dl));
451 : 0 : }
452 : : }
453 : :
454 : : // Emit backpressure signal when target mailbox is under soft pressure
455 : 364 : if (result.code == mailbox::EnqueueResultCode::AcceptedWithSoftPressure &&
456 : 4 : options.emit_backpressure) {
457 : 3 : mailbox::BackpressureSignal signal;
458 : 3 : signal.target = ActorAddress{endpoint_, ActorType{0}, target, 0};
459 : 3 : signal.sender = meta.sender;
460 : 3 : signal.reason = mailbox::BackpressureReason::HighWatermark;
461 : 3 : signal.depth = result.depth;
462 : 3 : signal.capacity = result.capacity;
463 : 3 : signal.pressure_ratio = result.pressure_ratio;
464 : 3 : signal.retry_after = result.retry_after;
465 : 3 : signal_backpressure(signal);
466 : : }
467 : :
468 : 364 : return result;
469 : : }
470 : :
471 : 28 : void ActorSystem::deliver_local(ActorId target, TypedMessage msg) {
472 : 28 : (void)try_deliver_local(target, std::move(msg));
473 : 28 : }
474 : :
475 : 0 : void ActorSystem::deliver_local(ActorId target, TypedMessage msg,
476 : : uint8_t priority, int64_t deadline_ns) {
477 : 0 : (void)try_deliver_local(target, std::move(msg), priority, deadline_ns, {});
478 : 0 : }
479 : :
480 : 1 : void ActorSystem::deliver_remote(const net::WireFrame& frame) {
481 : 1 : StreamBuffer payload(frame.pb_frame.payload().begin(),
482 : 2 : frame.pb_frame.payload().end());
483 : 1 : TypedMessage msg(static_cast<TypeTag>(frame.pb_frame.type_tag()),
484 : 2 : std::move(payload));
485 : 1 : msg.set_sender_address(net::from_proto(frame.pb_frame.sender()));
486 : 1 : if (frame.pb_frame.has_trace_context()) {
487 : 0 : uint16_t max_state = tracing_config_.max_tracestate_len;
488 : : auto parsed = net::trace_context_from_proto(
489 : 0 : frame.pb_frame.trace_context(), max_state);
490 : 0 : if (parsed.has_value()) {
491 : 0 : msg.set_trace_context(parsed.value());
492 : : }
493 : 0 : }
494 : 1 : deliver_local(net::from_proto(frame.pb_frame.receiver()).id, std::move(msg));
495 : 1 : }
496 : :
497 : 0 : void ActorSystem::enqueue_completion(net::OpCompletion completion) {
498 : : // Pack the completion into a StreamBuffer for mailbox delivery.
499 : : // The binary layout is a flat memcpy of the OpCompletion struct —
500 : : // it is local-only (same process, same address space), so no
501 : : // endianness or portability concerns.
502 : 0 : StreamBuffer payload(sizeof(net::OpCompletion));
503 : 0 : std::memcpy(payload.data(), &completion, sizeof(net::OpCompletion));
504 : :
505 : 0 : TypedMessage msg(TypeTag::IoCompletionTag, std::move(payload));
506 : 0 : deliver_local(completion.actor, std::move(msg));
507 : 0 : }
508 : :
509 : 3 : net::Transport* ActorSystem::get_transport_for(const EndPoint& /*endpoint*/) {
510 : : // TcpTransport already handles per-endpoint routing via its internal pools_
511 : : // map — TcpTransport::send() calls get_or_create_pool(target.endpoint)
512 : : // internally. Return the single transport_ for all remote endpoints.
513 : 3 : if (!config_.enable_network) {
514 : 3 : return nullptr;
515 : : }
516 : 0 : return transport_.get();
517 : : }
518 : :
519 : 0 : result<ActorRef> ActorSystem::spawn_remote(const std::string& node_name,
520 : : const std::string& actor_type,
521 : : const StreamBuffer& /*args*/) {
522 : 0 : return spawn_remote_async(node_name, actor_type, StreamBuffer{}).get();
523 : : }
524 : :
525 : 0 : AsyncActor ActorSystem::spawn_remote_async(const std::string& node_name,
526 : : const std::string& actor_type,
527 : : const StreamBuffer& /*args*/) {
528 : 0 : AsyncActor handle(endpoint_, config_.spawn_timeout_ms);
529 : :
530 : 0 : if (!config_.enable_network || !transport_) {
531 : 0 : SpawnResponse resp;
532 : 0 : resp.error_code = spawn_errors::node_unreachable;
533 : 0 : handle.set_response(resp);
534 : 0 : return handle;
535 : : }
536 : :
537 : 0 : auto remote_endpoint = endpoint_ops::parse_endpoint(node_name);
538 : :
539 : : // Serialize spawn request using protobuf
540 : 0 : ::hpactor::SpawnRequestMessage pb_req;
541 : : pb_req.set_actor_type_name(actor_type);
542 : 0 : pb_req.set_args_type(static_cast<uint32_t>(TypeTag::User));
543 : 0 : net::to_proto(pb_req.mutable_supervisor(), system_actor_.address());
544 : :
545 : 0 : StreamBuffer request_bytes = proto_registry_.serialize(pb_req);
546 : 0 : uint64_t msg_id = generate_message_id().value();
547 : :
548 : 0 : net::WireFrame frame;
549 : 0 : net::to_proto(frame.pb_frame.mutable_sender(), system_actor_.address());
550 : 0 : net::to_proto(
551 : : frame.pb_frame.mutable_receiver(),
552 : 0 : ActorAddress{remote_endpoint, SystemActorType, SpawnReceiverId, 0});
553 : 0 : frame.pb_frame.set_message_id(msg_id);
554 : 0 : frame.pb_frame.set_flags(net::WireFrame::RpcRequest);
555 : 0 : frame.pb_frame.set_payload(reinterpret_cast<const char*>(request_bytes.data()),
556 : : request_bytes.size());
557 : :
558 : 0 : auto pending = std::make_shared<AsyncActor>(std::move(handle));
559 : 0 : pending->set_message_id(msg_id);
560 : :
561 : : {
562 : 0 : std::lock_guard<std::mutex> lock(pending_spawns_mutex_);
563 : 0 : pending_spawns_.emplace(msg_id, pending);
564 : 0 : }
565 : :
566 : 0 : transport_->send(net::from_proto(frame.pb_frame.receiver()), frame.encode());
567 : :
568 : 0 : return std::move(*pending);
569 : 0 : }
570 : :
571 : : // -----------------------------------------------------------------------------
572 : : // spawn_configured — spawn a pre-constructed actor with ActorDef config
573 : : // -----------------------------------------------------------------------------
574 : 10 : Actor ActorSystem::spawn_configured(std::shared_ptr<AbstractActor> actor,
575 : : const config::ActorDef& def) {
576 : 20 : ActorId id(next_actor_id_.fetch_add(1));
577 : 10 : actor->set_address(ActorAddress(endpoint_, actor->type(), id, 0));
578 : 10 : actor->set_type_name(def.behavior);
579 : :
580 : : {
581 : 10 : std::lock_guard<std::mutex> lock(actors_mutex_);
582 : 10 : actors_.emplace(id, actor);
583 : 10 : }
584 : :
585 : : // Create mailbox with capacity from ActorDef
586 : : {
587 : 10 : std::lock_guard<std::mutex> lock(mailboxes_mutex_);
588 : 10 : mailboxes_.emplace(
589 : 10 : id, std::make_unique<mailbox::MPSCActorMailbox<TypedMessage>>(
590 : 10 : id, scheduler_.get(), mailbox_config_for_actor_def(def)));
591 : 10 : }
592 : :
593 : : // Create actor context and set it on the actor
594 : 10 : auto* local = static_cast<LocalActor*>(actor.get());
595 : 10 : auto actor_ctx = std::make_unique<ActorContext>(Actor(actor), this);
596 : 10 : local->set_context(actor_ctx.get());
597 : 10 : actor_contexts_.emplace(id, std::move(actor_ctx));
598 : :
599 : : // Set scheduler and mailbox on actor
600 : 10 : actor->set_scheduler(scheduler_.get());
601 : 10 : actor->set_mailbox(mailboxes_[id].get());
602 : :
603 : : // Register with scheduler. Actor class policy is authoritative for
604 : : // specialized actors such as DaemonActor and DenseComputingActor; TOML can
605 : : // only upgrade otherwise-cooperative actors.
606 : 10 : auto policy = actor->dispatch_policy();
607 : 10 : auto hints = actor->dispatch_hints();
608 : 10 : if (policy == sched::DispatchPolicy::Cooperative) {
609 : 9 : switch (def.dispatch_policy) {
610 : 9 : case config::DispatchPolicy::Cooperative:
611 : 9 : break;
612 : 0 : case config::DispatchPolicy::DedicatedThread:
613 : 0 : policy = sched::DispatchPolicy::DedicatedThread;
614 : 0 : break;
615 : 0 : case config::DispatchPolicy::DedicatedPool:
616 : 0 : policy = sched::DispatchPolicy::DedicatedPool;
617 : 0 : break;
618 : : }
619 : : }
620 : :
621 : 10 : switch (policy) {
622 : 9 : case sched::DispatchPolicy::Cooperative:
623 : 9 : scheduler_->notify_ready(id, 0, INT64_MAX);
624 : 9 : break;
625 : 0 : case sched::DispatchPolicy::DedicatedThread:
626 : 0 : scheduler_->register_dedicated_thread(id, hints.cpu_affinity);
627 : 0 : break;
628 : 1 : case sched::DispatchPolicy::DedicatedPool:
629 : 1 : scheduler_->register_dedicated_pool(id, hints.pool_size);
630 : 1 : break;
631 : : }
632 : :
633 : : // Activate the actor (DaemonActor starts its thread here, etc.)
634 : 10 : local->on_activate();
635 : :
636 : 10 : return Actor(actor);
637 : 10 : }
638 : :
639 : : // -----------------------------------------------------------------------------
640 : : // load_topology — convenience entry point for TOML-based bootstrapping
641 : : // -----------------------------------------------------------------------------
642 : 7 : result<void> ActorSystem::load_topology(const std::string& toml_path) {
643 : 7 : auto parse_result = config::TomlParser::parse(toml_path);
644 : 7 : if (!parse_result.has_value()) {
645 : 0 : return result<void>::make(parse_result.error());
646 : : }
647 : :
648 : 7 : auto& model = parse_result.value();
649 : :
650 : : // Apply system-level metrics config from topology
651 : 7 : if (model.system.metrics_enabled) {
652 : 7 : metrics_config_.enabled = model.system.metrics_enabled;
653 : 7 : metrics_config_.ring_buffer_capacity =
654 : 7 : model.system.metrics_ring_buffer_capacity;
655 : 7 : metrics_config_.metrics_path = model.system.metrics_path;
656 : : }
657 : :
658 : : // Apply system-level logging config from topology
659 : 7 : logging_config_ = model.system.logging;
660 : :
661 : : // Apply shared system fields from topology
662 : : // NOLINTBEGIN(cppcoreguidelines-macro-usage)
663 : : #define HPACTOR_SYSTEM_TOML_FIELD(name, type, toml, def) \
664 : : config_.name = static_cast<decltype(config_.name)>(model.system.name);
665 : : #include <hpactor/config/system_toml_fields.def>
666 : : #undef HPACTOR_SYSTEM_TOML_FIELD
667 : :
668 : : // Apply mailbox defaults from topology
669 : : #define HPACTOR_MAILBOX_FIELD(name, type, toml, def) \
670 : : config_.mailbox.name = model.system.mailbox.name;
671 : : #include <hpactor/config/mailbox_fields.def>
672 : : #undef HPACTOR_MAILBOX_FIELD
673 : : // NOLINTEND(cppcoreguidelines-macro-usage)
674 : :
675 : 7 : config_.dead_letters = model.system.dead_letters;
676 : : dead_letters_ =
677 : 7 : std::make_unique<mailbox::DeadLetterQueue>(config_.dead_letters);
678 : :
679 : 7 : apply_tracing_config(model.system.tracing);
680 : :
681 : 7 : HPACTOR_LOG_INFO(log::LogCategory::kConfig, ActorId{0}, 0,
682 : : "topology bootstrap complete");
683 : :
684 : : // Validate all behaviors are registered
685 : 7 : auto& registry = config::ActorFactoryRegistry::instance();
686 : 16 : for (const auto& actor_def : model.actors) {
687 : 10 : if (!registry.has(actor_def.behavior)) {
688 : 1 : error err(errors::unknown);
689 : 1 : return result<void>::make(std::move(err));
690 : 1 : }
691 : : }
692 : :
693 : : // Spawn actors in topological order; track numeric IDs for SystemInit
694 : : std::vector<ActorId, mem::MemStdAllocator<ActorId>> spawned_ids(
695 : 6 : mem::MemStdAllocator<ActorId>(system_actor_.id(),
696 : 6 : mem::RegionType::kInternal));
697 : 15 : for (const auto& actor_def : model.actors) {
698 : 9 : auto factory = registry.get_factory(actor_def.behavior);
699 : 9 : auto actor_ptr = factory(nullptr, *this);
700 : :
701 : 9 : Actor actor_handle = spawn_configured(std::move(actor_ptr), actor_def);
702 : :
703 : : // Register in name registry
704 : 9 : registry_.put(actor_def.id, actor_handle.address());
705 : :
706 : 9 : spawned_ids.push_back(actor_handle.id());
707 : 9 : }
708 : :
709 : : // Broadcast SystemInit to all spawned actors
710 : 6 : ActorAddress sys_addr = system_actor_.address();
711 : 6 : StreamBuffer empty_payload;
712 : 15 : for (ActorId id : spawned_ids) {
713 : 9 : TypedMessage init_msg(TypeTag::SystemInitTag, std::move(empty_payload));
714 : 9 : init_msg.set_sender_address(sys_addr);
715 : 9 : deliver_local(id, std::move(init_msg));
716 : 9 : empty_payload = StreamBuffer{};
717 : 9 : }
718 : :
719 : 6 : return result<void>::make();
720 : 7 : }
721 : :
722 : : } // namespace hpactor
723 : :
724 : : // ═══════════════════════════════════════════════════════════════════════════════
725 : : // Shutdown helpers (anonymous namespace, uses public ActorSystem API only)
726 : : // ═══════════════════════════════════════════════════════════════════════════════
727 : :
728 : : namespace hpactor {
729 : : namespace {
730 : :
731 : : struct ActorDrainInfo {
732 : : ActorId id;
733 : : bool is_system;
734 : : };
735 : :
736 : 13 : void initiate_actor_drain(ActorSystem& sys, ActorId id) {
737 : 13 : auto actor = sys.get_actor(id);
738 : 13 : if (!actor)
739 : 0 : return;
740 : :
741 : 13 : auto* lc = actor->as_lifecycle();
742 : 13 : if (lc == nullptr) {
743 : : // No lifecycle: call on_exit directly if EventBasedActor
744 : 0 : if (actor->is_event_based_actor()) {
745 : 0 : static_cast<EventBasedActor*>(actor.get())->on_exit();
746 : : }
747 : 0 : return;
748 : : }
749 : :
750 : 13 : auto state = lc->state();
751 : : // Skip actors that are already stopping or stopped
752 : 13 : if (state == LifecycleState::kStopping || state == LifecycleState::kStopped)
753 : 2 : return;
754 : :
755 : 11 : auto policy = lc->drain_config().policy;
756 : :
757 : 11 : if (policy == DrainPolicy::ImmediateStop) {
758 : : // Drain mailbox synchronously (dead-letter all messages)
759 : 7 : if (actor->is_event_based_actor()) {
760 : 7 : static_cast<EventBasedActor*>(actor.get())->drain_all_immediate();
761 : : } else {
762 : 0 : auto* mailbox = sys.get_mailbox(id);
763 : 0 : if (mailbox) {
764 : 0 : TypedMessage msg;
765 : 0 : while (mailbox->try_pop(msg)) {
766 : : // Messages dropped — equivalent to dead-lettering
767 : : }
768 : 0 : }
769 : : }
770 : : // Drive lifecycle: kActive -> kStopping -> kStopped
771 : 7 : lc->transition(LifecycleState::kStopping);
772 : 7 : lc->transition(LifecycleState::kStopped);
773 : : // Notify linked/monitored actors
774 : 7 : if (actor->is_event_based_actor()) {
775 : 7 : static_cast<EventBasedActor*>(actor.get())->on_exit();
776 : : }
777 : : } else {
778 : : // Drain / DropUserMessages / deferred policies:
779 : : // transition to kDraining, let EventBasedActor::receive() / drain
780 : : // timer handle completion.
781 : 4 : if (state == LifecycleState::kActive) {
782 : 4 : lc->transition(LifecycleState::kDraining);
783 : : }
784 : : // Start drain timer if EventBasedActor
785 : 4 : if (actor->is_event_based_actor()) {
786 : 4 : static_cast<EventBasedActor*>(actor.get())->start_drain_timer();
787 : : } else {
788 : : // Non-EventBasedActor with lifecycle but no drain timer:
789 : : // transition directly to stopped.
790 : 0 : lc->transition(LifecycleState::kStopping);
791 : 0 : lc->transition(LifecycleState::kStopped);
792 : : }
793 : : }
794 : 13 : }
795 : :
796 : 12 : void poll_drain_complete(ActorSystem& sys, ActorId id,
797 : : std::chrono::steady_clock::time_point deadline) {
798 : 33 : while (std::chrono::steady_clock::now() < deadline) {
799 : 30 : auto actor = sys.get_actor(id);
800 : 30 : if (!actor)
801 : 0 : return; // Actor removed from registry
802 : :
803 : 30 : auto* lc = actor->as_lifecycle();
804 : 30 : if (lc == nullptr)
805 : 0 : return; // No lifecycle — already handled
806 : :
807 : 30 : if (lc->state() == LifecycleState::kStopped)
808 : 9 : return; // Drain complete
809 : :
810 : 21 : std::this_thread::sleep_for(std::chrono::milliseconds(1));
811 : 30 : }
812 : : }
813 : :
814 : : } // anonymous namespace
815 : : } // namespace hpactor
816 : :
817 : : // ═══════════════════════════════════════════════════════════════════════════════
818 : : // ActorSystem — shutdown implementation
819 : : // ═══════════════════════════════════════════════════════════════════════════════
820 : :
821 : : namespace hpactor {
822 : :
823 : 0 : result<void> ActorSystem::shutdown() {
824 : 0 : return shutdown(ShutdownOptions{});
825 : : }
826 : :
827 : 6 : result<void> ActorSystem::shutdown(const ShutdownOptions& opts) {
828 : 6 : ShutdownPhase phase = ShutdownPhase::Running;
829 : :
830 : : // Helper: check if we should force-stop (modifies phase/running in place)
831 : 58 : auto check_force = [&](std::chrono::steady_clock::time_point deadline) -> bool {
832 : 58 : if (!opts.force_after_timeout)
833 : 0 : return false;
834 : 58 : if (std::chrono::steady_clock::now() < deadline)
835 : 46 : return false;
836 : 12 : phase = ShutdownPhase::ForcedStop;
837 : 12 : shutdown_phase_.store(ShutdownPhase::ForcedStop, std::memory_order_release);
838 : 12 : running_.store(false, std::memory_order_release);
839 : 12 : return true;
840 : 6 : };
841 : :
842 : : // ── Phase: DrainingIngress ──────────────────────────────────────────
843 : 6 : phase = ShutdownPhase::DrainingIngress;
844 : 6 : shutdown_phase_.store(ShutdownPhase::DrainingIngress, std::memory_order_release);
845 : 6 : is_ready_.store(false, std::memory_order_release);
846 : :
847 : 6 : auto ingress_deadline = std::chrono::steady_clock::now() + opts.ingress_timeout;
848 : : // (HTTP gateway / remote spawn gating deferred to follow-up tasks)
849 : 6 : if (check_force(ingress_deadline))
850 : 0 : return result<void>::make();
851 : :
852 : : // ── Phase: DrainingActors ──────────────────────────────────────────
853 : 6 : phase = ShutdownPhase::DrainingActors;
854 : 6 : shutdown_phase_.store(ShutdownPhase::DrainingActors, std::memory_order_release);
855 : : auto actor_deadline =
856 : 6 : std::chrono::steady_clock::now() + opts.actor_drain_timeout;
857 : :
858 : : // Collect actor IDs under lock, then drain in order
859 : : {
860 : 6 : std::vector<ActorDrainInfo> actors;
861 : 6 : for_each_actor([&](ActorId id, AbstractActor& actor) {
862 : 13 : actors.push_back({id, actor.is_system_actor()});
863 : 13 : });
864 : : // Lock released — safe to call into actors
865 : :
866 : : // Pass 1: initiate drain for non-system actors
867 : 19 : for (auto& info : actors) {
868 : 13 : if (info.is_system)
869 : 1 : continue;
870 : 12 : initiate_actor_drain(*this, info.id);
871 : 12 : if (check_force(actor_deadline))
872 : 0 : break;
873 : : }
874 : : // Poll non-system actors to completion
875 : 6 : if (!check_force(actor_deadline)) {
876 : 15 : for (const auto& info : actors) {
877 : 12 : if (info.is_system)
878 : 1 : continue;
879 : 11 : poll_drain_complete(*this, info.id, actor_deadline);
880 : 11 : if (check_force(actor_deadline))
881 : 3 : break;
882 : : }
883 : : }
884 : :
885 : : // Pass 2: initiate drain for system actors (last)
886 : 6 : if (!check_force(actor_deadline)) {
887 : 12 : for (auto& info : actors) {
888 : 9 : if (!info.is_system)
889 : 8 : continue;
890 : 1 : initiate_actor_drain(*this, info.id);
891 : 1 : if (check_force(actor_deadline))
892 : 0 : break;
893 : : }
894 : : }
895 : : // Poll system actors to completion
896 : 6 : if (!check_force(actor_deadline)) {
897 : 12 : for (const auto& info : actors) {
898 : 9 : if (!info.is_system)
899 : 8 : continue;
900 : 1 : poll_drain_complete(*this, info.id, actor_deadline);
901 : 1 : if (check_force(actor_deadline))
902 : 0 : break;
903 : : }
904 : : }
905 : 6 : }
906 : :
907 : 6 : if (check_force(actor_deadline))
908 : 3 : return result<void>::make();
909 : :
910 : : // ── Phase: LeavingCluster ──────────────────────────────────────────
911 : 3 : phase = ShutdownPhase::LeavingCluster;
912 : 3 : shutdown_phase_.store(ShutdownPhase::LeavingCluster, std::memory_order_release);
913 : : auto leave_deadline =
914 : 3 : std::chrono::steady_clock::now() + opts.cluster_leave_timeout;
915 : : // (Full implementation deferred until sharding)
916 : 3 : if (check_force(leave_deadline))
917 : 0 : return result<void>::make();
918 : :
919 : : // ── Phase: FlushingTelemetry ──────────────────────────────────────
920 : 3 : phase = ShutdownPhase::FlushingTelemetry;
921 : 3 : shutdown_phase_.store(ShutdownPhase::FlushingTelemetry,
922 : : std::memory_order_release);
923 : : // Best-effort flush of logs, metrics, DLQ — no blocking
924 : :
925 : : // ── Phase: Stopped ─────────────────────────────────────────────────
926 : 3 : phase = ShutdownPhase::Stopped;
927 : 3 : shutdown_phase_.store(ShutdownPhase::Stopped, std::memory_order_release);
928 : 3 : running_.store(false, std::memory_order_release);
929 : 3 : return result<void>::make();
930 : : }
931 : :
932 : 5 : ShutdownPhase ActorSystem::shutdown_phase() const noexcept {
933 : 5 : return shutdown_phase_.load(std::memory_order_acquire);
934 : : }
935 : :
936 : 4 : bool ActorSystem::is_ready() const noexcept {
937 : 4 : return is_ready_.load(std::memory_order_acquire);
938 : : }
939 : :
940 : 1 : bool ActorSystem::is_draining() const noexcept {
941 : 1 : return shutdown_phase_.load(std::memory_order_acquire) ==
942 : 1 : ShutdownPhase::DrainingActors;
943 : : }
944 : :
945 : 0 : void ActorSystem::set_drain_config(ActorId target, DrainConfig cfg) {
946 : 0 : auto actor = get_actor(target);
947 : 0 : if (actor) {
948 : 0 : if (auto* lc = actor->as_lifecycle()) {
949 : 0 : lc->set_drain_config(cfg);
950 : : }
951 : : }
952 : 0 : }
953 : :
954 : : } // namespace hpactor
|