LCOV - code coverage report
Current view: top level - include/hpactor/net - reactor_dispatcher.hpp (source / functions) Coverage Total Hit
Test: HPActor Coverage Lines: 73.0 % 204 149
Test Date: 2026-05-20 02:24:49 Functions: 87.5 % 16 14
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: - 0 0

             Branch data     Line data    Source code
       1                 :             : // Copyright 2026 HPActor Contributors
       2                 :             : #pragma once
       3                 :             : 
       4                 :             : #include <hpactor/net/async_io_fwd.hpp>
       5                 :             : #include <hpactor/net/reactor_backend.hpp>
       6                 :             : #include <hpactor/types/types.hpp>
       7                 :             : 
       8                 :             : #include <cerrno>
       9                 :             : #include <cstring>
      10                 :             : #include <fcntl.h>
      11                 :             : #include <functional>
      12                 :             : #include <sys/socket.h>
      13                 :             : #include <sys/uio.h>
      14                 :             : #include <unistd.h>
      15                 :             : #include <unordered_map>
      16                 :             : #include <vector>
      17                 :             : 
      18                 :             : namespace hpactor {
      19                 :             : 
      20                 :             : class ActorSystem;
      21                 :             : 
      22                 :             : namespace net {
      23                 :             : 
      24                 :             : // ReactorDispatcher - dispatches reactor (sync I/O) readiness events
      25                 :             : // to the appropriate actor mailbox.
      26                 :             : //
      27                 :             : // In reactor mode, the backend signals "fd is ready for I/O" and this
      28                 :             : // dispatcher performs the synchronous I/O operation and delivers a
      29                 :             : // completion event to the actor's mailbox.
      30                 :             : class ReactorDispatcher {
      31                 :             : public:
      32                 :             :     // Completion handler callback - called when I/O completes
      33                 :             :     using completion_handler = std::function<void(OpCompletion)>;
      34                 :             : 
      35                 :           7 :     void set_completion_handler(completion_handler handler) {
      36                 :           7 :         handler_ = std::move(handler);
      37                 :           7 :     }
      38                 :             : 
      39                 :             :     // Register an fd-to-actor mapping without active I/O tracking.
      40                 :             :     // Use register_recv/register_send/etc. for operations that need
      41                 :             :     // I/O performed on readiness.
      42                 :           1 :     void register_fd(int fd, ActorId actor) {
      43                 :           1 :         fd_to_actor_[fd] = actor;
      44                 :           1 :     }
      45                 :             : 
      46                 :           2 :     void unregister_fd(int fd) {
      47                 :           2 :         fd_to_actor_.erase(fd);
      48                 :           2 :         pending_ops_.erase(fd);
      49                 :           2 :     }
      50                 :             : 
      51                 :             :     // Pending I/O operation tracked per fd
      52                 :             :     struct PendingIO {
      53                 :             :         ActorId actor;
      54                 :             :         OpType type;
      55                 :             :         int buf_count = 0;
      56                 :             :         iovec saved_bufs[16];         // buffers for recv operations
      57                 :             :         std::vector<uint8_t> data;    // concatenated data for send operations
      58                 :             :         sockaddr_storage addr = {};   // target address for sendto/recvfrom
      59                 :             :         socklen_t addrlen = 0;
      60                 :             :     };
      61                 :             : 
      62                 :             :     // Register pending I/O operations.
      63                 :             :     // The dispatcher will perform the I/O when on_readiness fires.
      64                 :             : 
      65                 :           4 :     void register_recv(int fd, ActorId actor, OpType type,
      66                 :             :                        iovec* bufs, int buf_count) {
      67                 :           4 :         PendingIO op;
      68                 :           4 :         op.actor = actor;
      69                 :           4 :         op.type = type;
      70                 :           4 :         op.buf_count = buf_count;
      71                 :           8 :         for (int i = 0; i < buf_count && i < 16; ++i) {
      72                 :           4 :             op.saved_bufs[i] = bufs[i];
      73                 :             :         }
      74                 :           4 :         fd_to_actor_[fd] = actor;
      75                 :           4 :         pending_ops_[fd] = std::move(op);
      76                 :           4 :     }
      77                 :             : 
      78                 :           2 :     void register_send(int fd, ActorId actor, OpType type,
      79                 :             :                        const iovec* bufs, int buf_count) {
      80                 :           2 :         PendingIO op;
      81                 :           2 :         op.actor = actor;
      82                 :           2 :         op.type = type;
      83                 :             :         // Concatenate buffer data for send
      84                 :           2 :         size_t total = 0;
      85                 :           4 :         for (int i = 0; i < buf_count; ++i) {
      86                 :           2 :             total += bufs[i].iov_len;
      87                 :             :         }
      88                 :           2 :         op.data.resize(total);
      89                 :           2 :         size_t offset = 0;
      90                 :           4 :         for (int i = 0; i < buf_count; ++i) {
      91                 :           2 :             if (bufs[i].iov_base && bufs[i].iov_len > 0) {
      92                 :           4 :                 std::memcpy(op.data.data() + offset, bufs[i].iov_base,
      93                 :           2 :                             bufs[i].iov_len);
      94                 :           2 :                 offset += bufs[i].iov_len;
      95                 :             :             }
      96                 :             :         }
      97                 :           2 :         fd_to_actor_[fd] = actor;
      98                 :           2 :         pending_ops_[fd] = std::move(op);
      99                 :           2 :     }
     100                 :             : 
     101                 :           1 :     void register_sendto(int fd, ActorId actor, OpType type,
     102                 :             :                          const iovec* bufs, int buf_count,
     103                 :             :                          const sockaddr* addr, socklen_t addrlen) {
     104                 :           1 :         PendingIO op;
     105                 :           1 :         op.actor = actor;
     106                 :           1 :         op.type = type;
     107                 :           1 :         size_t total = 0;
     108                 :           2 :         for (int i = 0; i < buf_count; ++i) {
     109                 :           1 :             total += bufs[i].iov_len;
     110                 :             :         }
     111                 :           1 :         op.data.resize(total);
     112                 :           1 :         size_t offset = 0;
     113                 :           2 :         for (int i = 0; i < buf_count; ++i) {
     114                 :           1 :             if (bufs[i].iov_base && bufs[i].iov_len > 0) {
     115                 :           2 :                 std::memcpy(op.data.data() + offset, bufs[i].iov_base,
     116                 :           1 :                             bufs[i].iov_len);
     117                 :           1 :                 offset += bufs[i].iov_len;
     118                 :             :             }
     119                 :             :         }
     120                 :           1 :         if (addr && addrlen > 0) {
     121                 :           1 :             std::memcpy(&op.addr, addr, addrlen);
     122                 :           1 :             op.addrlen = addrlen;
     123                 :             :         }
     124                 :           1 :         fd_to_actor_[fd] = actor;
     125                 :           1 :         pending_ops_[fd] = std::move(op);
     126                 :           1 :     }
     127                 :             : 
     128                 :           2 :     void register_accept(int fd, ActorId actor) {
     129                 :           2 :         PendingIO op;
     130                 :           2 :         op.actor = actor;
     131                 :           2 :         op.type = OpType::Accept;
     132                 :           2 :         fd_to_actor_[fd] = actor;
     133                 :           2 :         pending_ops_[fd] = std::move(op);
     134                 :           2 :     }
     135                 :             : 
     136                 :             :     void register_connect(int fd, ActorId actor) {
     137                 :             :         PendingIO op;
     138                 :             :         op.actor = actor;
     139                 :             :         op.type = OpType::Connect;
     140                 :             :         fd_to_actor_[fd] = actor;
     141                 :             :         pending_ops_[fd] = std::move(op);
     142                 :             :     }
     143                 :             : 
     144                 :             :     // Remove pending I/O tracking without removing the fd mapping
     145                 :           3 :     void unregister_io(int fd) {
     146                 :           3 :         pending_ops_.erase(fd);
     147                 :           3 :     }
     148                 :             : 
     149                 :           6 :     bool has_pending(int fd) const {
     150                 :           6 :         return pending_ops_.find(fd) != pending_ops_.end();
     151                 :             :     }
     152                 :             : 
     153                 :             :     // Called by the reactor backend when an fd is ready for I/O.
     154                 :             :     // Performs the pending I/O operation and delivers a completion.
     155                 :           7 :     void on_readiness(int fd, IoEvent events) {
     156                 :           7 :         auto it = pending_ops_.find(fd);
     157                 :           7 :         if (it == pending_ops_.end()) {
     158                 :           2 :             return; // No pending operation for this fd
     159                 :             :         }
     160                 :             : 
     161                 :           5 :         PendingIO& op = it->second;
     162                 :           5 :         OpCompletion completion{};
     163                 :           5 :         completion.actor = op.actor;
     164                 :           5 :         completion.fd = fd;
     165                 :           5 :         completion.user_data = 0;
     166                 :             : 
     167                 :           5 :         switch (op.type) {
     168                 :           2 :             case OpType::Recv:
     169                 :           2 :                 if (int(events) & int(IoEvent::Read)) {
     170                 :           2 :                     do_recv(completion, fd, op);
     171                 :             :                 }
     172                 :           2 :                 break;
     173                 :           0 :             case OpType::RecvFrom:
     174                 :           0 :                 if (int(events) & int(IoEvent::Read)) {
     175                 :           0 :                     do_recvfrom(completion, fd, op);
     176                 :             :                 }
     177                 :           0 :                 break;
     178                 :           1 :             case OpType::Send:
     179                 :           1 :                 if (int(events) & int(IoEvent::Write)) {
     180                 :           1 :                     do_send(completion, fd, op);
     181                 :             :                 }
     182                 :           1 :                 break;
     183                 :           1 :             case OpType::SendTo:
     184                 :           1 :                 if (int(events) & int(IoEvent::Write)) {
     185                 :           1 :                     do_sendto(completion, fd, op);
     186                 :             :                 }
     187                 :           1 :                 break;
     188                 :           1 :             case OpType::Accept:
     189                 :           1 :                 if (int(events) & int(IoEvent::Read)) {
     190                 :           1 :                     do_accept(completion, fd);
     191                 :             :                 }
     192                 :           1 :                 break;
     193                 :           0 :             case OpType::Connect:
     194                 :           0 :                 if (int(events) & int(IoEvent::Write)) {
     195                 :           0 :                     do_connect(completion, fd);
     196                 :             :                 }
     197                 :           0 :                 break;
     198                 :           0 :             default:
     199                 :           0 :                 break;
     200                 :             :         }
     201                 :             : 
     202                 :             :         // If the operation completed (not still pending), deliver and remove
     203                 :           5 :         if (completion.type != OpType::TimerFired) {
     204                 :           5 :             pending_ops_.erase(it);
     205                 :           5 :             if (handler_) {
     206                 :           5 :                 handler_(completion);
     207                 :             :             }
     208                 :             :         }
     209                 :             :     }
     210                 :             : 
     211                 :             : private:
     212                 :           2 :     void do_recv(OpCompletion& completion, int fd, PendingIO& op) {
     213                 :           2 :         ssize_t total_n = 0;
     214                 :           2 :         int last_err = 0;
     215                 :             : 
     216                 :             :         // Drain loop for edge-triggered readiness
     217                 :             :         while (true) {
     218                 :           2 :             ssize_t n = ::readv(fd, op.saved_bufs, op.buf_count);
     219                 :           2 :             if (n < 0) {
     220                 :           1 :                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
     221                 :             :                     break; // No more data
     222                 :             :                 }
     223                 :           1 :                 last_err = errno;
     224                 :           1 :                 break;
     225                 :             :             }
     226                 :           1 :             total_n += n;
     227                 :           1 :             if (n == 0) {
     228                 :           0 :                 break; // EOF
     229                 :             :             }
     230                 :             :             // Check if we filled all buffers
     231                 :           1 :             break;
     232                 :             :         }
     233                 :             : 
     234                 :           2 :         completion.type = OpType::Recv;
     235                 :           2 :         completion.result = (last_err != 0) ? -last_err : static_cast<int>(total_n);
     236                 :           2 :     }
     237                 :             : 
     238                 :           0 :     void do_recvfrom(OpCompletion& completion, int fd, PendingIO& op) {
     239                 :           0 :         ssize_t total_n = 0;
     240                 :           0 :         int last_err = 0;
     241                 :           0 :         socklen_t addrlen = sizeof(op.addr);
     242                 :             : 
     243                 :             :         while (true) {
     244                 :             :             struct msghdr msg;
     245                 :           0 :             std::memset(&msg, 0, sizeof(msg));
     246                 :           0 :             msg.msg_name = &op.addr;
     247                 :           0 :             msg.msg_namelen = addrlen;
     248                 :           0 :             msg.msg_iov = op.saved_bufs;
     249                 :           0 :             msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(op.buf_count);
     250                 :             : 
     251                 :           0 :             ssize_t n = ::recvmsg(fd, &msg, 0);
     252                 :           0 :             if (n < 0) {
     253                 :           0 :                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
     254                 :             :                     break;
     255                 :             :                 }
     256                 :           0 :                 last_err = errno;
     257                 :           0 :                 break;
     258                 :             :             }
     259                 :           0 :             total_n += n;
     260                 :           0 :             completion.src_addr_len = msg.msg_namelen;
     261                 :           0 :             break;
     262                 :             :         }
     263                 :             : 
     264                 :           0 :         completion.type = OpType::RecvFrom;
     265                 :           0 :         completion.result = (last_err != 0) ? -last_err : static_cast<int>(total_n);
     266                 :           0 :         if (total_n > 0) {
     267                 :           0 :             std::memcpy(&completion.src_addr, &op.addr,
     268                 :             :                         sizeof(completion.src_addr));
     269                 :             :         }
     270                 :           0 :     }
     271                 :             : 
     272                 :           1 :     void do_send(OpCompletion& completion, int fd, PendingIO& op) {
     273                 :           1 :         ssize_t total_n = 0;
     274                 :           1 :         int last_err = 0;
     275                 :             : 
     276                 :           2 :         while (total_n < static_cast<ssize_t>(op.data.size())) {
     277                 :           1 :             ssize_t n = ::send(fd, op.data.data() + total_n,
     278                 :           1 :                                op.data.size() - static_cast<size_t>(total_n), 0);
     279                 :           1 :             if (n < 0) {
     280                 :           0 :                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
     281                 :           0 :                     if (total_n > 0) {
     282                 :           0 :                         break; // Partial send, report what we sent
     283                 :             :                     }
     284                 :           0 :                     last_err = EAGAIN;
     285                 :           0 :                     break;
     286                 :             :                 }
     287                 :           0 :                 last_err = errno;
     288                 :           0 :                 break;
     289                 :             :             }
     290                 :           1 :             total_n += n;
     291                 :             :         }
     292                 :             : 
     293                 :           1 :         completion.type = OpType::Send;
     294                 :           1 :         completion.result = (last_err != 0) ? -last_err : static_cast<int>(total_n);
     295                 :           1 :     }
     296                 :             : 
     297                 :           1 :     void do_sendto(OpCompletion& completion, int fd, PendingIO& op) {
     298                 :           1 :         ssize_t total_n = 0;
     299                 :           1 :         int last_err = 0;
     300                 :             : 
     301                 :           2 :         while (total_n < static_cast<ssize_t>(op.data.size())) {
     302                 :           1 :             ssize_t n = ::sendto(fd, op.data.data() + total_n,
     303                 :           1 :                                  op.data.size() - static_cast<size_t>(total_n), 0,
     304                 :           1 :                                  reinterpret_cast<const sockaddr*>(&op.addr),
     305                 :             :                                  op.addrlen);
     306                 :           1 :             if (n < 0) {
     307                 :           0 :                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
     308                 :           0 :                     if (total_n > 0) {
     309                 :           0 :                         break;
     310                 :             :                     }
     311                 :           0 :                     last_err = EAGAIN;
     312                 :           0 :                     break;
     313                 :             :                 }
     314                 :           0 :                 last_err = errno;
     315                 :           0 :                 break;
     316                 :             :             }
     317                 :           1 :             total_n += n;
     318                 :             :         }
     319                 :             : 
     320                 :           1 :         completion.type = OpType::SendTo;
     321                 :           1 :         completion.result = (last_err != 0) ? -last_err : static_cast<int>(total_n);
     322                 :           1 :     }
     323                 :             : 
     324                 :           1 :     void do_accept(OpCompletion& completion, int fd) {
     325                 :             :         // Ensure non-blocking
     326                 :           1 :         int flags = fcntl(fd, F_GETFL, 0);
     327                 :           1 :         if (flags >= 0 && !(flags & O_NONBLOCK)) {
     328                 :           1 :             fcntl(fd, F_SETFL, flags | O_NONBLOCK);
     329                 :             :         }
     330                 :             : 
     331                 :           1 :         int client_fd = ::accept(fd, nullptr, nullptr);
     332                 :           1 :         if (client_fd >= 0) {
     333                 :             :             // Set client fd to non-blocking
     334                 :           1 :             int cflags = fcntl(client_fd, F_GETFL, 0);
     335                 :           1 :             if (cflags >= 0) {
     336                 :           1 :                 fcntl(client_fd, F_SETFL, cflags | O_NONBLOCK);
     337                 :             :             }
     338                 :             :         }
     339                 :             : 
     340                 :           1 :         completion.type = OpType::Accept;
     341                 :           1 :         completion.result = (client_fd >= 0) ? client_fd : -errno;
     342                 :           1 :         completion.fd = (client_fd >= 0) ? client_fd : -1;
     343                 :           1 :     }
     344                 :             : 
     345                 :           0 :     void do_connect(OpCompletion& completion, int fd) {
     346                 :             :         // Check connect result via SO_ERROR
     347                 :           0 :         int error = 0;
     348                 :           0 :         socklen_t len = sizeof(error);
     349                 :           0 :         if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
     350                 :           0 :             error = errno;
     351                 :             :         }
     352                 :             : 
     353                 :           0 :         completion.type = OpType::Connect;
     354                 :           0 :         completion.result = (error == 0) ? 0 : -error;
     355                 :           0 :     }
     356                 :             : 
     357                 :             :     std::unordered_map<int, ActorId> fd_to_actor_;
     358                 :             :     std::unordered_map<int, PendingIO> pending_ops_;
     359                 :             :     completion_handler handler_;
     360                 :             : };
     361                 :             : 
     362                 :             : } // namespace net
     363                 :             : } // namespace hpactor
        

Generated by: LCOV version 2.0-1