Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/actor/abstract_actor.hpp>
18 : : #include <hpactor/actor/typed_message.hpp>
19 : : #include <hpactor/core/actor_ref_cache.hpp>
20 : : #include <hpactor/mailbox/mailbox_policy.hpp>
21 : : #include <hpactor/net/http_types.hpp>
22 : : #include <hpactor/ref/actor_address.hpp>
23 : : #include <hpactor/ref/actor_ref.hpp>
24 : : #include <hpactor/rpc/rpc_channel.hpp>
25 : : #include <hpactor/types/types.hpp>
26 : :
27 : : #include <algorithm>
28 : : #include <chrono>
29 : : #include <cstdint>
30 : : #include <functional>
31 : : #include <memory>
32 : : #include <vector>
33 : :
34 : : namespace google {
35 : : namespace protobuf {
36 : : class Message;
37 : : } // namespace protobuf
38 : : } // namespace google
39 : :
40 : : namespace hpactor {
41 : :
42 : : // -----------------------------------------------------------------------------
43 : : // ActorContext - execution context for actors
44 : : // -----------------------------------------------------------------------------
45 : : class ActorContext {
46 : : public:
47 : : explicit ActorContext(Actor owner, ActorSystem* system = nullptr);
48 : : ~ActorContext();
49 : :
50 : : // Set the system reference (used when owner is not set)
51 : : void set_system(ActorSystem* system) {
52 : : system_ = system;
53 : : }
54 : :
55 : : // Spawn child actors
56 : : template <typename Fn, typename... Args>
57 : : Actor spawn(Fn&& fn, Args&&... args);
58 : :
59 : : template <typename T, typename... Args> T spawn(Args&&... args);
60 : :
61 : : // Send a pre-constructed TypedMessage
62 : : void send(const ActorAddress& target, TypedMessage msg);
63 : :
64 : : // Primary: send to an already-resolved ActorRef (local or remote)
65 : : void send(ActorRef& target, TypedMessage msg);
66 : :
67 : : // Send a protobuf message (serializes eagerly)
68 : : void send(const ActorAddress& target, TypeTag tag,
69 : : const google::protobuf::Message& msg);
70 : :
71 : : // Convenience: send a typed protobuf message
72 : : template <typename ProtoMsgT>
73 : : void send(const ActorAddress& target, const ProtoMsgT& msg);
74 : :
75 : : // Send with priority and deadline
76 : : void send_with_priority(const ActorAddress& target, TypedMessage msg,
77 : : uint8_t priority, int64_t deadline_ns);
78 : :
79 : : // Try-send returning an admission result (opt-in backpressure).
80 : : // Resolves the target address, stamps the sender address, and delegates
81 : : // to ActorRef::try_send(). Returns ActorNotFound if resolution fails.
82 : : mailbox::EnqueueResult try_send(const ActorAddress& target, TypedMessage msg,
83 : : mailbox::DeliveryOptions options = {});
84 : :
85 : : // Try-send with priority and deadline, returning an admission result.
86 : : // For local targets, delegates directly to
87 : : // ActorSystem::try_deliver_local() with the given priority/deadline.
88 : : // For remote targets, delegates to ActorRef::try_send().
89 : : mailbox::EnqueueResult
90 : : try_send_with_priority(const ActorAddress& target, TypedMessage msg,
91 : : uint8_t priority, int64_t deadline_ns,
92 : : mailbox::DeliveryOptions options = {});
93 : :
94 : : // Replies
95 : : void reply(TypedMessage msg);
96 : : void reply(TypeTag tag, const google::protobuf::Message& msg);
97 : : template <typename ProtoMsgT> void reply(const ProtoMsgT& msg);
98 : : void reply_with_error(const error& err);
99 : :
100 : : // Get the sender of the current message (for reply routing)
101 : : const ActorAddress& current_sender() const {
102 : : return current_sender_;
103 : : }
104 : 341 : void set_current_sender(const ActorAddress& sender) {
105 : 341 : current_sender_ = sender;
106 : 341 : }
107 : :
108 : : // Scheduled execution
109 : : AlarmHandle schedule(std::chrono::milliseconds delay, TypedMessage msg);
110 : : void cancel_schedule(AlarmHandle handle);
111 : :
112 : : // Children management
113 : : std::vector<Actor> children() const;
114 : : void add_child(Actor child);
115 : : void remove_child(Actor child);
116 : :
117 : : // Remote child management
118 : : void add_remote_child(ActorRef child);
119 : : std::vector<ActorRef> remote_children() const;
120 : :
121 : : // Link management (used by AbstractActor)
122 : : std::vector<ActorAddress> linked_actors() const;
123 : 10 : void add_linked(const ActorAddress& addr) {
124 : 10 : linked_.push_back(addr);
125 : 10 : }
126 : 7 : void remove_linked(const ActorAddress& addr) {
127 : 7 : auto it = std::find(linked_.begin(), linked_.end(), addr);
128 : 7 : if (it != linked_.end())
129 : 5 : linked_.erase(it);
130 : 7 : }
131 : :
132 : : // Monitoring
133 : : void monitor(const ActorAddress& target);
134 : :
135 : : // Monitor management (used by AbstractActor)
136 : 5 : void add_monitored(const ActorAddress& addr) {
137 : 5 : monitored_.push_back(addr);
138 : 5 : }
139 : 5 : void remove_monitored(const ActorAddress& addr) {
140 : 5 : auto it = std::find(monitored_.begin(), monitored_.end(), addr);
141 : 5 : if (it != monitored_.end())
142 : 2 : monitored_.erase(it);
143 : 5 : }
144 : 28 : const std::vector<ActorAddress>& monitored_actors() const {
145 : 28 : return monitored_;
146 : : }
147 : :
148 : : // Resolve an ActorAddress to an ActorRef (lazy + cached)
149 : : ActorRef resolve(const ActorAddress& target);
150 : :
151 : : // RPC calls (for non-actor threads only)
152 : : RpcFuture<StreamBuffer>
153 : : rpc(const ActorAddress& target, const StreamBuffer& encoded_request,
154 : : std::chrono::milliseconds timeout_ms = std::chrono::milliseconds(5000));
155 : :
156 : : // HTTP egress — async HTTP calls to external services.
157 : : // Delegates to ActorSystem's HttpClient. Returns a future for the response
158 : : // body.
159 : : RpcFuture<StreamBuffer>
160 : : http_get(const std::string& url, std::vector<net::HttpHeader> headers = {});
161 : : RpcFuture<StreamBuffer> http_post(const std::string& url, StreamBuffer body,
162 : : std::vector<net::HttpHeader> headers = {});
163 : : RpcFuture<StreamBuffer> http_put(const std::string& url, StreamBuffer body,
164 : : std::vector<net::HttpHeader> headers = {});
165 : : RpcFuture<StreamBuffer>
166 : : http_delete(const std::string& url, std::vector<net::HttpHeader> headers = {});
167 : : RpcFuture<StreamBuffer>
168 : : http_request(net::HttpMethod method, const std::string& url,
169 : : std::vector<net::HttpHeader> headers = {}, StreamBuffer body = {});
170 : :
171 : : // Current trace context for send/reply propagation
172 : 4 : bool has_current_trace_context() const noexcept {
173 : 4 : return has_current_trace_context_;
174 : : }
175 : :
176 : 1 : const TraceContext& current_trace_context() const noexcept {
177 : 1 : return current_trace_context_;
178 : : }
179 : :
180 : : class TraceScope {
181 : : public:
182 : : TraceScope(ActorContext* ctx, const TraceContext& next) noexcept;
183 : : ~TraceScope();
184 : : TraceScope(const TraceScope&) = delete;
185 : : TraceScope& operator=(const TraceScope&) = delete;
186 : :
187 : : private:
188 : : ActorContext* ctx_{nullptr};
189 : : TraceContext previous_{};
190 : : bool previous_valid_{false};
191 : : };
192 : :
193 : : // Backpressure signal handling
194 : : using BackpressureHandler =
195 : : std::function<void(const mailbox::BackpressureSignal&)>;
196 : :
197 : : void on_backpressure(BackpressureHandler handler);
198 : : void handle_backpressure(const mailbox::BackpressureSignal& signal);
199 : :
200 : : // ── Graceful actor stop ────────────────────────────────────────────────
201 : : // Initiates drain per the target actor's DrainPolicy.
202 : : // Returns immediately; the actor drains on its scheduler thread.
203 : : void stop(ActorId target);
204 : :
205 : : // Synchronous stop — blocks until target reaches kStopped or timeout.
206 : : // Returns error on timeout. Do not call from actor threads.
207 : : result<void> stop_sync(ActorId target, std::chrono::milliseconds timeout);
208 : :
209 : : private:
210 : : Actor owner_;
211 : : ActorSystem* system_ = nullptr;
212 : : std::vector<Actor> children_;
213 : : std::vector<ActorRef> remote_children_;
214 : : std::vector<ActorAddress> linked_;
215 : : std::vector<ActorAddress> monitored_;
216 : :
217 : 2 : void set_current_trace_context(const TraceContext& context) noexcept {
218 : 2 : current_trace_context_ = context;
219 : 2 : has_current_trace_context_ = context.valid();
220 : 2 : }
221 : :
222 : 2 : void clear_current_trace_context() noexcept {
223 : 2 : current_trace_context_.clear();
224 : 2 : has_current_trace_context_ = false;
225 : 2 : }
226 : :
227 : : ActorRefCache ref_cache_;
228 : : ActorAddress current_sender_;
229 : : BackpressureHandler backpressure_handler_;
230 : : TraceContext current_trace_context_;
231 : : bool has_current_trace_context_{false};
232 : : };
233 : :
234 : : } // namespace hpactor
|