Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #pragma once
16 : :
17 : : #include <hpactor/net/async_io_fwd.hpp>
18 : : #include <hpactor/net/reactor_backend.hpp>
19 : :
20 : : #if defined(__linux__)
21 : : # include <sys/epoll.h>
22 : : # include <sys/timerfd.h>
23 : : #else
24 : : // Stub definitions for non-Linux compilation
25 : : struct epoll_event {
26 : : uint32_t events;
27 : : void* ptr;
28 : : };
29 : : #endif
30 : :
31 : : #include <atomic>
32 : : #include <mutex>
33 : : #include <unordered_map>
34 : : #include <unordered_set>
35 : : #include <vector>
36 : :
37 : : namespace hpactor {
38 : : namespace net {
39 : :
40 : : // Forward declaration to avoid circular include
41 : : class EventLoop;
42 : :
43 : : class EpollBackend : public IReactorBackend {
44 : : public:
45 : : EpollBackend();
46 : : ~EpollBackend() override;
47 : :
48 : : bool start() override;
49 : : void stop() override;
50 : :
51 : : bool add_fd(int fd, IoEvent events) override;
52 : : bool update_fd(int fd, IoEvent events) override;
53 : : bool remove_fd(int fd) override;
54 : :
55 : : int register_buffer(const void* addr, size_t len) override;
56 : : bool unregister_buffer(int buffer_id) override;
57 : :
58 : : uint64_t run_after(ActorId actor, int delay_ms) override;
59 : : uint64_t run_every(ActorId actor, int interval_ms) override;
60 : : void cancel_timer(uint64_t handle) override;
61 : :
62 : : int wait(int timeout_ms) override;
63 : : void process_events() override;
64 : :
65 : : // Proactor methods
66 : : void async_send(int fd, const iovec* bufs, int buf_count, ActorId actor,
67 : : uint32_t op_type) override;
68 : : void async_recv(int fd, const iovec* bufs, int buf_count, ActorId actor,
69 : : uint32_t op_type) override;
70 : : void async_accept(int fd, ActorId actor) override;
71 : : void async_connect(int fd, const sockaddr* addr, socklen_t addrlen,
72 : : ActorId actor) override;
73 : : void async_sendto(int fd, const iovec* bufs, int buf_count,
74 : : const sockaddr* addr, socklen_t addrlen, ActorId actor,
75 : : uint32_t op_type) override;
76 : : void async_recvfrom(int fd, const iovec* bufs, int buf_count, ActorId actor,
77 : : uint32_t op_type) override;
78 : :
79 : : // Read handler management — epoll reads data in wait() and dispatches
80 : : void set_read_handler(int fd, read_callback handler) override;
81 : : void clear_read_handler(int fd) override;
82 : :
83 : 16 : bool supports_read_handler() const override { return true; }
84 : :
85 : : // Write handler management — epoll dispatches writable events via callback
86 : : void set_write_handler(int fd, write_callback handler) override;
87 : : void clear_write_handler(int fd) override;
88 : :
89 : 0 : bool supports_write_handler() const override { return true; }
90 : :
91 : : // Set the EventLoop pointer for routing completions
92 : 75 : void set_loop(net::EventLoop* loop) {
93 : 75 : loop_ = loop;
94 : 75 : }
95 : :
96 : : // Called by completions to deliver to actor
97 : : void deliver_completion(OpCompletion completion);
98 : :
99 : : // --- Extended proactor methods (not in IReactorBackend) ---
100 : : void async_send_fixed(int fd, int buffer_id, size_t offset, size_t len,
101 : : ActorId actor, uint32_t op_type);
102 : : void async_recv_fixed(int fd, int buffer_id, size_t offset, size_t len,
103 : : ActorId actor, uint32_t op_type);
104 : :
105 : : private:
106 : : // Timer entry
107 : : struct TimerEntry {
108 : : int64_t expires_at_ms; // absolute time in ms
109 : : ActorId actor;
110 : : int interval_ms; // 0 for one-shot, >0 for repeating
111 : : uint64_t handle;
112 : : };
113 : :
114 : : // Pending I/O operation tracked per fd
115 : : struct PendingOp {
116 : : ActorId actor;
117 : : uint32_t op_type;
118 : : std::vector<uint8_t> data; // concatenated buffers for send
119 : : int buf_count = 0;
120 : : iovec saved_bufs[16]; // original buffers for recv
121 : : sockaddr_storage addr; // for connect/recvfrom/sendto
122 : : socklen_t addrlen = sizeof(addr);
123 : : };
124 : :
125 : : // Encode fd+actor+op_type into user_data
126 : : static uint64_t encode_user_data(int fd, ActorId actor, uint32_t op_type);
127 : : static void decode_user_data(uint64_t user_data, int& fd, ActorId& actor,
128 : : uint32_t& op_type);
129 : :
130 : : // Process expired timers, returns number of triggered timers
131 : : int process_timers();
132 : :
133 : : // Process a pending I/O operation for a socket event
134 : : void process_pending_op(int fd, uint32_t events);
135 : :
136 : : // Dispatch a single epoll event to the appropriate handler
137 : : void dispatch_event(int fd, uint32_t events);
138 : :
139 : : // Sub-methods for process_pending_op — each handles one op type
140 : : void try_pending_accept(int fd, PendingOp& op);
141 : : bool try_pending_connect(int fd, PendingOp& op, uint32_t events, int& error);
142 : : bool try_pending_send(int fd, PendingOp& op, uint32_t events,
143 : : ssize_t& total, int& error);
144 : : bool try_pending_recv(int fd, PendingOp& op, uint32_t events,
145 : : ssize_t& total, int& error);
146 : :
147 : : // Enqueue an OpCompletion for a completed pending operation
148 : : void deliver_op_completion(const PendingOp& op, OpType optype, int fd,
149 : : ssize_t total, int error);
150 : :
151 : : int epoll_fd_ = -1;
152 : : int timerfd_ = -1;
153 : :
154 : : // EventLoop pointer for routing completions
155 : : net::EventLoop* loop_ = nullptr;
156 : :
157 : : // Timer management
158 : : std::vector<TimerEntry> timers_; // sorted by expires_at_ms
159 : : std::unordered_set<uint64_t> cancelled_timers_;
160 : : std::unordered_map<uint64_t, size_t> handle_to_timer_index_;
161 : : std::atomic<uint64_t> next_timer_handle_{1};
162 : :
163 : : // fd -> registered events for update tracking
164 : : std::unordered_map<int, uint32_t> fd_events_;
165 : :
166 : : // fd -> pending I/O operation for true async I/O
167 : : std::unordered_map<int, PendingOp> pending_ops_;
168 : :
169 : : // Registered buffers (not supported in epoll - always empty)
170 : : std::vector<std::pair<const void*, size_t>> registered_buffers_;
171 : :
172 : : bool running_ = false;
173 : :
174 : : // Thread safety
175 : : std::mutex mutex_;
176 : :
177 : : // Pending completions from async_* calls (for process_completions)
178 : : std::vector<OpCompletion> pending_completions_;
179 : : mutable std::mutex completions_mutex_;
180 : :
181 : : // Read handler callbacks for PlainConnection-style consumers
182 : : std::unordered_map<int, read_callback> read_handlers_;
183 : :
184 : : // Read data from fd and dispatch to registered read handler
185 : : void service_read_handler(int fd);
186 : :
187 : : // Write handler callbacks for non-blocking connect completion
188 : : std::unordered_map<int, write_callback> write_handlers_;
189 : :
190 : : // Dispatch writable event to registered write handler
191 : : void service_write_handler(int fd);
192 : : };
193 : :
194 : : } // namespace net
195 : : } // namespace hpactor
|