Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/actor/http_gateway_actor.hpp>
16 : : #include <hpactor/core/actor_system.hpp>
17 : :
18 : : #include <cstring>
19 : :
20 : : namespace hpactor {
21 : : namespace net {
22 : :
23 : : // =============================================================================
24 : : // ReplyAdapter — internal EventBasedActor for receiving actor replies
25 : : // =============================================================================
26 : : namespace {
27 : :
28 : : class ReplyAdapter final : public EventBasedActor {
29 : : public:
30 : : using ReplyHandler = std::function<void(TypedMessage&&)>;
31 : :
32 : 1 : ReplyAdapter(ActorContext* ctx, ActorSystem& sys, ReplyHandler handler)
33 : 1 : : EventBasedActor(ctx, sys), handler_(std::move(handler)) {
34 : 1 : become(make_behavior());
35 : 1 : }
36 : :
37 : 1 : Behavior make_behavior() override {
38 : 3 : return Behavior([this](TypedMessage& msg) {
39 : 1 : if (handler_) {
40 : 1 : handler_(std::move(msg));
41 : : }
42 : 1 : });
43 : : }
44 : :
45 : : private:
46 : : ReplyHandler handler_;
47 : : };
48 : :
49 : : } // anonymous namespace
50 : :
51 : : // =============================================================================
52 : : // HTTPGatewayActor Implementation
53 : : // =============================================================================
54 : :
55 : 1 : HTTPGatewayActor::HTTPGatewayActor(ActorContext* ctx, ActorSystem& sys,
56 : 1 : const std::string& bind_host, uint16_t port)
57 : : : ExternalMsgGatewayActor(ctx, sys),
58 : 1 : serializer_(std::make_unique<HttpSerializer>()) {
59 : 1 : auto handler = [this](TypedMessage&& msg) {
60 : 1 : std::lock_guard<std::mutex> lock(reply_queue_mutex_);
61 : 1 : reply_queue_.push(std::move(msg));
62 : 1 : };
63 : 1 : reply_adapter_ = system().spawn<ReplyAdapter>(std::move(handler));
64 : :
65 : 1 : gateway_.set_request_handler([this](HTTPConnection* conn, HttpRequest&& req) {
66 : 1 : on_request(conn, std::move(req));
67 : 1 : });
68 : 1 : gateway_.set_error_handler([this](HTTPConnection* conn, const error& err) {
69 : 0 : on_error(conn, err);
70 : 0 : });
71 : 1 : gateway_.set_max_connections(max_connections_);
72 : 1 : gateway_.set_max_request_size(max_request_size_);
73 : :
74 : 1 : gateway_.listen(port, bind_host);
75 : 1 : }
76 : :
77 : 1 : HTTPGatewayActor::~HTTPGatewayActor() {
78 : 1 : on_deactivate();
79 : 1 : }
80 : :
81 : 3 : void HTTPGatewayActor::route(HttpMethod method, std::string path_pattern,
82 : : MessageBuilder builder, int priority) {
83 : 3 : routes_.add(method, std::move(path_pattern), std::move(builder), priority);
84 : 3 : }
85 : :
86 : 0 : void HTTPGatewayActor::route(HttpMethod method, std::string path_pattern,
87 : : ActorAddr target) {
88 : 0 : auto serializer = serializer_.get();
89 : 0 : route(method, std::move(path_pattern),
90 : 0 : [target, serializer](
91 : : const HttpRequest& req) -> std::pair<ActorAddress, TypedMessage> {
92 : 0 : auto result = serializer->deserialize_request(req, TypeTag::User);
93 : 0 : if (!result.has_value()) {
94 : 0 : return {invalid_actor_addr, TypedMessage{}};
95 : : }
96 : 0 : return {target, std::move(result.value())};
97 : 0 : });
98 : 0 : }
99 : :
100 : 1 : void HTTPGatewayActor::on_daemon_start() {}
101 : :
102 : 1 : void HTTPGatewayActor::on_daemon_stop() {
103 : 1 : gateway_.stop();
104 : :
105 : : {
106 : 1 : std::lock_guard<std::mutex> lock(reply_mutex_);
107 : 1 : pending_replies_.clear();
108 : 1 : }
109 : 1 : }
110 : :
111 : 6 : bool HTTPGatewayActor::run_once() {
112 : 6 : gateway_.run_once();
113 : :
114 : : for (;;) {
115 : 7 : TypedMessage msg;
116 : : {
117 : 7 : std::lock_guard<std::mutex> lock(reply_queue_mutex_);
118 : 7 : if (reply_queue_.empty())
119 : 6 : break;
120 : 1 : msg = std::move(reply_queue_.front());
121 : 1 : reply_queue_.pop();
122 : 7 : }
123 : 1 : on_reply(std::move(msg));
124 : 8 : }
125 : :
126 : 6 : return true;
127 : : }
128 : :
129 : 1 : void HTTPGatewayActor::on_deactivate() {
130 : 1 : gateway_.stop();
131 : 1 : ExternalMsgGatewayActor::on_deactivate();
132 : 1 : }
133 : :
134 : 1 : void HTTPGatewayActor::on_request(HTTPConnection* conn,
135 : : HttpRequest&& req) { // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
136 : 1 : size_t qpos = req.path.find('?');
137 : 1 : if (qpos != std::string::npos) {
138 : 0 : std::string query_str = req.path.substr(qpos + 1);
139 : 0 : req.path = req.path.substr(0, qpos);
140 : 0 : size_t pos = 0;
141 : 0 : while (pos < query_str.size()) {
142 : 0 : size_t eq = query_str.find('=', pos);
143 : 0 : size_t amp = query_str.find('&', pos);
144 : 0 : if (eq != std::string::npos && (amp == std::string::npos || eq < amp)) {
145 : 0 : size_t vstart = eq + 1;
146 : 0 : size_t vend = amp == std::string::npos ? query_str.size() : amp;
147 : 0 : req.query_params[query_str.substr(pos, eq - pos)] =
148 : 0 : query_str.substr(vstart, vend - vstart);
149 : : }
150 : 0 : pos = amp == std::string::npos ? query_str.size() : amp + 1;
151 : : }
152 : 0 : }
153 : :
154 : 1 : const auto* builder = routes_.match(req.method, req.path, req);
155 : 1 : if (!builder) {
156 : 0 : StreamBuffer body;
157 : 0 : gateway_.send_response(
158 : : conn, HttpStatusCode::NotFound,
159 : : {{"Content-Type", "text/plain"}, {"Connection", "close"}},
160 : 0 : std::move(body));
161 : 0 : close_connection(conn);
162 : 0 : return;
163 : 0 : }
164 : :
165 : 1 : auto [target, msg] = (*builder)(req);
166 : 1 : if (!target) {
167 : 0 : StreamBuffer body;
168 : 0 : gateway_.send_response(
169 : : conn, HttpStatusCode::InternalError,
170 : : {{"Content-Type", "text/plain"}, {"Connection", "close"}},
171 : 0 : std::move(body));
172 : 0 : close_connection(conn);
173 : 0 : return;
174 : 0 : }
175 : :
176 : 1 : uint64_t request_id = next_request_id_++;
177 : 1 : auto pending = std::make_unique<PendingReply>();
178 : 1 : pending->request_id = request_id;
179 : 1 : pending->conn = conn;
180 : 1 : pending->enqueued_at = std::chrono::steady_clock::now();
181 : : {
182 : 1 : std::lock_guard<std::mutex> lock(reply_mutex_);
183 : 1 : pending_replies_[request_id] = std::move(pending);
184 : 1 : }
185 : :
186 : 1 : StreamBuffer correlated;
187 : 9 : for (int i = 7; i >= 0; --i) {
188 : 8 : correlated.push_back(static_cast<uint8_t>((request_id >> (i * 8)) & 0xFF));
189 : : }
190 : 1 : correlated.append(msg.payload().data(), msg.payload().size());
191 : :
192 : 1 : TypedMessage correlated_msg(msg.type_id(), correlated);
193 : 1 : correlated_msg.set_sender_address(reply_adapter_.address());
194 : :
195 : 1 : auto result = system().try_deliver_local(target.id, std::move(correlated_msg));
196 : 1 : if (!result.accepted()) {
197 : : // Clean up the pending reply entry we just created
198 : : {
199 : 0 : std::lock_guard<std::mutex> lock(reply_mutex_);
200 : 0 : auto it = pending_replies_.find(request_id);
201 : 0 : if (it != pending_replies_.end()) {
202 : : // Cancel the timeout timer for this request
203 : 0 : it->second->conn = nullptr;
204 : 0 : pending_replies_.erase(it);
205 : : }
206 : 0 : }
207 : :
208 : 0 : StreamBuffer body;
209 : 0 : const char* msg_str = "Too Many Requests";
210 : 0 : body.append(reinterpret_cast<const uint8_t*>(msg_str), strlen(msg_str));
211 : 0 : gateway_.send_response(conn, HttpStatusCode::TooManyRequests,
212 : : {{"Content-Type", "text/plain"},
213 : : {"Connection", "close"},
214 : : {"Retry-After", "5"}},
215 : 0 : std::move(body));
216 : 0 : close_connection(conn);
217 : 0 : return;
218 : 0 : }
219 : :
220 : : int timeout_ms = static_cast<int>(
221 : 1 : std::chrono::duration_cast<std::chrono::milliseconds>(reply_timeout_).count());
222 : 2 : gateway_.event_loop().run_after(
223 : 1 : [this, request_id] { on_timeout(request_id); }, timeout_ms);
224 : 1 : }
225 : :
226 : 1 : void HTTPGatewayActor::on_reply(TypedMessage&& msg) { // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
227 : 1 : const auto& payload = msg.payload();
228 : 1 : if (payload.size() < 8)
229 : 0 : return;
230 : :
231 : 1 : uint64_t request_id = 0;
232 : 9 : for (int i = 0; i < 8; ++i) {
233 : 8 : request_id = (request_id << 8) | payload.data()[i];
234 : : }
235 : :
236 : 1 : HTTPConnection* conn = nullptr;
237 : : {
238 : 1 : std::lock_guard<std::mutex> lock(reply_mutex_);
239 : 1 : auto it = pending_replies_.find(request_id);
240 : 1 : if (it == pending_replies_.end())
241 : 0 : return;
242 : 1 : conn = it->second->conn;
243 : 1 : pending_replies_.erase(it);
244 : 1 : }
245 : :
246 : 1 : if (!conn)
247 : 0 : return;
248 : :
249 : 1 : StreamBuffer reply_payload;
250 : 1 : reply_payload.append(payload.data() + 8, payload.size() - 8);
251 : 1 : TypedMessage reply_msg(msg.type_id(), reply_payload);
252 : :
253 : 2 : auto [body, content_type] = serializer_->serialize_response(reply_msg, "app"
254 : : "lic"
255 : : "ati"
256 : : "on/"
257 : : "jso"
258 : 3 : "n");
259 : :
260 : 4 : std::vector<HttpHeader> headers = {{"Content-Type", content_type}};
261 : :
262 : 1 : if (!conn->should_keep_alive()) {
263 : 0 : headers.push_back({"Connection", "close"});
264 : : }
265 : :
266 : 1 : gateway_.send_response(conn, HttpStatusCode::OK, std::move(headers),
267 : 1 : std::move(body));
268 : :
269 : 1 : if (!conn->should_keep_alive()) {
270 : 0 : close_connection(conn);
271 : : }
272 : 1 : }
273 : :
274 : 0 : void HTTPGatewayActor::on_error(HTTPConnection* conn, const error& err) {
275 : 0 : HttpStatusCode code = HttpStatusCode::InternalError;
276 : 0 : if (err.code() == errors::http_parse_error) {
277 : 0 : code = HttpStatusCode::BadRequest;
278 : : }
279 : :
280 : 0 : StreamBuffer body;
281 : 0 : const auto& msg = err.message();
282 : 0 : if (!msg.empty()) {
283 : 0 : body.append(reinterpret_cast<const uint8_t*>(msg.data()), msg.size());
284 : : }
285 : :
286 : 0 : gateway_.send_response(
287 : : conn, code, {{"Content-Type", "text/plain"}, {"Connection", "close"}},
288 : 0 : std::move(body));
289 : 0 : close_connection(conn);
290 : 0 : }
291 : :
292 : 0 : void HTTPGatewayActor::on_timeout(uint64_t request_id) {
293 : 0 : HTTPConnection* conn = nullptr;
294 : : {
295 : 0 : std::lock_guard<std::mutex> lock(reply_mutex_);
296 : 0 : auto it = pending_replies_.find(request_id);
297 : 0 : if (it == pending_replies_.end())
298 : 0 : return;
299 : 0 : conn = it->second->conn;
300 : 0 : pending_replies_.erase(it);
301 : 0 : }
302 : :
303 : 0 : if (conn) {
304 : 0 : StreamBuffer body;
305 : 0 : const char* msg_str = "Upstream actor did not respond in time";
306 : 0 : body.append(reinterpret_cast<const uint8_t*>(msg_str), strlen(msg_str));
307 : 0 : gateway_.send_response(
308 : : conn, HttpStatusCode::GatewayTimeout,
309 : : {{"Content-Type", "text/plain"}, {"Connection", "close"}},
310 : 0 : std::move(body));
311 : 0 : close_connection(conn);
312 : 0 : }
313 : : }
314 : :
315 : 0 : void HTTPGatewayActor::close_connection(HTTPConnection* conn) {
316 : : {
317 : 0 : std::lock_guard<std::mutex> lock(reply_mutex_);
318 : 0 : for (auto it = pending_replies_.begin(); it != pending_replies_.end();) {
319 : 0 : if (it->second->conn == conn) {
320 : 0 : it = pending_replies_.erase(it);
321 : : } else {
322 : 0 : ++it;
323 : : }
324 : : }
325 : 0 : }
326 : :
327 : 0 : gateway_.close_connection(conn);
328 : 0 : }
329 : :
330 : : } // namespace net
331 : : } // namespace hpactor
|