LCOV - code coverage report
Current view: top level - src/actor - http_gateway_actor.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 53.0 % 200 106
Test Date: 2026-05-20 02:24:49 Functions: 63.6 % 22 14
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/actor/http_gateway_actor.hpp>
      16                 :             : #include <hpactor/core/actor_system.hpp>
      17                 :             : 
      18                 :             : #include <cstring>
      19                 :             : 
      20                 :             : namespace hpactor {
      21                 :             : namespace net {
      22                 :             : 
      23                 :             : // =============================================================================
      24                 :             : // ReplyAdapter — internal EventBasedActor for receiving actor replies
      25                 :             : // =============================================================================
      26                 :             : namespace {
      27                 :             : 
      28                 :             : class ReplyAdapter final : public EventBasedActor {
      29                 :             :   public:
      30                 :             :     using ReplyHandler = std::function<void(TypedMessage&&)>;
      31                 :             : 
      32                 :           1 :     ReplyAdapter(ActorContext* ctx, ActorSystem& sys, ReplyHandler handler)
      33                 :           1 :         : EventBasedActor(ctx, sys), handler_(std::move(handler)) {
      34                 :           1 :         become(make_behavior());
      35                 :           1 :     }
      36                 :             : 
      37                 :           1 :     Behavior make_behavior() override {
      38                 :           3 :         return Behavior([this](TypedMessage& msg) {
      39                 :           1 :             if (handler_) {
      40                 :           1 :                 handler_(std::move(msg));
      41                 :             :             }
      42                 :           1 :         });
      43                 :             :     }
      44                 :             : 
      45                 :             :   private:
      46                 :             :     ReplyHandler handler_;
      47                 :             : };
      48                 :             : 
      49                 :             : } // anonymous namespace
      50                 :             : 
      51                 :             : // =============================================================================
      52                 :             : // HTTPGatewayActor Implementation
      53                 :             : // =============================================================================
      54                 :             : 
      55                 :           1 : HTTPGatewayActor::HTTPGatewayActor(ActorContext* ctx, ActorSystem& sys,
      56                 :           1 :                                    const std::string& bind_host, uint16_t port)
      57                 :             :     : ExternalMsgGatewayActor(ctx, sys),
      58                 :           1 :       serializer_(std::make_unique<HttpSerializer>()) {
      59                 :           1 :     auto handler = [this](TypedMessage&& msg) {
      60                 :           1 :         std::lock_guard<std::mutex> lock(reply_queue_mutex_);
      61                 :           1 :         reply_queue_.push(std::move(msg));
      62                 :           1 :     };
      63                 :           1 :     reply_adapter_ = system().spawn<ReplyAdapter>(std::move(handler));
      64                 :             : 
      65                 :           1 :     gateway_.set_request_handler([this](HTTPConnection* conn, HttpRequest&& req) {
      66                 :           1 :         on_request(conn, std::move(req));
      67                 :           1 :     });
      68                 :           1 :     gateway_.set_error_handler([this](HTTPConnection* conn, const error& err) {
      69                 :           0 :         on_error(conn, err);
      70                 :           0 :     });
      71                 :           1 :     gateway_.set_max_connections(max_connections_);
      72                 :           1 :     gateway_.set_max_request_size(max_request_size_);
      73                 :             : 
      74                 :           1 :     gateway_.listen(port, bind_host);
      75                 :           1 : }
      76                 :             : 
      77                 :           1 : HTTPGatewayActor::~HTTPGatewayActor() {
      78                 :           1 :     on_deactivate();
      79                 :           1 : }
      80                 :             : 
      81                 :           3 : void HTTPGatewayActor::route(HttpMethod method, std::string path_pattern,
      82                 :             :                              MessageBuilder builder, int priority) {
      83                 :           3 :     routes_.add(method, std::move(path_pattern), std::move(builder), priority);
      84                 :           3 : }
      85                 :             : 
      86                 :           0 : void HTTPGatewayActor::route(HttpMethod method, std::string path_pattern,
      87                 :             :                              ActorAddr target) {
      88                 :           0 :     auto serializer = serializer_.get();
      89                 :           0 :     route(method, std::move(path_pattern),
      90                 :           0 :           [target, serializer](
      91                 :             :               const HttpRequest& req) -> std::pair<ActorAddress, TypedMessage> {
      92                 :           0 :               auto result = serializer->deserialize_request(req, TypeTag::User);
      93                 :           0 :               if (!result.has_value()) {
      94                 :           0 :                   return {invalid_actor_addr, TypedMessage{}};
      95                 :             :               }
      96                 :           0 :               return {target, std::move(result.value())};
      97                 :           0 :           });
      98                 :           0 : }
      99                 :             : 
     100                 :           1 : void HTTPGatewayActor::on_daemon_start() {}
     101                 :             : 
     102                 :           1 : void HTTPGatewayActor::on_daemon_stop() {
     103                 :           1 :     gateway_.stop();
     104                 :             : 
     105                 :             :     {
     106                 :           1 :         std::lock_guard<std::mutex> lock(reply_mutex_);
     107                 :           1 :         pending_replies_.clear();
     108                 :           1 :     }
     109                 :           1 : }
     110                 :             : 
     111                 :           6 : bool HTTPGatewayActor::run_once() {
     112                 :           6 :     gateway_.run_once();
     113                 :             : 
     114                 :             :     for (;;) {
     115                 :           7 :         TypedMessage msg;
     116                 :             :         {
     117                 :           7 :             std::lock_guard<std::mutex> lock(reply_queue_mutex_);
     118                 :           7 :             if (reply_queue_.empty())
     119                 :           6 :                 break;
     120                 :           1 :             msg = std::move(reply_queue_.front());
     121                 :           1 :             reply_queue_.pop();
     122                 :           7 :         }
     123                 :           1 :         on_reply(std::move(msg));
     124                 :           8 :     }
     125                 :             : 
     126                 :           6 :     return true;
     127                 :             : }
     128                 :             : 
     129                 :           1 : void HTTPGatewayActor::on_deactivate() {
     130                 :           1 :     gateway_.stop();
     131                 :           1 :     ExternalMsgGatewayActor::on_deactivate();
     132                 :           1 : }
     133                 :             : 
     134                 :           1 : void HTTPGatewayActor::on_request(HTTPConnection* conn,
     135                 :             :                                   HttpRequest&& req) { // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
     136                 :           1 :     size_t qpos = req.path.find('?');
     137                 :           1 :     if (qpos != std::string::npos) {
     138                 :           0 :         std::string query_str = req.path.substr(qpos + 1);
     139                 :           0 :         req.path = req.path.substr(0, qpos);
     140                 :           0 :         size_t pos = 0;
     141                 :           0 :         while (pos < query_str.size()) {
     142                 :           0 :             size_t eq = query_str.find('=', pos);
     143                 :           0 :             size_t amp = query_str.find('&', pos);
     144                 :           0 :             if (eq != std::string::npos && (amp == std::string::npos || eq < amp)) {
     145                 :           0 :                 size_t vstart = eq + 1;
     146                 :           0 :                 size_t vend = amp == std::string::npos ? query_str.size() : amp;
     147                 :           0 :                 req.query_params[query_str.substr(pos, eq - pos)] =
     148                 :           0 :                     query_str.substr(vstart, vend - vstart);
     149                 :             :             }
     150                 :           0 :             pos = amp == std::string::npos ? query_str.size() : amp + 1;
     151                 :             :         }
     152                 :           0 :     }
     153                 :             : 
     154                 :           1 :     const auto* builder = routes_.match(req.method, req.path, req);
     155                 :           1 :     if (!builder) {
     156                 :           0 :         StreamBuffer body;
     157                 :           0 :         gateway_.send_response(
     158                 :             :             conn, HttpStatusCode::NotFound,
     159                 :             :             {{"Content-Type", "text/plain"}, {"Connection", "close"}},
     160                 :           0 :             std::move(body));
     161                 :           0 :         close_connection(conn);
     162                 :           0 :         return;
     163                 :           0 :     }
     164                 :             : 
     165                 :           1 :     auto [target, msg] = (*builder)(req);
     166                 :           1 :     if (!target) {
     167                 :           0 :         StreamBuffer body;
     168                 :           0 :         gateway_.send_response(
     169                 :             :             conn, HttpStatusCode::InternalError,
     170                 :             :             {{"Content-Type", "text/plain"}, {"Connection", "close"}},
     171                 :           0 :             std::move(body));
     172                 :           0 :         close_connection(conn);
     173                 :           0 :         return;
     174                 :           0 :     }
     175                 :             : 
     176                 :           1 :     uint64_t request_id = next_request_id_++;
     177                 :           1 :     auto pending = std::make_unique<PendingReply>();
     178                 :           1 :     pending->request_id = request_id;
     179                 :           1 :     pending->conn = conn;
     180                 :           1 :     pending->enqueued_at = std::chrono::steady_clock::now();
     181                 :             :     {
     182                 :           1 :         std::lock_guard<std::mutex> lock(reply_mutex_);
     183                 :           1 :         pending_replies_[request_id] = std::move(pending);
     184                 :           1 :     }
     185                 :             : 
     186                 :           1 :     StreamBuffer correlated;
     187                 :           9 :     for (int i = 7; i >= 0; --i) {
     188                 :           8 :         correlated.push_back(static_cast<uint8_t>((request_id >> (i * 8)) & 0xFF));
     189                 :             :     }
     190                 :           1 :     correlated.append(msg.payload().data(), msg.payload().size());
     191                 :             : 
     192                 :           1 :     TypedMessage correlated_msg(msg.type_id(), correlated);
     193                 :           1 :     correlated_msg.set_sender_address(reply_adapter_.address());
     194                 :             : 
     195                 :           1 :     auto result = system().try_deliver_local(target.id, std::move(correlated_msg));
     196                 :           1 :     if (!result.accepted()) {
     197                 :             :         // Clean up the pending reply entry we just created
     198                 :             :         {
     199                 :           0 :             std::lock_guard<std::mutex> lock(reply_mutex_);
     200                 :           0 :             auto it = pending_replies_.find(request_id);
     201                 :           0 :             if (it != pending_replies_.end()) {
     202                 :             :                 // Cancel the timeout timer for this request
     203                 :           0 :                 it->second->conn = nullptr;
     204                 :           0 :                 pending_replies_.erase(it);
     205                 :             :             }
     206                 :           0 :         }
     207                 :             : 
     208                 :           0 :         StreamBuffer body;
     209                 :           0 :         const char* msg_str = "Too Many Requests";
     210                 :           0 :         body.append(reinterpret_cast<const uint8_t*>(msg_str), strlen(msg_str));
     211                 :           0 :         gateway_.send_response(conn, HttpStatusCode::TooManyRequests,
     212                 :             :                                {{"Content-Type", "text/plain"},
     213                 :             :                                 {"Connection", "close"},
     214                 :             :                                 {"Retry-After", "5"}},
     215                 :           0 :                                std::move(body));
     216                 :           0 :         close_connection(conn);
     217                 :           0 :         return;
     218                 :           0 :     }
     219                 :             : 
     220                 :             :     int timeout_ms = static_cast<int>(
     221                 :           1 :         std::chrono::duration_cast<std::chrono::milliseconds>(reply_timeout_).count());
     222                 :           2 :     gateway_.event_loop().run_after(
     223                 :           1 :         [this, request_id] { on_timeout(request_id); }, timeout_ms);
     224                 :           1 : }
     225                 :             : 
     226                 :           1 : void HTTPGatewayActor::on_reply(TypedMessage&& msg) { // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
     227                 :           1 :     const auto& payload = msg.payload();
     228                 :           1 :     if (payload.size() < 8)
     229                 :           0 :         return;
     230                 :             : 
     231                 :           1 :     uint64_t request_id = 0;
     232                 :           9 :     for (int i = 0; i < 8; ++i) {
     233                 :           8 :         request_id = (request_id << 8) | payload.data()[i];
     234                 :             :     }
     235                 :             : 
     236                 :           1 :     HTTPConnection* conn = nullptr;
     237                 :             :     {
     238                 :           1 :         std::lock_guard<std::mutex> lock(reply_mutex_);
     239                 :           1 :         auto it = pending_replies_.find(request_id);
     240                 :           1 :         if (it == pending_replies_.end())
     241                 :           0 :             return;
     242                 :           1 :         conn = it->second->conn;
     243                 :           1 :         pending_replies_.erase(it);
     244                 :           1 :     }
     245                 :             : 
     246                 :           1 :     if (!conn)
     247                 :           0 :         return;
     248                 :             : 
     249                 :           1 :     StreamBuffer reply_payload;
     250                 :           1 :     reply_payload.append(payload.data() + 8, payload.size() - 8);
     251                 :           1 :     TypedMessage reply_msg(msg.type_id(), reply_payload);
     252                 :             : 
     253                 :           2 :     auto [body, content_type] = serializer_->serialize_response(reply_msg, "app"
     254                 :             :                                                                            "lic"
     255                 :             :                                                                            "ati"
     256                 :             :                                                                            "on/"
     257                 :             :                                                                            "jso"
     258                 :           3 :                                                                            "n");
     259                 :             : 
     260                 :           4 :     std::vector<HttpHeader> headers = {{"Content-Type", content_type}};
     261                 :             : 
     262                 :           1 :     if (!conn->should_keep_alive()) {
     263                 :           0 :         headers.push_back({"Connection", "close"});
     264                 :             :     }
     265                 :             : 
     266                 :           1 :     gateway_.send_response(conn, HttpStatusCode::OK, std::move(headers),
     267                 :           1 :                            std::move(body));
     268                 :             : 
     269                 :           1 :     if (!conn->should_keep_alive()) {
     270                 :           0 :         close_connection(conn);
     271                 :             :     }
     272                 :           1 : }
     273                 :             : 
     274                 :           0 : void HTTPGatewayActor::on_error(HTTPConnection* conn, const error& err) {
     275                 :           0 :     HttpStatusCode code = HttpStatusCode::InternalError;
     276                 :           0 :     if (err.code() == errors::http_parse_error) {
     277                 :           0 :         code = HttpStatusCode::BadRequest;
     278                 :             :     }
     279                 :             : 
     280                 :           0 :     StreamBuffer body;
     281                 :           0 :     const auto& msg = err.message();
     282                 :           0 :     if (!msg.empty()) {
     283                 :           0 :         body.append(reinterpret_cast<const uint8_t*>(msg.data()), msg.size());
     284                 :             :     }
     285                 :             : 
     286                 :           0 :     gateway_.send_response(
     287                 :             :         conn, code, {{"Content-Type", "text/plain"}, {"Connection", "close"}},
     288                 :           0 :         std::move(body));
     289                 :           0 :     close_connection(conn);
     290                 :           0 : }
     291                 :             : 
     292                 :           0 : void HTTPGatewayActor::on_timeout(uint64_t request_id) {
     293                 :           0 :     HTTPConnection* conn = nullptr;
     294                 :             :     {
     295                 :           0 :         std::lock_guard<std::mutex> lock(reply_mutex_);
     296                 :           0 :         auto it = pending_replies_.find(request_id);
     297                 :           0 :         if (it == pending_replies_.end())
     298                 :           0 :             return;
     299                 :           0 :         conn = it->second->conn;
     300                 :           0 :         pending_replies_.erase(it);
     301                 :           0 :     }
     302                 :             : 
     303                 :           0 :     if (conn) {
     304                 :           0 :         StreamBuffer body;
     305                 :           0 :         const char* msg_str = "Upstream actor did not respond in time";
     306                 :           0 :         body.append(reinterpret_cast<const uint8_t*>(msg_str), strlen(msg_str));
     307                 :           0 :         gateway_.send_response(
     308                 :             :             conn, HttpStatusCode::GatewayTimeout,
     309                 :             :             {{"Content-Type", "text/plain"}, {"Connection", "close"}},
     310                 :           0 :             std::move(body));
     311                 :           0 :         close_connection(conn);
     312                 :           0 :     }
     313                 :             : }
     314                 :             : 
     315                 :           0 : void HTTPGatewayActor::close_connection(HTTPConnection* conn) {
     316                 :             :     {
     317                 :           0 :         std::lock_guard<std::mutex> lock(reply_mutex_);
     318                 :           0 :         for (auto it = pending_replies_.begin(); it != pending_replies_.end();) {
     319                 :           0 :             if (it->second->conn == conn) {
     320                 :           0 :                 it = pending_replies_.erase(it);
     321                 :             :             } else {
     322                 :           0 :                 ++it;
     323                 :             :             }
     324                 :             :         }
     325                 :           0 :     }
     326                 :             : 
     327                 :           0 :     gateway_.close_connection(conn);
     328                 :           0 : }
     329                 :             : 
     330                 :             : } // namespace net
     331                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1