Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : #pragma once
3 : :
4 : : #include <hpactor/net/async_io_fwd.hpp>
5 : : #include <hpactor/net/reactor_backend.hpp>
6 : : #include <hpactor/types/types.hpp>
7 : :
8 : : #include <cerrno>
9 : : #include <cstring>
10 : : #include <fcntl.h>
11 : : #include <functional>
12 : : #include <sys/socket.h>
13 : : #include <sys/uio.h>
14 : : #include <unistd.h>
15 : : #include <unordered_map>
16 : : #include <vector>
17 : :
18 : : namespace hpactor {
19 : :
20 : : class ActorSystem;
21 : :
22 : : namespace net {
23 : :
24 : : // ReactorDispatcher - dispatches reactor (sync I/O) readiness events
25 : : // to the appropriate actor mailbox.
26 : : //
27 : : // In reactor mode, the backend signals "fd is ready for I/O" and this
28 : : // dispatcher performs the synchronous I/O operation and delivers a
29 : : // completion event to the actor's mailbox.
30 : : class ReactorDispatcher {
31 : : public:
32 : : // Completion handler callback - called when I/O completes
33 : : using completion_handler = std::function<void(OpCompletion)>;
34 : :
35 : 7 : void set_completion_handler(completion_handler handler) {
36 : 7 : handler_ = std::move(handler);
37 : 7 : }
38 : :
39 : : // Register an fd-to-actor mapping without active I/O tracking.
40 : : // Use register_recv/register_send/etc. for operations that need
41 : : // I/O performed on readiness.
42 : 1 : void register_fd(int fd, ActorId actor) {
43 : 1 : fd_to_actor_[fd] = actor;
44 : 1 : }
45 : :
46 : 2 : void unregister_fd(int fd) {
47 : 2 : fd_to_actor_.erase(fd);
48 : 2 : pending_ops_.erase(fd);
49 : 2 : }
50 : :
51 : : // Pending I/O operation tracked per fd
52 : : struct PendingIO {
53 : : ActorId actor;
54 : : OpType type;
55 : : int buf_count = 0;
56 : : iovec saved_bufs[16]; // buffers for recv operations
57 : : std::vector<uint8_t> data; // concatenated data for send operations
58 : : sockaddr_storage addr = {}; // target address for sendto/recvfrom
59 : : socklen_t addrlen = 0;
60 : : };
61 : :
62 : : // Register pending I/O operations.
63 : : // The dispatcher will perform the I/O when on_readiness fires.
64 : :
65 : 4 : void register_recv(int fd, ActorId actor, OpType type,
66 : : iovec* bufs, int buf_count) {
67 : 4 : PendingIO op;
68 : 4 : op.actor = actor;
69 : 4 : op.type = type;
70 : 4 : op.buf_count = buf_count;
71 : 8 : for (int i = 0; i < buf_count && i < 16; ++i) {
72 : 4 : op.saved_bufs[i] = bufs[i];
73 : : }
74 : 4 : fd_to_actor_[fd] = actor;
75 : 4 : pending_ops_[fd] = std::move(op);
76 : 4 : }
77 : :
78 : 2 : void register_send(int fd, ActorId actor, OpType type,
79 : : const iovec* bufs, int buf_count) {
80 : 2 : PendingIO op;
81 : 2 : op.actor = actor;
82 : 2 : op.type = type;
83 : : // Concatenate buffer data for send
84 : 2 : size_t total = 0;
85 : 4 : for (int i = 0; i < buf_count; ++i) {
86 : 2 : total += bufs[i].iov_len;
87 : : }
88 : 2 : op.data.resize(total);
89 : 2 : size_t offset = 0;
90 : 4 : for (int i = 0; i < buf_count; ++i) {
91 : 2 : if (bufs[i].iov_base && bufs[i].iov_len > 0) {
92 : 4 : std::memcpy(op.data.data() + offset, bufs[i].iov_base,
93 : 2 : bufs[i].iov_len);
94 : 2 : offset += bufs[i].iov_len;
95 : : }
96 : : }
97 : 2 : fd_to_actor_[fd] = actor;
98 : 2 : pending_ops_[fd] = std::move(op);
99 : 2 : }
100 : :
101 : 1 : void register_sendto(int fd, ActorId actor, OpType type,
102 : : const iovec* bufs, int buf_count,
103 : : const sockaddr* addr, socklen_t addrlen) {
104 : 1 : PendingIO op;
105 : 1 : op.actor = actor;
106 : 1 : op.type = type;
107 : 1 : size_t total = 0;
108 : 2 : for (int i = 0; i < buf_count; ++i) {
109 : 1 : total += bufs[i].iov_len;
110 : : }
111 : 1 : op.data.resize(total);
112 : 1 : size_t offset = 0;
113 : 2 : for (int i = 0; i < buf_count; ++i) {
114 : 1 : if (bufs[i].iov_base && bufs[i].iov_len > 0) {
115 : 2 : std::memcpy(op.data.data() + offset, bufs[i].iov_base,
116 : 1 : bufs[i].iov_len);
117 : 1 : offset += bufs[i].iov_len;
118 : : }
119 : : }
120 : 1 : if (addr && addrlen > 0) {
121 : 1 : std::memcpy(&op.addr, addr, addrlen);
122 : 1 : op.addrlen = addrlen;
123 : : }
124 : 1 : fd_to_actor_[fd] = actor;
125 : 1 : pending_ops_[fd] = std::move(op);
126 : 1 : }
127 : :
128 : 2 : void register_accept(int fd, ActorId actor) {
129 : 2 : PendingIO op;
130 : 2 : op.actor = actor;
131 : 2 : op.type = OpType::Accept;
132 : 2 : fd_to_actor_[fd] = actor;
133 : 2 : pending_ops_[fd] = std::move(op);
134 : 2 : }
135 : :
136 : : void register_connect(int fd, ActorId actor) {
137 : : PendingIO op;
138 : : op.actor = actor;
139 : : op.type = OpType::Connect;
140 : : fd_to_actor_[fd] = actor;
141 : : pending_ops_[fd] = std::move(op);
142 : : }
143 : :
144 : : // Remove pending I/O tracking without removing the fd mapping
145 : 3 : void unregister_io(int fd) {
146 : 3 : pending_ops_.erase(fd);
147 : 3 : }
148 : :
149 : 6 : bool has_pending(int fd) const {
150 : 6 : return pending_ops_.find(fd) != pending_ops_.end();
151 : : }
152 : :
153 : : // Called by the reactor backend when an fd is ready for I/O.
154 : : // Performs the pending I/O operation and delivers a completion.
155 : 7 : void on_readiness(int fd, IoEvent events) {
156 : 7 : auto it = pending_ops_.find(fd);
157 : 7 : if (it == pending_ops_.end()) {
158 : 2 : return; // No pending operation for this fd
159 : : }
160 : :
161 : 5 : PendingIO& op = it->second;
162 : 5 : OpCompletion completion{};
163 : 5 : completion.actor = op.actor;
164 : 5 : completion.fd = fd;
165 : 5 : completion.user_data = 0;
166 : :
167 : 5 : switch (op.type) {
168 : 2 : case OpType::Recv:
169 : 2 : if (int(events) & int(IoEvent::Read)) {
170 : 2 : do_recv(completion, fd, op);
171 : : }
172 : 2 : break;
173 : 0 : case OpType::RecvFrom:
174 : 0 : if (int(events) & int(IoEvent::Read)) {
175 : 0 : do_recvfrom(completion, fd, op);
176 : : }
177 : 0 : break;
178 : 1 : case OpType::Send:
179 : 1 : if (int(events) & int(IoEvent::Write)) {
180 : 1 : do_send(completion, fd, op);
181 : : }
182 : 1 : break;
183 : 1 : case OpType::SendTo:
184 : 1 : if (int(events) & int(IoEvent::Write)) {
185 : 1 : do_sendto(completion, fd, op);
186 : : }
187 : 1 : break;
188 : 1 : case OpType::Accept:
189 : 1 : if (int(events) & int(IoEvent::Read)) {
190 : 1 : do_accept(completion, fd);
191 : : }
192 : 1 : break;
193 : 0 : case OpType::Connect:
194 : 0 : if (int(events) & int(IoEvent::Write)) {
195 : 0 : do_connect(completion, fd);
196 : : }
197 : 0 : break;
198 : 0 : default:
199 : 0 : break;
200 : : }
201 : :
202 : : // If the operation completed (not still pending), deliver and remove
203 : 5 : if (completion.type != OpType::TimerFired) {
204 : 5 : pending_ops_.erase(it);
205 : 5 : if (handler_) {
206 : 5 : handler_(completion);
207 : : }
208 : : }
209 : : }
210 : :
211 : : private:
212 : 2 : void do_recv(OpCompletion& completion, int fd, PendingIO& op) {
213 : 2 : ssize_t total_n = 0;
214 : 2 : int last_err = 0;
215 : :
216 : : // Drain loop for edge-triggered readiness
217 : : while (true) {
218 : 2 : ssize_t n = ::readv(fd, op.saved_bufs, op.buf_count);
219 : 2 : if (n < 0) {
220 : 1 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
221 : : break; // No more data
222 : : }
223 : 1 : last_err = errno;
224 : 1 : break;
225 : : }
226 : 1 : total_n += n;
227 : 1 : if (n == 0) {
228 : 0 : break; // EOF
229 : : }
230 : : // Check if we filled all buffers
231 : 1 : break;
232 : : }
233 : :
234 : 2 : completion.type = OpType::Recv;
235 : 2 : completion.result = (last_err != 0) ? -last_err : static_cast<int>(total_n);
236 : 2 : }
237 : :
238 : 0 : void do_recvfrom(OpCompletion& completion, int fd, PendingIO& op) {
239 : 0 : ssize_t total_n = 0;
240 : 0 : int last_err = 0;
241 : 0 : socklen_t addrlen = sizeof(op.addr);
242 : :
243 : : while (true) {
244 : : struct msghdr msg;
245 : 0 : std::memset(&msg, 0, sizeof(msg));
246 : 0 : msg.msg_name = &op.addr;
247 : 0 : msg.msg_namelen = addrlen;
248 : 0 : msg.msg_iov = op.saved_bufs;
249 : 0 : msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(op.buf_count);
250 : :
251 : 0 : ssize_t n = ::recvmsg(fd, &msg, 0);
252 : 0 : if (n < 0) {
253 : 0 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
254 : : break;
255 : : }
256 : 0 : last_err = errno;
257 : 0 : break;
258 : : }
259 : 0 : total_n += n;
260 : 0 : completion.src_addr_len = msg.msg_namelen;
261 : 0 : break;
262 : : }
263 : :
264 : 0 : completion.type = OpType::RecvFrom;
265 : 0 : completion.result = (last_err != 0) ? -last_err : static_cast<int>(total_n);
266 : 0 : if (total_n > 0) {
267 : 0 : std::memcpy(&completion.src_addr, &op.addr,
268 : : sizeof(completion.src_addr));
269 : : }
270 : 0 : }
271 : :
272 : 1 : void do_send(OpCompletion& completion, int fd, PendingIO& op) {
273 : 1 : ssize_t total_n = 0;
274 : 1 : int last_err = 0;
275 : :
276 : 2 : while (total_n < static_cast<ssize_t>(op.data.size())) {
277 : 1 : ssize_t n = ::send(fd, op.data.data() + total_n,
278 : 1 : op.data.size() - static_cast<size_t>(total_n), 0);
279 : 1 : if (n < 0) {
280 : 0 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
281 : 0 : if (total_n > 0) {
282 : 0 : break; // Partial send, report what we sent
283 : : }
284 : 0 : last_err = EAGAIN;
285 : 0 : break;
286 : : }
287 : 0 : last_err = errno;
288 : 0 : break;
289 : : }
290 : 1 : total_n += n;
291 : : }
292 : :
293 : 1 : completion.type = OpType::Send;
294 : 1 : completion.result = (last_err != 0) ? -last_err : static_cast<int>(total_n);
295 : 1 : }
296 : :
297 : 1 : void do_sendto(OpCompletion& completion, int fd, PendingIO& op) {
298 : 1 : ssize_t total_n = 0;
299 : 1 : int last_err = 0;
300 : :
301 : 2 : while (total_n < static_cast<ssize_t>(op.data.size())) {
302 : 1 : ssize_t n = ::sendto(fd, op.data.data() + total_n,
303 : 1 : op.data.size() - static_cast<size_t>(total_n), 0,
304 : 1 : reinterpret_cast<const sockaddr*>(&op.addr),
305 : : op.addrlen);
306 : 1 : if (n < 0) {
307 : 0 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
308 : 0 : if (total_n > 0) {
309 : 0 : break;
310 : : }
311 : 0 : last_err = EAGAIN;
312 : 0 : break;
313 : : }
314 : 0 : last_err = errno;
315 : 0 : break;
316 : : }
317 : 1 : total_n += n;
318 : : }
319 : :
320 : 1 : completion.type = OpType::SendTo;
321 : 1 : completion.result = (last_err != 0) ? -last_err : static_cast<int>(total_n);
322 : 1 : }
323 : :
324 : 1 : void do_accept(OpCompletion& completion, int fd) {
325 : : // Ensure non-blocking
326 : 1 : int flags = fcntl(fd, F_GETFL, 0);
327 : 1 : if (flags >= 0 && !(flags & O_NONBLOCK)) {
328 : 1 : fcntl(fd, F_SETFL, flags | O_NONBLOCK);
329 : : }
330 : :
331 : 1 : int client_fd = ::accept(fd, nullptr, nullptr);
332 : 1 : if (client_fd >= 0) {
333 : : // Set client fd to non-blocking
334 : 1 : int cflags = fcntl(client_fd, F_GETFL, 0);
335 : 1 : if (cflags >= 0) {
336 : 1 : fcntl(client_fd, F_SETFL, cflags | O_NONBLOCK);
337 : : }
338 : : }
339 : :
340 : 1 : completion.type = OpType::Accept;
341 : 1 : completion.result = (client_fd >= 0) ? client_fd : -errno;
342 : 1 : completion.fd = (client_fd >= 0) ? client_fd : -1;
343 : 1 : }
344 : :
345 : 0 : void do_connect(OpCompletion& completion, int fd) {
346 : : // Check connect result via SO_ERROR
347 : 0 : int error = 0;
348 : 0 : socklen_t len = sizeof(error);
349 : 0 : if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
350 : 0 : error = errno;
351 : : }
352 : :
353 : 0 : completion.type = OpType::Connect;
354 : 0 : completion.result = (error == 0) ? 0 : -error;
355 : 0 : }
356 : :
357 : : std::unordered_map<int, ActorId> fd_to_actor_;
358 : : std::unordered_map<int, PendingIO> pending_ops_;
359 : : completion_handler handler_;
360 : : };
361 : :
362 : : } // namespace net
363 : : } // namespace hpactor
|