LCOV - code coverage report
Current view: top level - src/net - wireframe_connection.cpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 54.1 % 159 86
Test Date: 2026-05-20 02:24:49 Functions: 80.0 % 20 16
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : //
       3                 :             : // Licensed under the Apache License, Version 2.0 (the "License");
       4                 :             : // you may not use this file except in compliance with the License.
       5                 :             : // You may obtain a copy of the License at
       6                 :             : //
       7                 :             : //     http://www.apache.org/licenses/LICENSE-2.0
       8                 :             : //
       9                 :             : // Unless required by applicable law or agreed to in writing, software
      10                 :             : // distributed under the License is distributed on an "AS IS" BASIS,
      11                 :             : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      12                 :             : // See the License for the specific language governing permissions and
      13                 :             : // limitations under the License.
      14                 :             : 
      15                 :             : #include <hpactor/net/wireframe_connection.hpp>
      16                 :             : 
      17                 :             : #include <hpactor/log/logger.hpp>
      18                 :             : #include <hpactor/net/event_loop.hpp>
      19                 :             : 
      20                 :             : #include <cstring>
      21                 :             : #include <sys/socket.h>
      22                 :             : #include <unistd.h>
      23                 :             : 
      24                 :             : namespace hpactor {
      25                 :             : 
      26                 :             : namespace net {
      27                 :             : 
      28                 :          23 : WireFrameConnection::WireFrameConnection(int fd, EndPoint local_endpoint,
      29                 :          23 :                                          EndPoint remote_endpoint, EventLoop* loop)
      30                 :          23 :     : Connection(fd, local_endpoint, remote_endpoint, loop), is_sending_(false) {
      31                 :          23 :     write_buffer_.reserve(kWriteChunkSize);
      32                 :          23 : }
      33                 :             : 
      34                 :          46 : WireFrameConnection::~WireFrameConnection() {
      35                 :          23 :     close();
      36                 :          46 : }
      37                 :             : 
      38                 :             : WireFrameConnectionPtr
      39                 :           3 : WireFrameConnection::create_as_client(int fd, EndPoint local_endpoint,
      40                 :             :                                       EndPoint remote_endpoint, EventLoop* loop) {
      41                 :             :     auto conn = std::shared_ptr<WireFrameConnection>(
      42                 :           3 :         new WireFrameConnection(fd, local_endpoint, remote_endpoint, loop));
      43                 :           3 :     conn->set_state(ConnectionState::Connected);
      44                 :             : 
      45                 :           3 :     HPACTOR_LOG_DEBUG(log::LogCategory::kNetwork, ActorId{0}, 0,
      46                 :             :                       "connection opened");
      47                 :             : 
      48                 :           3 :     if (loop && fd >= 0) {
      49                 :           3 :         loop->add_fd(fd, EventLoop::Event::Read);
      50                 :           3 :         if (loop->supports_read_handler()) {
      51                 :           3 :             std::weak_ptr<WireFrameConnection> weak_conn = conn;
      52                 :           3 :             loop->set_read_handler(fd, [weak_conn](int /*event_fd*/) {
      53                 :           0 :                 if (auto self = weak_conn.lock()) {
      54                 :           0 :                     self->handle_read();
      55                 :           0 :                 }
      56                 :           0 :             });
      57                 :           3 :         }
      58                 :             :     }
      59                 :             : 
      60                 :           3 :     return conn;
      61                 :             : }
      62                 :             : 
      63                 :             : WireFrameConnectionPtr
      64                 :          18 : WireFrameConnection::create_connecting_client(int fd, EndPoint local_endpoint,
      65                 :             :                                               EndPoint remote_endpoint,
      66                 :             :                                               EventLoop* loop) {
      67                 :             :     auto conn = std::shared_ptr<WireFrameConnection>(
      68                 :          18 :         new WireFrameConnection(fd, local_endpoint, remote_endpoint, loop));
      69                 :          18 :     conn->set_state(ConnectionState::Connecting);
      70                 :             :     // Event loop registration deferred — caller must verify connect and call
      71                 :             :     // setup_after_connect().
      72                 :          18 :     return conn;
      73                 :             : }
      74                 :             : 
      75                 :           0 : void WireFrameConnection::setup_after_connect(WireFrameConnectionPtr conn) {
      76                 :           0 :     conn->set_state(ConnectionState::Connected);
      77                 :             : 
      78                 :           0 :     HPACTOR_LOG_DEBUG(log::LogCategory::kNetwork, ActorId{0}, 0,
      79                 :             :                       "connection opened");
      80                 :             : 
      81                 :           0 :     auto* loop = conn->event_loop();
      82                 :           0 :     int fd = conn->fd();
      83                 :           0 :     if (loop && fd >= 0) {
      84                 :           0 :         loop->add_fd(fd, EventLoop::Event::Read);
      85                 :           0 :         if (loop->supports_read_handler()) {
      86                 :           0 :             std::weak_ptr<WireFrameConnection> weak_conn = conn;
      87                 :           0 :             loop->set_read_handler(fd, [weak_conn](int /*event_fd*/) {
      88                 :           0 :                 if (auto self = weak_conn.lock()) {
      89                 :           0 :                     self->handle_read();
      90                 :           0 :                 }
      91                 :           0 :             });
      92                 :           0 :         }
      93                 :             :     }
      94                 :           0 : }
      95                 :             : 
      96                 :             : WireFrameConnectionPtr
      97                 :           2 : WireFrameConnection::create_as_server(int fd, EndPoint local_endpoint,
      98                 :             :                                       EndPoint remote_endpoint, EventLoop* loop) {
      99                 :             :     auto conn = std::shared_ptr<WireFrameConnection>(
     100                 :           2 :         new WireFrameConnection(fd, local_endpoint, remote_endpoint, loop));
     101                 :           2 :     conn->set_state(ConnectionState::Connected);
     102                 :             : 
     103                 :           2 :     HPACTOR_LOG_DEBUG(log::LogCategory::kNetwork, ActorId{0}, 0,
     104                 :             :                       "connection opened");
     105                 :             : 
     106                 :           2 :     if (loop && fd >= 0) {
     107                 :           2 :         loop->add_fd(fd, EventLoop::Event::Read);
     108                 :           2 :         if (loop->supports_read_handler()) {
     109                 :           2 :             std::weak_ptr<WireFrameConnection> weak_conn = conn;
     110                 :           2 :             loop->set_read_handler(fd, [weak_conn](int /*event_fd*/) {
     111                 :           1 :                 if (auto self = weak_conn.lock()) {
     112                 :           1 :                     self->handle_read();
     113                 :           1 :                 }
     114                 :           1 :             });
     115                 :           2 :         }
     116                 :             :     }
     117                 :             : 
     118                 :           2 :     return conn;
     119                 :             : }
     120                 :             : 
     121                 :          18 : void WireFrameConnection::set_ready_handler(std::function<void(ConnectionPtr)> handler) {
     122                 :          18 :     ready_handler_ = std::move(handler);
     123                 :          18 : }
     124                 :             : 
     125                 :          18 : void WireFrameConnection::set_frame_handler(frame_handler handler) {
     126                 :          18 :     frame_handler_ = std::move(handler);
     127                 :          18 : }
     128                 :             : 
     129                 :          19 : void WireFrameConnection::set_error_handler(
     130                 :             :     std::function<void(ConnectionPtr, const error&)> handler) {
     131                 :          19 :     error_handler_ = std::move(handler);
     132                 :          19 : }
     133                 :             : 
     134                 :           2 : void WireFrameConnection::set_send_completion_handler(
     135                 :             :     std::function<void(int result)> handler) {
     136                 :           2 :     send_completion_handler_ = std::move(handler);
     137                 :           2 : }
     138                 :             : 
     139                 :          11 : void WireFrameConnection::send(const StreamBuffer& frame_data) {
     140                 :          11 :     if (state_ != ConnectionState::Connected) {
     141                 :           0 :         return;
     142                 :             :     }
     143                 :             : 
     144                 :          11 :     HPACTOR_LOG_TRACE(
     145                 :             :         log::LogCategory::kNetwork, ActorId{0}, 0, "network frame sent",
     146                 :             :         log::field("bytes", static_cast<uint64_t>(frame_data.size())));
     147                 :             : 
     148                 :          11 :     send_raw(frame_data);
     149                 :             : }
     150                 :             : 
     151                 :          43 : void WireFrameConnection::close() {
     152                 :          43 :     if (fd_ >= 0) {
     153                 :          23 :         if (loop_) {
     154                 :          23 :             loop_->clear_read_handler(fd_);
     155                 :          23 :             loop_->remove_fd(fd_);
     156                 :             :         }
     157                 :          23 :         ::close(fd_);
     158                 :          23 :         fd_ = -1;
     159                 :             :     }
     160                 :          43 :     set_state(ConnectionState::Disconnected);
     161                 :          43 :     HPACTOR_LOG_DEBUG(log::LogCategory::kNetwork, ActorId{0}, 0,
     162                 :             :                       "connection closed");
     163                 :          43 : }
     164                 :             : 
     165                 :           1 : void WireFrameConnection::handle_read() {
     166                 :             :     // Phase 1 — accumulate header bytes (8 bytes: magic + payload length).
     167                 :             :     // Read header first so we know the exact payload length before allocating
     168                 :             :     // buffer space for it.
     169                 :           2 :     while (read_buffer_.size() < WireFrame::HeaderSize) {
     170                 :           2 :         size_t remaining = WireFrame::HeaderSize - read_buffer_.size();
     171                 :           2 :         uint8_t* area = read_buffer_.reserve_tail(remaining);
     172                 :           2 :         ssize_t n = ::read(fd_, area, remaining);
     173                 :           2 :         if (n > 0) {
     174                 :           1 :             read_buffer_.commit_tail(static_cast<size_t>(n));
     175                 :           1 :         } else if (n == 0) {
     176                 :           1 :             read_buffer_.clear(); // EOF — discard incomplete header
     177                 :           1 :             return;
     178                 :             :         } else {
     179                 :           0 :             if (errno == EAGAIN || errno == EWOULDBLOCK)
     180                 :           0 :                 return;
     181                 :           0 :             read_buffer_.clear(); // Error — discard partial data
     182                 :           0 :             return;
     183                 :             :         }
     184                 :             :     }
     185                 :             : 
     186                 :             :     // Validate magic "HPAC"
     187                 :           0 :     if (read_buffer_[0] != 'H' || read_buffer_[1] != 'P' ||
     188                 :           0 :         read_buffer_[2] != 'A' || read_buffer_[3] != 'C') {
     189                 :             :         // Invalid magic — skip one byte and retry (re-sync stream)
     190                 :           0 :         read_buffer_.consume(1);
     191                 :           0 :         handle_read(); // Recurse to re-enter header-read phase
     192                 :           0 :         return;
     193                 :             :     }
     194                 :             : 
     195                 :             :     // Parse remaining length (big-endian uint32_t from bytes 4-7)
     196                 :           0 :     size_t payload_len = (static_cast<size_t>(read_buffer_[4]) << 24) |
     197                 :           0 :                          (static_cast<size_t>(read_buffer_[5]) << 16) |
     198                 :           0 :                          (static_cast<size_t>(read_buffer_[6]) << 8) |
     199                 :           0 :                          static_cast<size_t>(read_buffer_[7]);
     200                 :             : 
     201                 :           0 :     size_t total_frame_size = WireFrame::HeaderSize + payload_len;
     202                 :             : 
     203                 :             :     // Phase 2 — accumulate exactly payload_len bytes.
     204                 :             :     // Reserve only the remaining bytes needed, not a fixed chunk.
     205                 :           0 :     while (read_buffer_.size() < total_frame_size) {
     206                 :           0 :         size_t remaining = total_frame_size - read_buffer_.size();
     207                 :           0 :         uint8_t* area = read_buffer_.reserve_tail(remaining);
     208                 :           0 :         ssize_t n = ::read(fd_, area, remaining);
     209                 :           0 :         if (n > 0) {
     210                 :           0 :             read_buffer_.commit_tail(static_cast<size_t>(n));
     211                 :           0 :         } else if (n == 0) {
     212                 :           0 :             read_buffer_.clear(); // EOF mid-frame — discard incomplete frame
     213                 :           0 :             return;
     214                 :             :         } else {
     215                 :           0 :             if (errno == EAGAIN || errno == EWOULDBLOCK)
     216                 :           0 :                 return;
     217                 :           0 :             read_buffer_.clear(); // Error — discard partial data
     218                 :           0 :             return;
     219                 :             :         }
     220                 :             :     }
     221                 :             : 
     222                 :             :     // Complete frame received — extract into StreamBuffer.
     223                 :             :     StreamBuffer frame_data(read_buffer_.begin(),
     224                 :           0 :                             read_buffer_.begin() + total_frame_size);
     225                 :           0 :     read_buffer_.consume(total_frame_size);
     226                 :             : 
     227                 :           0 :     if (frame_handler_) {
     228                 :           0 :         frame_handler_(std::move(frame_data));
     229                 :             :     }
     230                 :             : 
     231                 :             :     // Process any additional frames already in the buffer.
     232                 :           0 :     if (read_buffer_.size() >= WireFrame::HeaderSize) {
     233                 :           0 :         handle_read();
     234                 :             :     }
     235                 :           0 : }
     236                 :             : 
     237                 :          11 : void WireFrameConnection::send_raw(const StreamBuffer& data) {
     238                 :          11 :     if (fd_ < 0 || !loop_)
     239                 :           0 :         return;
     240                 :             : 
     241                 :          11 :     write_buffer_.append(data.data(), data.size());
     242                 :             : 
     243                 :          11 :     if (is_sending_)
     244                 :           9 :         return;
     245                 :             : 
     246                 :           2 :     flush_write_buffer();
     247                 :             : }
     248                 :             : 
     249                 :           2 : void WireFrameConnection::flush_write_buffer() {
     250                 :           2 :     if (fd_ < 0 || loop_ == nullptr || write_buffer_.empty()) {
     251                 :           0 :         return;
     252                 :             :     }
     253                 :             : 
     254                 :           2 :     is_sending_ = true;
     255                 :             : 
     256                 :             :     struct iovec iov;
     257                 :           2 :     iov.iov_base = write_buffer_.data();
     258                 :           2 :     iov.iov_len = write_buffer_.size();
     259                 :             : 
     260                 :           2 :     loop_->backend()->async_send(fd_, &iov, 1, ActorId(0),
     261                 :             :                                  static_cast<uint32_t>(OpType::Send));
     262                 :             : }
     263                 :             : 
     264                 :           0 : void WireFrameConnection::handle_send_completion(int result) {
     265                 :           0 :     if (send_completion_handler_) {
     266                 :           0 :         send_completion_handler_(result);
     267                 :             :     }
     268                 :           0 :     is_sending_ = false;
     269                 :             : 
     270                 :           0 :     if (result < 0) {
     271                 :           0 :         set_state(ConnectionState::Error);
     272                 :           0 :         if (error_handler_) {
     273                 :           0 :             error_handler_(
     274                 :           0 :                 std::enable_shared_from_this<WireFrameConnection>::shared_from_this(),
     275                 :           0 :                 error{});
     276                 :             :         }
     277                 :           0 :         return;
     278                 :             :     }
     279                 :             : 
     280                 :           0 :     if (static_cast<size_t>(result) >= write_buffer_.size()) {
     281                 :           0 :         write_buffer_.clear();
     282                 :             :     } else {
     283                 :           0 :         write_buffer_.consume(static_cast<size_t>(result));
     284                 :             :     }
     285                 :             : 
     286                 :           0 :     if (!write_buffer_.empty()) {
     287                 :           0 :         flush_write_buffer();
     288                 :             :     }
     289                 :             : }
     290                 :             : 
     291                 :             : } // namespace net
     292                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1