Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/actor/blocking_actor.hpp>
16 : : #include <hpactor/core/actor_system.hpp>
17 : :
18 : : namespace hpactor {
19 : :
20 : 5 : BlockingActor::BlockingActor(ActorContext* ctx, ActorSystem& sys)
21 : 5 : : LocalActor(ctx, sys) {}
22 : :
23 : 0 : BlockingActor::BlockingActor(ActorId id, ActorContext* ctx, ActorSystem& sys)
24 : 0 : : LocalActor(id, ctx, sys) {}
25 : :
26 : 5 : BlockingActor::~BlockingActor() {
27 : 5 : running_.store(false, std::memory_order_release);
28 : 5 : cv_.notify_all();
29 : 5 : if (thread_.joinable()) {
30 : 4 : thread_.join();
31 : : }
32 : 5 : }
33 : :
34 : 0 : void BlockingActor::receive(TypedMessage& /*msg*/) {
35 : : // Default no-op. Subclasses override this to handle messages.
36 : 0 : }
37 : :
38 : 4 : void BlockingActor::on_activate() {
39 : 4 : LocalActor::on_activate();
40 : :
41 : : // Wire continuation callback on the mailbox so that when a message
42 : : // transitions the mailbox from empty to non-empty, the condition
43 : : // variable is signalled and the blocking thread wakes up.
44 : 4 : if (mailbox_) {
45 : 4 : mailbox_->set_continuation_callback([this]() {
46 : : {
47 : 1 : std::lock_guard<std::mutex> lock(cv_mutex_);
48 : 1 : message_arrived_.store(true, std::memory_order_release);
49 : 1 : }
50 : 1 : cv_.notify_one();
51 : 1 : });
52 : : }
53 : :
54 : 4 : running_.store(true, std::memory_order_release);
55 : 4 : thread_ = std::thread(&BlockingActor::thread_loop, this);
56 : 4 : }
57 : :
58 : 1 : void BlockingActor::on_deactivate() {
59 : 1 : running_.store(false, std::memory_order_release);
60 : 1 : cv_.notify_all();
61 : 1 : if (thread_.joinable()) {
62 : 0 : thread_.join();
63 : : }
64 : 1 : LocalActor::on_deactivate();
65 : 1 : }
66 : :
67 : 4 : void BlockingActor::thread_loop() {
68 : 4 : actor_state_.set(ActorState::kRunning);
69 : :
70 : 5 : while (running_.load(std::memory_order_acquire)) {
71 : 5 : TypedMessage msg;
72 : :
73 : : {
74 : 5 : std::unique_lock<std::mutex> lock(cv_mutex_);
75 : :
76 : : // Try non-blocking pop first
77 : 5 : if (mailbox_ && mailbox_->try_pop(msg)) {
78 : 0 : message_arrived_.store(false, std::memory_order_release);
79 : : } else {
80 : : // Block until a message arrives or we're shutting down
81 : 5 : actor_state_.set(ActorState::kIdle);
82 : 5 : cv_.wait(lock, [this]() {
83 : 19 : return message_arrived_.load(std::memory_order_acquire) ||
84 : 19 : !running_.load(std::memory_order_acquire);
85 : : });
86 : 5 : if (!running_.load(std::memory_order_acquire))
87 : 4 : break;
88 : :
89 : 1 : message_arrived_.store(false, std::memory_order_release);
90 : 1 : actor_state_.set(ActorState::kRunning);
91 : :
92 : : // Retry pop after wakeup
93 : 1 : if (mailbox_)
94 : 1 : mailbox_->try_pop(msg);
95 : : }
96 : 5 : }
97 : :
98 : : // Dispatch the message via the virtual receive()
99 : 1 : if (msg.type_id() != TypeTag::Invalid) {
100 : 1 : receive(msg);
101 : : }
102 : 5 : }
103 : :
104 : 4 : actor_state_.set(ActorState::kTerminated);
105 : 4 : }
106 : :
107 : 0 : void BlockingActor::await_all_other_actors_done() {
108 : : // Stub: in a full implementation this would poll ActorSystem for
109 : : // active actor count and block until only this actor remains.
110 : 0 : }
111 : :
112 : : } // namespace hpactor
|