Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/core/actor_system.hpp>
16 : : #include <hpactor/hpactor_config.hpp>
17 : : #include <hpactor/net/event_loop.hpp>
18 : : #include <hpactor/net/proactor_dispatcher.hpp>
19 : :
20 : : #if defined(__APPLE__)
21 : : # include <hpactor/net/reactor/kqueue_backend.hpp>
22 : : # if HPACTOR_ENABLE_PROACTOR
23 : : # include <hpactor/net/proactor/gcd_backend.hpp>
24 : : # endif
25 : : #elif defined(__linux__)
26 : : # include <hpactor/net/reactor/epoll_backend.hpp>
27 : : # if HPACTOR_ENABLE_PROACTOR
28 : : # include <hpactor/net/proactor/iouring_backend.hpp>
29 : : # endif
30 : : #endif
31 : :
32 : : namespace hpactor {
33 : :
34 : : namespace net {
35 : :
36 : 75 : EventLoop::EventLoop()
37 : 75 : : proactor_dispatcher_(std::make_unique<ProactorDispatcher>()) {
38 : 75 : proactor_dispatcher_->set_timer_handler([this](uint64_t user_data) {
39 : 33 : OpCompletion completion{};
40 : 33 : completion.type = OpType::TimerFired;
41 : 33 : completion.user_data = user_data;
42 : 33 : deliver_timer_completion(completion);
43 : 33 : });
44 : :
45 : : #if defined(__APPLE__)
46 : : // Try kqueue first (for sync I/O testing)
47 : : auto kqueue_backend = std::make_unique<KqueueBackend>();
48 : : if (kqueue_backend->start()) {
49 : : backend_name_ = "kqueue";
50 : : static_cast<KqueueBackend*>(kqueue_backend.get())->set_loop(this);
51 : : backend_ = std::move(kqueue_backend);
52 : : # if HPACTOR_ENABLE_PROACTOR
53 : : } else {
54 : : // Fall back to GCD only when Proactor mode is enabled
55 : : auto gcd_backend = std::make_unique<GcdBackend>();
56 : : if (gcd_backend->start()) {
57 : : backend_name_ = "gcd";
58 : : static_cast<GcdBackend*>(gcd_backend.get())->set_loop(this);
59 : : backend_ = std::move(gcd_backend);
60 : : }
61 : : # endif
62 : : }
63 : : #elif defined(__linux__)
64 : : # if HPACTOR_ENABLE_PROACTOR
65 : : // Try io_uring first (preferred on Linux)
66 : : auto iouring_backend = std::make_unique<IoUringBackend>();
67 : : if (iouring_backend->start()) {
68 : : backend_name_ = "iouring";
69 : : backend_ = std::move(iouring_backend);
70 : : } else {
71 : : // Fall back to epoll
72 : : auto epoll_backend = std::make_unique<EpollBackend>();
73 : : if (epoll_backend->start()) {
74 : : backend_name_ = "epoll";
75 : : static_cast<EpollBackend*>(epoll_backend.get())->set_loop(this);
76 : : backend_ = std::move(epoll_backend);
77 : : }
78 : : }
79 : : # else
80 : : // Reactor mode: use epoll directly
81 : 75 : auto epoll_backend = std::make_unique<EpollBackend>();
82 : 75 : if (epoll_backend->start()) {
83 : 75 : backend_name_ = "epoll";
84 : 75 : static_cast<EpollBackend*>(epoll_backend.get())->set_loop(this);
85 : 75 : backend_ = std::move(epoll_backend);
86 : : }
87 : : # endif
88 : : #endif
89 : 75 : }
90 : :
91 : 75 : EventLoop::~EventLoop() = default;
92 : :
93 : 9 : bool EventLoop::run() {
94 : 9 : if (running_.load()) {
95 : 0 : return true;
96 : : }
97 : 9 : if (!backend_) {
98 : 0 : return false;
99 : : }
100 : 9 : running_.store(backend_->start());
101 : 9 : return running_.load();
102 : : }
103 : :
104 : 6 : void EventLoop::stop() {
105 : 6 : running_.store(false);
106 : 6 : if (backend_) {
107 : 6 : backend_->stop();
108 : : }
109 : 6 : }
110 : :
111 : 0 : const char* EventLoop::backend_name() const {
112 : 0 : return backend_name_;
113 : : }
114 : :
115 : 81 : bool EventLoop::add_fd(int fd, Event events) {
116 : 81 : if (backend_ == nullptr) {
117 : 0 : return false;
118 : : }
119 : 81 : IoEvent io_events = IoEvent::Read;
120 : 81 : if ((static_cast<uint32_t>(events) & static_cast<uint32_t>(Event::Write)) != 0U) {
121 : 19 : io_events = static_cast<IoEvent>(static_cast<uint32_t>(io_events) |
122 : : static_cast<uint32_t>(IoEvent::Write));
123 : : }
124 : 81 : fd_events_[fd] = events;
125 : 81 : return backend_->add_fd(fd, io_events);
126 : : }
127 : :
128 : 1 : bool EventLoop::update_fd(int fd, Event events) {
129 : 1 : return add_fd(fd, events);
130 : : }
131 : :
132 : 80 : bool EventLoop::remove_fd(int fd) {
133 : 80 : fd_events_.erase(fd);
134 : 80 : if (!backend_) {
135 : 0 : return false;
136 : : }
137 : 80 : return backend_->remove_fd(fd);
138 : : }
139 : :
140 : 58 : void EventLoop::set_read_handler(int fd, read_callback handler) {
141 : 58 : if (backend_) {
142 : 58 : backend_->set_read_handler(fd, std::move(handler));
143 : : }
144 : 58 : }
145 : :
146 : 76 : void EventLoop::clear_read_handler(int fd) {
147 : 76 : if (backend_) {
148 : 76 : backend_->clear_read_handler(fd);
149 : : }
150 : 76 : }
151 : :
152 : 16 : bool EventLoop::supports_read_handler() const {
153 : 16 : return backend_ && backend_->supports_read_handler();
154 : : }
155 : :
156 : 18 : void EventLoop::set_write_handler(int fd, write_callback handler) {
157 : 18 : if (backend_) {
158 : 18 : backend_->set_write_handler(fd, std::move(handler));
159 : : }
160 : 18 : }
161 : :
162 : 0 : void EventLoop::clear_write_handler(int fd) {
163 : 0 : if (backend_) {
164 : 0 : backend_->clear_write_handler(fd);
165 : : }
166 : 0 : }
167 : :
168 : 0 : bool EventLoop::supports_write_handler() const {
169 : 0 : return backend_ && backend_->supports_write_handler();
170 : : }
171 : :
172 : 81 : int EventLoop::wait(int timeout_ms) {
173 : 81 : if (!backend_) {
174 : 0 : return -1;
175 : : }
176 : 81 : return backend_->wait(timeout_ms);
177 : : }
178 : :
179 : 1 : bool EventLoop::has_event(int /*fd*/, Event /*event*/) const {
180 : 1 : return true;
181 : : }
182 : :
183 : 49 : uint64_t EventLoop::run_after(timer_callback callback, int delay_ms) {
184 : 49 : if (!backend_) {
185 : 0 : return 0;
186 : : }
187 : 49 : uint64_t handle = next_timer_handle_++;
188 : 49 : timer_callbacks_[handle] = std::move(callback);
189 : : // Use ActorId(0) as a sentinel - we'll intercept timer completions
190 : : // The backend will deliver completion with user_data = handle
191 : 49 : uint64_t backend_handle = backend_->run_after(ActorId(0), delay_ms);
192 : 49 : if (backend_handle == 0) {
193 : 0 : timer_callbacks_.erase(handle);
194 : 0 : return 0;
195 : : }
196 : 49 : backend_handle_to_handle_[backend_handle] = handle;
197 : 49 : return handle;
198 : : }
199 : :
200 : 2 : uint64_t EventLoop::run_every(timer_callback callback, int interval_ms) {
201 : 2 : if (!backend_) {
202 : 0 : return 0;
203 : : }
204 : 2 : uint64_t handle = next_timer_handle_++;
205 : 2 : timer_callbacks_[handle] = std::move(callback);
206 : 2 : uint64_t backend_handle = backend_->run_every(ActorId(0), interval_ms);
207 : 2 : if (backend_handle == 0) {
208 : 0 : timer_callbacks_.erase(handle);
209 : 0 : return 0;
210 : : }
211 : 2 : backend_handle_to_handle_[backend_handle] = handle;
212 : 2 : repeating_timers_.insert(handle); // Mark as repeating timer
213 : 2 : return handle;
214 : : }
215 : :
216 : 2 : void EventLoop::cancel_timer(uint64_t timer_handle) {
217 : 2 : timer_callbacks_.erase(timer_handle);
218 : 2 : repeating_timers_.erase(timer_handle);
219 : 2 : if (backend_) {
220 : 2 : backend_->cancel_timer(timer_handle);
221 : : }
222 : 2 : }
223 : :
224 : 86 : void EventLoop::process_completions() {
225 : 86 : if (backend_) {
226 : 86 : backend_->process_events();
227 : : }
228 : 86 : }
229 : :
230 : 45 : void EventLoop::enqueue_completion(OpCompletion completion) {
231 : 45 : if (completion_callback_) {
232 : 8 : completion_callback_(completion);
233 : 8 : return;
234 : : }
235 : 37 : if (proactor_dispatcher_) {
236 : 37 : proactor_dispatcher_->on_completion(completion);
237 : : }
238 : : }
239 : :
240 : 33 : void EventLoop::deliver_timer_completion(OpCompletion completion) {
241 : 33 : uint64_t backend_handle = completion.user_data;
242 : 33 : auto it = backend_handle_to_handle_.find(backend_handle);
243 : 33 : if (it != backend_handle_to_handle_.end()) {
244 : 33 : uint64_t handle = it->second;
245 : 33 : auto callback_it = timer_callbacks_.find(handle);
246 : 33 : if (callback_it != timer_callbacks_.end()) {
247 : 33 : callback_it->second();
248 : : // Only erase callback for one-shot timers; repeating timers stay
249 : 33 : if (repeating_timers_.find(handle) == repeating_timers_.end()) {
250 : 29 : timer_callbacks_.erase(callback_it);
251 : 29 : backend_handle_to_handle_.erase(it);
252 : : }
253 : : }
254 : : }
255 : 33 : }
256 : :
257 : 0 : void EventLoop::set_actor_system(ActorSystem* actor_system) {
258 : 0 : actor_system_ = actor_system;
259 : 0 : if (proactor_dispatcher_) {
260 : 0 : proactor_dispatcher_->set_actor_system(actor_system);
261 : : }
262 : 0 : }
263 : :
264 : : } // namespace net
265 : : } // namespace hpactor
|