Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/net/event_loop.hpp>
16 : : #include <hpactor/net/reactor/epoll_backend.hpp>
17 : :
18 : : #if defined(__linux__)
19 : : # include <cerrno>
20 : : # include <cstdlib>
21 : : # include <cstring>
22 : : # include <ctime>
23 : : # include <fcntl.h>
24 : : # include <sys/epoll.h>
25 : : # include <sys/socket.h>
26 : : # include <sys/timerfd.h>
27 : : # include <sys/uio.h>
28 : : # include <unistd.h>
29 : : #endif
30 : :
31 : : namespace hpactor {
32 : : namespace net {
33 : :
34 : : #if defined(__linux__)
35 : :
36 : 75 : EpollBackend::EpollBackend() = default;
37 : :
38 : 150 : EpollBackend::~EpollBackend() {
39 : 75 : stop();
40 : 150 : }
41 : :
42 : 84 : bool EpollBackend::start() {
43 : 84 : epoll_fd_ = epoll_create1(EPOLL_CLOEXEC);
44 : 84 : if (epoll_fd_ < 0) {
45 : 0 : return false;
46 : : }
47 : :
48 : : // Create timerfd for timer events
49 : 84 : timerfd_ = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
50 : 84 : if (timerfd_ < 0) {
51 : 0 : close(epoll_fd_);
52 : 0 : epoll_fd_ = -1;
53 : 0 : return false;
54 : : }
55 : :
56 : : // Add timerfd to epoll
57 : : struct epoll_event ev;
58 : 84 : ev.events = EPOLLIN;
59 : 84 : ev.data.fd = timerfd_;
60 : 84 : if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timerfd_, &ev) < 0) {
61 : 0 : close(timerfd_);
62 : 0 : close(epoll_fd_);
63 : 0 : timerfd_ = -1;
64 : 0 : epoll_fd_ = -1;
65 : 0 : return false;
66 : : }
67 : :
68 : 84 : running_ = true;
69 : 84 : return true;
70 : : }
71 : :
72 : 81 : void EpollBackend::stop() {
73 : 81 : if (!running_) {
74 : 6 : return;
75 : : }
76 : 75 : running_ = false;
77 : :
78 : 75 : if (timerfd_ >= 0) {
79 : 75 : close(timerfd_);
80 : 75 : timerfd_ = -1;
81 : : }
82 : :
83 : 75 : if (epoll_fd_ >= 0) {
84 : 75 : close(epoll_fd_);
85 : 75 : epoll_fd_ = -1;
86 : : }
87 : :
88 : 75 : timers_.clear();
89 : 75 : cancelled_timers_.clear();
90 : 75 : handle_to_timer_index_.clear();
91 : 75 : fd_events_.clear();
92 : 75 : pending_ops_.clear();
93 : : }
94 : :
95 : 81 : bool EpollBackend::add_fd(int fd, IoEvent events) {
96 : 81 : uint32_t epoll_events = 0;
97 : : using T = std::underlying_type_t<IoEvent>;
98 : 81 : if (static_cast<T>(events) & static_cast<T>(IoEvent::Read)) {
99 : 81 : epoll_events |= EPOLLIN;
100 : : }
101 : 81 : if (static_cast<T>(events) & static_cast<T>(IoEvent::Write)) {
102 : 19 : epoll_events |= EPOLLOUT;
103 : : }
104 : :
105 : : struct epoll_event ev;
106 : 81 : ev.events = epoll_events | EPOLLET;
107 : 81 : ev.data.fd = fd;
108 : :
109 : 81 : int op = fd_events_.count(fd) ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
110 : 81 : if (epoll_ctl(epoll_fd_, op, fd, &ev) < 0) {
111 : 0 : return false;
112 : : }
113 : :
114 : 81 : fd_events_[fd] = epoll_events | EPOLLET;
115 : 81 : return true;
116 : : }
117 : :
118 : 0 : bool EpollBackend::update_fd(int fd, IoEvent events) {
119 : 0 : return add_fd(fd, events); // Same operation
120 : : }
121 : :
122 : 80 : bool EpollBackend::remove_fd(int fd) {
123 : 80 : if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr) < 0) {
124 : 10 : return false;
125 : : }
126 : 70 : fd_events_.erase(fd);
127 : 70 : pending_ops_.erase(fd);
128 : 70 : read_handlers_.erase(fd);
129 : 70 : write_handlers_.erase(fd);
130 : 70 : return true;
131 : : }
132 : :
133 : 0 : int EpollBackend::register_buffer(const void* addr, size_t len) {
134 : : (void)addr;
135 : : (void)len;
136 : : // Fixed buffers not supported with epoll
137 : 0 : return -1;
138 : : }
139 : :
140 : 0 : bool EpollBackend::unregister_buffer(int buffer_id) {
141 : : (void)buffer_id;
142 : 0 : return false;
143 : : }
144 : :
145 : 0 : uint64_t EpollBackend::encode_user_data(int fd, ActorId actor, uint32_t op_type) {
146 : 0 : return (static_cast<uint64_t>(fd) & 0xFFFFFFFFULL) |
147 : 0 : ((actor.value() & 0xFFFFULL) << 32) |
148 : 0 : ((static_cast<uint64_t>(op_type) & 0xFFULL) << 56);
149 : : }
150 : :
151 : 0 : void EpollBackend::decode_user_data(uint64_t ud, int& fd, ActorId& actor,
152 : : uint32_t& op_type) {
153 : 0 : fd = static_cast<int>(ud & 0xFFFFFFFFULL);
154 : 0 : actor = ActorId(static_cast<uint32_t>((ud >> 32) & 0xFFFFULL));
155 : 0 : op_type = static_cast<uint32_t>((ud >> 56) & 0xFFULL);
156 : 0 : }
157 : :
158 : 20 : int EpollBackend::process_timers() {
159 : 20 : int triggered = 0;
160 : :
161 : : // Read expired timerfd events
162 : : uint64_t expirations;
163 : 40 : while (read(timerfd_, &expirations, sizeof(expirations)) > 0) {
164 : 20 : triggered++;
165 : : }
166 : :
167 : : // Calculate current time
168 : : struct timespec ts;
169 : 20 : clock_gettime(CLOCK_MONOTONIC, &ts);
170 : 20 : int64_t now_ms = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
171 : :
172 : : // Process expired timers
173 : 72 : for (auto& timer : timers_) {
174 : 52 : if (timer.expires_at_ms <= now_ms) {
175 : 39 : if (cancelled_timers_.count(timer.handle) == 0) {
176 : 33 : OpCompletion completion{
177 : : .actor = timer.actor,
178 : : .type = OpType::TimerFired,
179 : : .fd = -1,
180 : : .result = 0,
181 : 33 : .user_data = timer.handle,
182 : 33 : };
183 : 33 : deliver_completion(completion);
184 : 33 : triggered++;
185 : :
186 : : // Reschedule if repeating
187 : 33 : if (timer.interval_ms > 0) {
188 : 4 : timer.expires_at_ms = now_ms + timer.interval_ms;
189 : : } else {
190 : 29 : timer.expires_at_ms = -1; // Mark for removal
191 : : }
192 : : }
193 : : }
194 : : }
195 : :
196 : : // Remove expired one-shot timers
197 : 40 : timers_.erase(
198 : 20 : std::remove_if(timers_.begin(), timers_.end(),
199 : 52 : [](const TimerEntry& t) { return t.expires_at_ms < 0; }),
200 : 20 : timers_.end());
201 : :
202 : : // Sort remaining timers by expiry
203 : 20 : std::sort(timers_.begin(), timers_.end(),
204 : 14 : [](const TimerEntry& a, const TimerEntry& b) {
205 : 14 : return a.expires_at_ms < b.expires_at_ms;
206 : : });
207 : :
208 : : // Update timerfd to next expiry if we have timers
209 : 20 : if (!timers_.empty()) {
210 : 16 : int64_t nextExpiry = timers_.front().expires_at_ms;
211 : : struct timespec now_ts;
212 : 16 : clock_gettime(CLOCK_MONOTONIC, &now_ts);
213 : 16 : int64_t now_abs_ms = now_ts.tv_sec * 1000 + now_ts.tv_nsec / 1000000;
214 : 16 : int64_t delay_ms = nextExpiry - now_abs_ms;
215 : 16 : if (delay_ms < 1)
216 : 6 : delay_ms = 1;
217 : 16 : struct itimerspec new_val{};
218 : 16 : new_val.it_value.tv_sec = delay_ms / 1000;
219 : 16 : new_val.it_value.tv_nsec = (delay_ms % 1000) * 1000000;
220 : 16 : timerfd_settime(timerfd_, 0, &new_val, nullptr);
221 : : } else {
222 : : // No timers, disarm
223 : 4 : struct itimerspec new_val{};
224 : 4 : timerfd_settime(timerfd_, 0, &new_val, nullptr);
225 : : }
226 : :
227 : 20 : return triggered;
228 : : }
229 : :
230 : : // --- Sub-methods extracted from process_pending_op ---
231 : :
232 : 0 : void EpollBackend::try_pending_accept(int fd, PendingOp& op) {
233 : : while (true) {
234 : 0 : int client_fd = ::accept(fd, nullptr, nullptr);
235 : 0 : if (client_fd < 0) {
236 : 0 : if (errno == EAGAIN)
237 : 0 : break;
238 : 0 : break;
239 : : }
240 : 0 : add_fd(client_fd, IoEvent::Read);
241 : :
242 : 0 : OpCompletion completion{
243 : : .actor = op.actor,
244 : : .type = OpType::Accept,
245 : : .fd = client_fd,
246 : : .result = client_fd,
247 : : .user_data = 0,
248 : 0 : };
249 : : {
250 : 0 : std::lock_guard<std::mutex> lock(completions_mutex_);
251 : 0 : pending_completions_.push_back(completion);
252 : 0 : }
253 : 0 : }
254 : 0 : }
255 : :
256 : 0 : bool EpollBackend::try_pending_connect(int fd, PendingOp& op, uint32_t events,
257 : : int& error) {
258 : : (void)fd;
259 : : (void)op;
260 : 0 : if (!(events & EPOLLOUT))
261 : 0 : return false;
262 : 0 : int err = 0;
263 : 0 : socklen_t errlen = sizeof(err);
264 : 0 : getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
265 : 0 : error = err;
266 : 0 : return true;
267 : : }
268 : :
269 : 0 : bool EpollBackend::try_pending_send(int fd, PendingOp& op, uint32_t events,
270 : : ssize_t& total, int& error) {
271 : 0 : if (!(events & EPOLLOUT))
272 : 0 : return false;
273 : :
274 : 0 : OpType optype = static_cast<OpType>(op.op_type);
275 : : while (true) {
276 : : ssize_t n;
277 : 0 : if (optype == OpType::SendTo) {
278 : 0 : n = ::sendto(fd, op.data.data(), op.data.size(), 0,
279 : 0 : reinterpret_cast<const sockaddr*>(&op.addr), op.addrlen);
280 : : } else {
281 : 0 : n = ::send(fd, op.data.data(), op.data.size(), 0);
282 : : }
283 : 0 : if (n < 0) {
284 : 0 : if (errno == EAGAIN)
285 : 0 : return false; // Keep pending op
286 : 0 : error = errno;
287 : 0 : return true;
288 : : }
289 : 0 : total += n;
290 : 0 : if (n < static_cast<ssize_t>(op.data.size())) {
291 : 0 : op.data.erase(op.data.begin(), op.data.begin() + n);
292 : : } else {
293 : 0 : return true; // All data sent
294 : : }
295 : 0 : }
296 : : }
297 : :
298 : 0 : bool EpollBackend::try_pending_recv(int fd, PendingOp& op, uint32_t events,
299 : : ssize_t& total, int& error) {
300 : 0 : if (!(events & EPOLLIN))
301 : 0 : return false;
302 : :
303 : 0 : OpType optype = static_cast<OpType>(op.op_type);
304 : : while (true) {
305 : : ssize_t n;
306 : 0 : if (optype == OpType::RecvFrom) {
307 : 0 : struct msghdr hdr = {};
308 : 0 : hdr.msg_iov = op.saved_bufs;
309 : 0 : hdr.msg_iovlen = static_cast<decltype(hdr.msg_iovlen)>(op.buf_count);
310 : 0 : n = ::recvmsg(fd, &hdr, 0);
311 : : } else {
312 : 0 : n = ::readv(fd, op.saved_bufs, op.buf_count);
313 : : }
314 : 0 : if (n < 0) {
315 : 0 : if (errno == EAGAIN)
316 : 0 : return false; // Keep pending op
317 : 0 : error = errno;
318 : 0 : return true;
319 : : }
320 : 0 : total += n;
321 : 0 : if (n == 0)
322 : 0 : return true; // EOF
323 : 0 : }
324 : : }
325 : :
326 : 0 : void EpollBackend::deliver_op_completion(const PendingOp& op, OpType optype,
327 : : int fd, ssize_t total, int error) {
328 : 0 : OpCompletion completion{
329 : : .actor = op.actor,
330 : : .type = optype,
331 : : .fd = fd,
332 : 0 : .result = (error != 0) ? -error : static_cast<int>(total),
333 : : .user_data = 0,
334 : 0 : };
335 : : {
336 : 0 : std::lock_guard<std::mutex> lock(completions_mutex_);
337 : 0 : pending_completions_.push_back(completion);
338 : 0 : }
339 : 0 : }
340 : :
341 : : // --- Dispatcher ---
342 : :
343 : 0 : void EpollBackend::process_pending_op(int fd, uint32_t events) {
344 : 0 : auto it = pending_ops_.find(fd);
345 : 0 : if (it == pending_ops_.end())
346 : 0 : return;
347 : :
348 : 0 : PendingOp& op = it->second;
349 : 0 : OpType optype = static_cast<OpType>(op.op_type);
350 : :
351 : 0 : switch (optype) {
352 : 0 : case OpType::Accept:
353 : 0 : try_pending_accept(fd, op);
354 : 0 : return; // Keep pending op for re-trigger
355 : :
356 : 0 : case OpType::Connect: {
357 : 0 : int error = 0;
358 : 0 : if (try_pending_connect(fd, op, events, error)) {
359 : 0 : deliver_op_completion(op, optype, fd, 0, error);
360 : 0 : pending_ops_.erase(it);
361 : : }
362 : 0 : return;
363 : : }
364 : 0 : case OpType::Send:
365 : : case OpType::SendTo: {
366 : 0 : ssize_t total = 0;
367 : 0 : int error = 0;
368 : 0 : if (try_pending_send(fd, op, events, total, error)) {
369 : 0 : deliver_op_completion(op, optype, fd, total, error);
370 : 0 : pending_ops_.erase(it);
371 : : }
372 : 0 : return;
373 : : }
374 : 0 : case OpType::Recv:
375 : : case OpType::RecvFrom: {
376 : 0 : ssize_t total = 0;
377 : 0 : int error = 0;
378 : 0 : if (try_pending_recv(fd, op, events, total, error)) {
379 : 0 : deliver_op_completion(op, optype, fd, total, error);
380 : 0 : pending_ops_.erase(it);
381 : : }
382 : 0 : return;
383 : : }
384 : 0 : default:
385 : 0 : deliver_op_completion(op, optype, fd, 0, EINVAL);
386 : 0 : pending_ops_.erase(it);
387 : 0 : return;
388 : : }
389 : : }
390 : :
391 : : // --- Event dispatch ---
392 : :
393 : 60 : void EpollBackend::dispatch_event(int fd, uint32_t events) {
394 : 60 : if (fd == timerfd_) {
395 : 20 : process_timers();
396 : 40 : } else if (fd >= 0) {
397 : 40 : if (pending_ops_.find(fd) != pending_ops_.end()) {
398 : 0 : process_pending_op(fd, events);
399 : : } else {
400 : 40 : if (events & EPOLLIN) {
401 : 40 : service_read_handler(fd);
402 : : }
403 : 40 : if (events & EPOLLOUT) {
404 : 0 : service_write_handler(fd);
405 : : }
406 : : }
407 : : }
408 : 60 : }
409 : :
410 : 49 : uint64_t EpollBackend::run_after(ActorId actor, int delay_ms) {
411 : 49 : std::lock_guard<std::mutex> lock(mutex_);
412 : :
413 : 49 : uint64_t handle = next_timer_handle_++;
414 : :
415 : : struct timespec ts;
416 : 49 : clock_gettime(CLOCK_MONOTONIC, &ts);
417 : 49 : int64_t expires_at_ms = ts.tv_sec * 1000 + ts.tv_nsec / 1000000 + delay_ms;
418 : :
419 : 49 : TimerEntry entry;
420 : 49 : entry.expires_at_ms = expires_at_ms;
421 : 49 : entry.actor = actor;
422 : 49 : entry.interval_ms = 0;
423 : 49 : entry.handle = handle;
424 : :
425 : 49 : timers_.push_back(entry);
426 : 49 : handle_to_timer_index_[handle] = timers_.size() - 1;
427 : :
428 : : // Sort timers by expiry
429 : 49 : std::sort(timers_.begin(), timers_.end(),
430 : 592 : [](const TimerEntry& a, const TimerEntry& b) {
431 : 592 : return a.expires_at_ms < b.expires_at_ms;
432 : : });
433 : :
434 : : // Update timerfd to fire at earliest timer (relative delay from now)
435 : 49 : int64_t front_delay = timers_.front().expires_at_ms -
436 : 49 : (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
437 : 49 : if (front_delay < 1)
438 : 0 : front_delay = 1;
439 : 49 : struct itimerspec new_val{};
440 : 49 : new_val.it_value.tv_sec = front_delay / 1000;
441 : 49 : new_val.it_value.tv_nsec = (front_delay % 1000) * 1000000;
442 : 49 : timerfd_settime(timerfd_, 0, &new_val, nullptr);
443 : :
444 : 98 : return handle;
445 : 49 : }
446 : :
447 : 2 : uint64_t EpollBackend::run_every(ActorId actor, int interval_ms) {
448 : 2 : std::lock_guard<std::mutex> lock(mutex_);
449 : :
450 : 2 : uint64_t handle = next_timer_handle_++;
451 : :
452 : : struct timespec ts;
453 : 2 : clock_gettime(CLOCK_MONOTONIC, &ts);
454 : 2 : int64_t expires_at_ms = ts.tv_sec * 1000 + ts.tv_nsec / 1000000 + interval_ms;
455 : :
456 : 2 : TimerEntry entry;
457 : 2 : entry.expires_at_ms = expires_at_ms;
458 : 2 : entry.actor = actor;
459 : 2 : entry.interval_ms = interval_ms;
460 : 2 : entry.handle = handle;
461 : :
462 : 2 : timers_.push_back(entry);
463 : :
464 : : // Sort timers by expiry
465 : 2 : std::sort(timers_.begin(), timers_.end(),
466 : 0 : [](const TimerEntry& a, const TimerEntry& b) {
467 : 0 : return a.expires_at_ms < b.expires_at_ms;
468 : : });
469 : :
470 : : // Update timerfd to fire at earliest timer (relative delay from now)
471 : 2 : int64_t front_delay = timers_.front().expires_at_ms -
472 : 2 : (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
473 : 2 : if (front_delay < 1)
474 : 0 : front_delay = 1;
475 : 2 : struct itimerspec new_val{};
476 : 2 : new_val.it_value.tv_sec = front_delay / 1000;
477 : 2 : new_val.it_value.tv_nsec = (front_delay % 1000) * 1000000;
478 : 2 : timerfd_settime(timerfd_, 0, &new_val, nullptr);
479 : :
480 : 4 : return handle;
481 : 2 : }
482 : :
483 : 2 : void EpollBackend::cancel_timer(uint64_t handle) {
484 : 2 : std::lock_guard<std::mutex> lock(mutex_);
485 : 2 : cancelled_timers_.insert(handle);
486 : 2 : handle_to_timer_index_.erase(handle);
487 : 2 : }
488 : :
489 : 8 : void EpollBackend::async_send(int fd, const iovec* bufs, int buf_count,
490 : : ActorId actor, uint32_t op_type) {
491 : : // Concatenate scatter-gather buffers
492 : 8 : size_t total_len = 0;
493 : 17 : for (int i = 0; i < buf_count; ++i) {
494 : 9 : total_len += bufs[i].iov_len;
495 : : }
496 : :
497 : 8 : std::vector<uint8_t> data(total_len);
498 : 8 : size_t offset = 0;
499 : 17 : for (int i = 0; i < buf_count; ++i) {
500 : 9 : std::memcpy(data.data() + offset, bufs[i].iov_base, bufs[i].iov_len);
501 : 9 : offset += bufs[i].iov_len;
502 : : }
503 : :
504 : 8 : size_t total_n = 0;
505 : 8 : int last_err = 0;
506 : :
507 : : // Loop until EAGAIN to drain send buffer completely (edge-triggered
508 : : // requirement)
509 : : while (true) {
510 : 8 : ssize_t n = ::send(fd, data.data() + total_n, data.size() - total_n, 0);
511 : 8 : if (n < 0) {
512 : 1 : if (errno == EAGAIN) {
513 : 0 : break; // Would block
514 : : }
515 : 1 : last_err = errno;
516 : 1 : break;
517 : : }
518 : 7 : total_n += static_cast<size_t>(n);
519 : 7 : if (total_n >= data.size()) {
520 : 7 : break; // All data sent
521 : : }
522 : : // Continue looping to send more
523 : 0 : }
524 : :
525 : 8 : if (total_n == 0 && last_err == 0) {
526 : : // Would block - store pending operation for wait() to process
527 : 0 : PendingOp op;
528 : 0 : op.actor = actor;
529 : 0 : op.op_type = op_type;
530 : 0 : op.data = std::move(data);
531 : 0 : op.buf_count = buf_count;
532 : 0 : pending_ops_[fd] = std::move(op);
533 : 0 : return;
534 : 0 : }
535 : :
536 : 8 : int result = (last_err != 0) ? last_err : static_cast<int>(total_n);
537 : :
538 : 8 : OpCompletion completion{
539 : : .actor = actor,
540 : : .type = static_cast<OpType>(op_type),
541 : : .fd = fd,
542 : : .result = result,
543 : : .user_data = 0,
544 : 8 : };
545 : : {
546 : 8 : std::lock_guard<std::mutex> lock(completions_mutex_);
547 : 8 : pending_completions_.push_back(completion);
548 : 8 : }
549 : 8 : }
550 : :
551 : 2 : void EpollBackend::async_recv(int fd, const iovec* bufs, int buf_count,
552 : : ActorId actor, uint32_t op_type) {
553 : 2 : ssize_t total_n = 0;
554 : 2 : int last_err = 0;
555 : :
556 : : // Loop until EAGAIN to drain all available data (edge-triggered
557 : : // requirement)
558 : : while (true) {
559 : 4 : ssize_t n = ::readv(fd, bufs, buf_count);
560 : 4 : if (n < 0) {
561 : 2 : if (errno == EAGAIN) {
562 : 2 : break; // No more data available right now
563 : : }
564 : 0 : last_err = errno;
565 : 0 : break;
566 : : }
567 : 2 : total_n += n;
568 : 2 : if (n == 0) {
569 : 0 : break; // EOF
570 : : }
571 : : // Continue looping to check for more data
572 : 2 : }
573 : :
574 : 2 : if (total_n == 0 && last_err == 0) {
575 : : // Socket not ready - store pending operation for wait() to process
576 : 0 : PendingOp op;
577 : 0 : op.actor = actor;
578 : 0 : op.op_type = op_type;
579 : 0 : op.buf_count = std::min(buf_count, 16);
580 : 0 : for (int i = 0; i < op.buf_count; ++i) {
581 : 0 : op.saved_bufs[i] = bufs[i];
582 : : }
583 : 0 : pending_ops_[fd] = std::move(op);
584 : 0 : return;
585 : 0 : }
586 : :
587 : 2 : int result = (last_err != 0) ? last_err : static_cast<int>(total_n);
588 : :
589 : 2 : OpCompletion completion{
590 : : .actor = actor,
591 : : .type = static_cast<OpType>(op_type),
592 : : .fd = fd,
593 : : .result = result,
594 : : .user_data = 0,
595 : 2 : };
596 : : {
597 : 2 : std::lock_guard<std::mutex> lock(completions_mutex_);
598 : 2 : pending_completions_.push_back(completion);
599 : 2 : }
600 : : }
601 : :
602 : 0 : void EpollBackend::async_send_fixed(int fd, int buffer_id, size_t offset,
603 : : size_t len, ActorId actor, uint32_t op_type) {
604 : : (void)fd;
605 : : (void)buffer_id;
606 : : (void)offset;
607 : : (void)len;
608 : : // Fixed buffers not supported with epoll
609 : 0 : OpCompletion completion{
610 : : .actor = actor,
611 : : .type = static_cast<OpType>(op_type),
612 : : .fd = fd,
613 : : .result = -1,
614 : : .user_data = 0,
615 : 0 : };
616 : : {
617 : 0 : std::lock_guard<std::mutex> lock(completions_mutex_);
618 : 0 : pending_completions_.push_back(completion);
619 : 0 : }
620 : 0 : }
621 : :
622 : 0 : void EpollBackend::async_recv_fixed(int fd, int buffer_id, size_t offset,
623 : : size_t len, ActorId actor, uint32_t op_type) {
624 : : (void)fd;
625 : : (void)buffer_id;
626 : : (void)offset;
627 : : (void)len;
628 : : // Fixed buffers not supported with epoll
629 : 0 : OpCompletion completion{
630 : : .actor = actor,
631 : : .type = static_cast<OpType>(op_type),
632 : : .fd = fd,
633 : : .result = -1,
634 : : .user_data = 0,
635 : 0 : };
636 : : {
637 : 0 : std::lock_guard<std::mutex> lock(completions_mutex_);
638 : 0 : pending_completions_.push_back(completion);
639 : 0 : }
640 : 0 : }
641 : :
642 : 0 : void EpollBackend::async_accept(int fd, ActorId actor) {
643 : 0 : int client_fd = ::accept(fd, nullptr, nullptr);
644 : 0 : if (client_fd < 0 && errno == EAGAIN) {
645 : : // Would block - register fd with epoll for edge-triggered accept
646 : : // then store pending accept for wait() to process
647 : : struct epoll_event ev;
648 : 0 : ev.events = EPOLLIN | EPOLLET;
649 : 0 : ev.data.fd = fd;
650 : 0 : if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0) {
651 : : // Failed to register - deliver error completion
652 : 0 : OpCompletion completion{
653 : : .actor = actor,
654 : : .type = OpType::Accept,
655 : : .fd = -1,
656 : 0 : .result = -errno,
657 : : .user_data = 0,
658 : 0 : };
659 : : {
660 : 0 : std::lock_guard<std::mutex> lock(completions_mutex_);
661 : 0 : pending_completions_.push_back(completion);
662 : 0 : }
663 : 0 : return;
664 : : }
665 : 0 : PendingOp op;
666 : 0 : op.actor = actor;
667 : 0 : op.op_type = static_cast<uint32_t>(OpType::Accept);
668 : 0 : pending_ops_[fd] = std::move(op);
669 : 0 : return;
670 : 0 : }
671 : :
672 : 0 : if (client_fd >= 0) {
673 : : // Register client_fd with event loop before delivering completion
674 : : // so caller can immediately use it with async_recv
675 : 0 : add_fd(client_fd, IoEvent::Read);
676 : : }
677 : :
678 : 0 : OpCompletion completion{
679 : : .actor = actor,
680 : : .type = OpType::Accept,
681 : 0 : .fd = (client_fd >= 0) ? client_fd : -1,
682 : 0 : .result = (client_fd >= 0) ? client_fd : errno,
683 : : .user_data = 0,
684 : 0 : };
685 : : {
686 : 0 : std::lock_guard<std::mutex> lock(completions_mutex_);
687 : 0 : pending_completions_.push_back(completion);
688 : 0 : }
689 : : }
690 : :
691 : 0 : void EpollBackend::async_connect(int fd, const sockaddr* addr,
692 : : socklen_t addrlen, ActorId actor) {
693 : 0 : int ret = ::connect(fd, addr, addrlen);
694 : 0 : if (ret < 0 && errno == EINPROGRESS) {
695 : : // Connection in progress - store for wait() to complete
696 : 0 : PendingOp op;
697 : 0 : op.actor = actor;
698 : 0 : op.op_type = static_cast<uint32_t>(OpType::Connect);
699 : 0 : if (addrlen <= sizeof(op.addr)) {
700 : 0 : std::memcpy(&op.addr, addr, addrlen);
701 : 0 : op.addrlen = addrlen;
702 : : }
703 : 0 : pending_ops_[fd] = std::move(op);
704 : 0 : return;
705 : 0 : }
706 : :
707 : 0 : int result = (ret < 0) ? errno : 0;
708 : :
709 : 0 : OpCompletion completion{
710 : : .actor = actor,
711 : : .type = OpType::Connect,
712 : : .fd = fd,
713 : : .result = result,
714 : : .user_data = 0,
715 : 0 : };
716 : : {
717 : 0 : std::lock_guard<std::mutex> lock(completions_mutex_);
718 : 0 : pending_completions_.push_back(completion);
719 : 0 : }
720 : : }
721 : :
722 : 1 : void EpollBackend::async_recvfrom(int fd, const iovec* bufs, int buf_count,
723 : : ActorId actor, uint32_t op_type) {
724 : : struct sockaddr_storage addr;
725 : 1 : socklen_t addrlen = sizeof(addr);
726 : 1 : ssize_t total_n = 0;
727 : 1 : int last_err = 0;
728 : :
729 : 1 : struct msghdr msg = {};
730 : 1 : msg.msg_name = &addr;
731 : 1 : msg.msg_namelen = addrlen;
732 : 1 : msg.msg_iov = const_cast<iovec*>(bufs);
733 : 1 : msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(buf_count);
734 : :
735 : : // Loop until EAGAIN to drain all available data (edge-triggered
736 : : // requirement)
737 : : while (true) {
738 : 2 : ssize_t n = ::recvmsg(fd, &msg, 0);
739 : 2 : if (n < 0) {
740 : 1 : if (errno == EAGAIN) {
741 : 1 : break; // No more data available right now
742 : : }
743 : 0 : last_err = errno;
744 : 0 : break;
745 : : }
746 : 1 : total_n += n;
747 : 1 : if (n == 0) {
748 : 0 : break; // EOF
749 : : }
750 : : // Continue looping to check for more data
751 : 1 : }
752 : :
753 : 1 : if (total_n == 0 && last_err == 0) {
754 : : // Socket not ready - store pending for wait() to process
755 : 0 : PendingOp op;
756 : 0 : op.actor = actor;
757 : 0 : op.op_type = op_type;
758 : 0 : op.buf_count = std::min(buf_count, 16);
759 : 0 : for (int i = 0; i < op.buf_count; ++i) {
760 : 0 : op.saved_bufs[i] = bufs[i];
761 : : }
762 : 0 : pending_ops_[fd] = std::move(op);
763 : 0 : return;
764 : 0 : }
765 : :
766 : 1 : int result = (last_err != 0) ? last_err : static_cast<int>(total_n);
767 : :
768 : 1 : OpCompletion completion{
769 : : .actor = actor,
770 : : .type = static_cast<OpType>(op_type),
771 : : .fd = fd,
772 : : .result = result,
773 : : .user_data = 0,
774 : 1 : };
775 : : {
776 : 1 : std::lock_guard<std::mutex> lock(completions_mutex_);
777 : 1 : pending_completions_.push_back(completion);
778 : 1 : }
779 : : }
780 : :
781 : 1 : void EpollBackend::async_sendto(int fd, const iovec* bufs, int buf_count,
782 : : const sockaddr* addr, socklen_t addrlen,
783 : : ActorId actor, uint32_t op_type) {
784 : : // Concatenate scatter-gather buffers
785 : 1 : size_t total_len = 0;
786 : 2 : for (int i = 0; i < buf_count; ++i) {
787 : 1 : total_len += bufs[i].iov_len;
788 : : }
789 : 1 : std::vector<uint8_t> data(total_len);
790 : 1 : size_t offset = 0;
791 : 2 : for (int i = 0; i < buf_count; ++i) {
792 : 1 : std::memcpy(data.data() + offset, bufs[i].iov_base, bufs[i].iov_len);
793 : 1 : offset += bufs[i].iov_len;
794 : : }
795 : :
796 : 1 : size_t total_n = 0;
797 : 1 : int last_err = 0;
798 : :
799 : : // Loop until EAGAIN to drain send buffer completely (edge-triggered
800 : : // requirement)
801 : : while (true) {
802 : 1 : ssize_t n = ::sendto(fd, data.data() + total_n, data.size() - total_n,
803 : : 0, addr, addrlen);
804 : 1 : if (n < 0) {
805 : 0 : if (errno == EAGAIN) {
806 : 0 : break; // Would block
807 : : }
808 : 0 : last_err = errno;
809 : 0 : break;
810 : : }
811 : 1 : total_n += static_cast<size_t>(n);
812 : 1 : if (total_n >= data.size()) {
813 : 1 : break; // All data sent
814 : : }
815 : : // Continue looping to send more
816 : 0 : }
817 : :
818 : 1 : if (total_n == 0 && last_err == 0) {
819 : : // Would block - store pending for wait() to process
820 : 0 : PendingOp op;
821 : 0 : op.actor = actor;
822 : 0 : op.op_type = op_type;
823 : 0 : op.data = std::move(data);
824 : 0 : if (addrlen <= sizeof(op.addr)) {
825 : 0 : std::memcpy(&op.addr, addr, addrlen);
826 : 0 : op.addrlen = addrlen;
827 : : }
828 : 0 : pending_ops_[fd] = std::move(op);
829 : 0 : return;
830 : 0 : }
831 : :
832 : 1 : int result = (last_err != 0) ? last_err : static_cast<int>(total_n);
833 : :
834 : 1 : OpCompletion completion{
835 : : .actor = actor,
836 : : .type = static_cast<OpType>(op_type),
837 : : .fd = fd,
838 : : .result = result,
839 : : .user_data = 0,
840 : 1 : };
841 : : {
842 : 1 : std::lock_guard<std::mutex> lock(completions_mutex_);
843 : 1 : pending_completions_.push_back(completion);
844 : 1 : }
845 : 1 : }
846 : :
847 : 81 : int EpollBackend::wait(int timeout_ms) {
848 : : struct epoll_event events[16];
849 : 81 : int num_events = epoll_wait(epoll_fd_, events, 16, timeout_ms);
850 : :
851 : 81 : if (num_events < 0) {
852 : 0 : if (errno == EINTR)
853 : 0 : return 0;
854 : 0 : return -1;
855 : : }
856 : :
857 : 141 : for (int i = 0; i < num_events; ++i) {
858 : 60 : dispatch_event(events[i].data.fd, events[i].events);
859 : : }
860 : :
861 : 81 : return num_events;
862 : : }
863 : :
864 : 58 : void EpollBackend::set_read_handler(int fd, read_callback handler) {
865 : 58 : std::lock_guard<std::mutex> lock(mutex_);
866 : 58 : if (handler) {
867 : 58 : read_handlers_[fd] = std::move(handler);
868 : : } else {
869 : 0 : read_handlers_.erase(fd);
870 : : }
871 : 58 : }
872 : :
873 : 76 : void EpollBackend::clear_read_handler(int fd) {
874 : 76 : std::lock_guard<std::mutex> lock(mutex_);
875 : 76 : read_handlers_.erase(fd);
876 : 76 : }
877 : :
878 : 40 : void EpollBackend::service_read_handler(int fd) {
879 : 40 : read_callback cb;
880 : : {
881 : 40 : std::lock_guard<std::mutex> lock(mutex_);
882 : 40 : auto it = read_handlers_.find(fd);
883 : 40 : if (it == read_handlers_.end())
884 : 2 : return;
885 : 38 : cb = it->second;
886 : 40 : }
887 : 38 : cb(fd);
888 : 40 : }
889 : :
890 : 18 : void EpollBackend::set_write_handler(int fd, write_callback handler) {
891 : 18 : std::lock_guard<std::mutex> lock(mutex_);
892 : 18 : if (handler) {
893 : 18 : write_handlers_[fd] = std::move(handler);
894 : : } else {
895 : 0 : write_handlers_.erase(fd);
896 : : }
897 : 18 : }
898 : :
899 : 0 : void EpollBackend::clear_write_handler(int fd) {
900 : 0 : std::lock_guard<std::mutex> lock(mutex_);
901 : 0 : write_handlers_.erase(fd);
902 : 0 : }
903 : :
904 : 0 : void EpollBackend::service_write_handler(int fd) {
905 : 0 : write_callback cb;
906 : : {
907 : 0 : std::lock_guard<std::mutex> lock(mutex_);
908 : 0 : auto it = write_handlers_.find(fd);
909 : 0 : if (it == write_handlers_.end())
910 : 0 : return;
911 : 0 : cb = it->second;
912 : 0 : }
913 : 0 : cb(fd);
914 : 0 : }
915 : :
916 : 86 : void EpollBackend::process_events() {
917 : 86 : std::vector<OpCompletion> completions;
918 : : {
919 : 86 : std::lock_guard<std::mutex> lock(completions_mutex_);
920 : 86 : completions = std::move(pending_completions_);
921 : 86 : pending_completions_.clear();
922 : 86 : }
923 : 98 : for (auto& completion : completions) {
924 : 12 : deliver_completion(completion);
925 : : }
926 : 86 : }
927 : :
928 : 45 : void EpollBackend::deliver_completion(OpCompletion completion) {
929 : 45 : if (loop_) {
930 : 45 : loop_->enqueue_completion(completion);
931 : : }
932 : 45 : }
933 : :
934 : : #else // !defined(__linux__)
935 : :
936 : : // Stub implementations for non-Linux
937 : :
938 : : EpollBackend::EpollBackend() = default;
939 : : EpollBackend::~EpollBackend() = default;
940 : :
941 : : bool EpollBackend::start() {
942 : : return false;
943 : : }
944 : : void EpollBackend::stop() {}
945 : :
946 : : bool EpollBackend::add_fd(int fd, IoEvent events) {
947 : : (void)fd;
948 : : (void)events;
949 : : return false;
950 : : }
951 : : bool EpollBackend::update_fd(int fd, IoEvent events) {
952 : : (void)fd;
953 : : (void)events;
954 : : return false;
955 : : }
956 : : bool EpollBackend::remove_fd(int fd) {
957 : : (void)fd;
958 : : return false;
959 : : }
960 : :
961 : : int EpollBackend::register_buffer(const void* addr, size_t len) {
962 : : (void)addr;
963 : : (void)len;
964 : : return -1;
965 : : }
966 : : bool EpollBackend::unregister_buffer(int buffer_id) {
967 : : (void)buffer_id;
968 : : return false;
969 : : }
970 : :
971 : : void EpollBackend::async_send(int fd, const iovec* bufs, int buf_count,
972 : : ActorId actor, uint32_t op_type) {
973 : : (void)fd;
974 : : (void)bufs;
975 : : (void)buf_count;
976 : : (void)actor;
977 : : (void)op_type;
978 : : }
979 : : void EpollBackend::async_recv(int fd, const iovec* bufs, int buf_count,
980 : : ActorId actor, uint32_t op_type) {
981 : : (void)fd;
982 : : (void)bufs;
983 : : (void)buf_count;
984 : : (void)actor;
985 : : (void)op_type;
986 : : }
987 : :
988 : : void EpollBackend::async_send_fixed(int fd, int buffer_id, size_t offset,
989 : : size_t len, ActorId actor, uint32_t op_type) {
990 : : (void)fd;
991 : : (void)buffer_id;
992 : : (void)offset;
993 : : (void)len;
994 : : (void)actor;
995 : : (void)op_type;
996 : : }
997 : : void EpollBackend::async_recv_fixed(int fd, int buffer_id, size_t offset,
998 : : size_t len, ActorId actor, uint32_t op_type) {
999 : : (void)fd;
1000 : : (void)buffer_id;
1001 : : (void)offset;
1002 : : (void)len;
1003 : : (void)actor;
1004 : : (void)op_type;
1005 : : }
1006 : :
1007 : : void EpollBackend::async_accept(int fd, ActorId actor) {
1008 : : (void)fd;
1009 : : (void)actor;
1010 : : }
1011 : : void EpollBackend::async_connect(int fd, const sockaddr* addr,
1012 : : socklen_t addrlen, ActorId actor) {
1013 : : (void)fd;
1014 : : (void)addr;
1015 : : (void)addrlen;
1016 : : (void)actor;
1017 : : }
1018 : :
1019 : : void EpollBackend::async_recvfrom(int fd, const iovec* bufs, int buf_count,
1020 : : ActorId actor, uint32_t op_type) {
1021 : : (void)fd;
1022 : : (void)bufs;
1023 : : (void)buf_count;
1024 : : (void)actor;
1025 : : (void)op_type;
1026 : : }
1027 : : void EpollBackend::async_sendto(int fd, const iovec* bufs, int buf_count,
1028 : : const sockaddr* addr, socklen_t addrlen,
1029 : : ActorId actor, uint32_t op_type) {
1030 : : (void)fd;
1031 : : (void)bufs;
1032 : : (void)buf_count;
1033 : : (void)addr;
1034 : : (void)addrlen;
1035 : : (void)actor;
1036 : : (void)op_type;
1037 : : }
1038 : :
1039 : : uint64_t EpollBackend::run_after(ActorId actor, int delay_ms) {
1040 : : (void)actor;
1041 : : (void)delay_ms;
1042 : : return 0;
1043 : : }
1044 : : uint64_t EpollBackend::run_every(ActorId actor, int interval_ms) {
1045 : : (void)actor;
1046 : : (void)interval_ms;
1047 : : return 0;
1048 : : }
1049 : : void EpollBackend::cancel_timer(uint64_t handle) {
1050 : : (void)handle;
1051 : : }
1052 : :
1053 : : int EpollBackend::wait(int timeout_ms) {
1054 : : (void)timeout_ms;
1055 : : return -1;
1056 : : }
1057 : : void EpollBackend::process_events() {}
1058 : :
1059 : : void EpollBackend::deliver_completion(OpCompletion completion) {
1060 : : (void)completion;
1061 : : }
1062 : :
1063 : : void EpollBackend::set_write_handler(int fd, write_callback handler) {
1064 : : (void)fd;
1065 : : (void)handler;
1066 : : }
1067 : : void EpollBackend::clear_write_handler(int fd) {
1068 : : (void)fd;
1069 : : }
1070 : : void EpollBackend::service_write_handler(int fd) {
1071 : : (void)fd;
1072 : : }
1073 : :
1074 : : #endif // defined(__linux__)
1075 : :
1076 : : } // namespace net
1077 : : } // namespace hpactor
|