Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/actor/actor_state.hpp>
18 : : #include <hpactor/actor/local_actor.hpp>
19 : : #include <hpactor/actor/typed_message.hpp>
20 : : #include <hpactor/core/mailbox.hpp>
21 : : #include <hpactor/mailbox/mpsc_actor_mailbox.hpp>
22 : : #include <hpactor/sched/dispatch_policy.hpp>
23 : : #include <hpactor/types/types.hpp>
24 : :
25 : : #include <atomic>
26 : : #include <chrono>
27 : : #include <condition_variable>
28 : : #include <mutex>
29 : : #include <thread>
30 : :
31 : : namespace hpactor {
32 : :
33 : : // -----------------------------------------------------------------------------
34 : : // BlockingActor - actor that runs in its own thread with blocking receive.
35 : : //
36 : : // Uses DedicatedThread dispatch — each BlockingActor owns a std::thread that
37 : : // blocks on a condition variable waiting for mailbox messages. When a message
38 : : // arrives, the mailbox continuation callback wakes the thread.
39 : : //
40 : : // Subclass and call receive(handlers...) from within your loop, or override
41 : : // on_activate() to do custom work in the dedicated thread.
42 : : // -----------------------------------------------------------------------------
43 : : class BlockingActor : public LocalActor {
44 : : public:
45 : : using AbstractActor::receive;
46 : :
47 : 5 : sched::DispatchPolicy dispatch_policy() const override {
48 : 5 : return sched::DispatchPolicy::DedicatedThread;
49 : : }
50 : :
51 : 4 : void set_scheduler(sched::IScheduler* sched) override {
52 : 4 : scheduler_ = sched;
53 : 4 : }
54 : 4 : void set_mailbox(mailbox::MPSCActorMailbox<TypedMessage>* mbox) override {
55 : 4 : mailbox_ = mbox;
56 : 4 : }
57 : :
58 : : // Block until a message arrives, then dispatch to matching handler.
59 : : // Handlers are called in order; the first one that accepts the message
60 : : // consumes it.
61 : : template <typename... Handlers> void receive(Handlers&&... handlers) {
62 : : TypedMessage msg;
63 : : {
64 : : std::unique_lock<std::mutex> lock(cv_mutex_);
65 : :
66 : : // Try non-blocking pop first
67 : : if (mailbox_ && mailbox_->try_pop(msg)) {
68 : : message_arrived_.store(false, std::memory_order_release);
69 : : } else {
70 : : // Block until a message arrives
71 : : actor_state_.set(ActorState::kIdle);
72 : : cv_.wait(lock, [this]() {
73 : : return message_arrived_.load(std::memory_order_acquire) ||
74 : : !running_.load(std::memory_order_acquire);
75 : : });
76 : : if (!running_.load(std::memory_order_acquire))
77 : : return;
78 : : message_arrived_.store(false, std::memory_order_release);
79 : : actor_state_.set(ActorState::kRunning);
80 : : if (mailbox_)
81 : : mailbox_->try_pop(msg);
82 : : }
83 : : }
84 : :
85 : : if (msg.type_id() != TypeTag::Invalid) {
86 : : (handlers(msg), ...);
87 : : }
88 : : }
89 : :
90 : : // Receive messages in a loop, dispatching each to one of the handlers
91 : : // provided via the iterator range.
92 : : template <typename T> void receive_for(T& begin, T end) {
93 : : while (begin != end && running_.load(std::memory_order_acquire)) {
94 : : receive(*begin++);
95 : : }
96 : : }
97 : :
98 : : // Wait for one or more actors to finish.
99 : : template <typename... Actors>
100 : : void wait_for(ActorAddr first, Actors&&... rest) {
101 : : // Stub: in a full implementation this would monitor exit events
102 : : // for the given actors and block until one finishes.
103 : : (void)first;
104 : : ((void)rest, ...);
105 : : }
106 : :
107 : : void await_all_other_actors_done();
108 : :
109 : : // Satisfy the pure virtual from AbstractActor.
110 : : // Override in subclasses to handle messages delivered by thread_loop().
111 : : void receive(TypedMessage& msg) override;
112 : :
113 : : const error& fail_state() const {
114 : : return fail_state_;
115 : : }
116 : : void fail_state(error e) {
117 : : fail_state_ = e;
118 : : }
119 : :
120 : : void on_activate() override;
121 : : void on_deactivate() override;
122 : :
123 : : protected:
124 : : BlockingActor(ActorContext* ctx, ActorSystem& sys);
125 : : BlockingActor(ActorId id, ActorContext* ctx, ActorSystem& sys);
126 : : ~BlockingActor() override;
127 : :
128 : : mailbox::MPSCActorMailbox<TypedMessage>* get_mailbox() {
129 : : return mailbox_;
130 : : }
131 : : sched::IScheduler* get_scheduler() {
132 : : return scheduler_;
133 : : }
134 : : ActorState& actor_state() {
135 : : return actor_state_;
136 : : }
137 : :
138 : : private:
139 : : void thread_loop();
140 : :
141 : : std::thread thread_;
142 : : std::mutex cv_mutex_;
143 : : std::condition_variable cv_;
144 : : std::atomic<bool> running_{false};
145 : : std::atomic<bool> message_arrived_{false};
146 : : ActorState actor_state_;
147 : : mailbox::MPSCActorMailbox<TypedMessage>* mailbox_{nullptr};
148 : : sched::IScheduler* scheduler_{nullptr};
149 : : error fail_state_;
150 : : };
151 : :
152 : : } // namespace hpactor
|