Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/net/wireframe_connection.hpp>
16 : :
17 : : #include <hpactor/log/logger.hpp>
18 : : #include <hpactor/net/event_loop.hpp>
19 : :
20 : : #include <cstring>
21 : : #include <sys/socket.h>
22 : : #include <unistd.h>
23 : :
24 : : namespace hpactor {
25 : :
26 : : namespace net {
27 : :
28 : 23 : WireFrameConnection::WireFrameConnection(int fd, EndPoint local_endpoint,
29 : 23 : EndPoint remote_endpoint, EventLoop* loop)
30 : 23 : : Connection(fd, local_endpoint, remote_endpoint, loop), is_sending_(false) {
31 : 23 : write_buffer_.reserve(kWriteChunkSize);
32 : 23 : }
33 : :
34 : 46 : WireFrameConnection::~WireFrameConnection() {
35 : 23 : close();
36 : 46 : }
37 : :
38 : : WireFrameConnectionPtr
39 : 3 : WireFrameConnection::create_as_client(int fd, EndPoint local_endpoint,
40 : : EndPoint remote_endpoint, EventLoop* loop) {
41 : : auto conn = std::shared_ptr<WireFrameConnection>(
42 : 3 : new WireFrameConnection(fd, local_endpoint, remote_endpoint, loop));
43 : 3 : conn->set_state(ConnectionState::Connected);
44 : :
45 : 3 : HPACTOR_LOG_DEBUG(log::LogCategory::kNetwork, ActorId{0}, 0,
46 : : "connection opened");
47 : :
48 : 3 : if (loop && fd >= 0) {
49 : 3 : loop->add_fd(fd, EventLoop::Event::Read);
50 : 3 : if (loop->supports_read_handler()) {
51 : 3 : std::weak_ptr<WireFrameConnection> weak_conn = conn;
52 : 3 : loop->set_read_handler(fd, [weak_conn](int /*event_fd*/) {
53 : 0 : if (auto self = weak_conn.lock()) {
54 : 0 : self->handle_read();
55 : 0 : }
56 : 0 : });
57 : 3 : }
58 : : }
59 : :
60 : 3 : return conn;
61 : : }
62 : :
63 : : WireFrameConnectionPtr
64 : 18 : WireFrameConnection::create_connecting_client(int fd, EndPoint local_endpoint,
65 : : EndPoint remote_endpoint,
66 : : EventLoop* loop) {
67 : : auto conn = std::shared_ptr<WireFrameConnection>(
68 : 18 : new WireFrameConnection(fd, local_endpoint, remote_endpoint, loop));
69 : 18 : conn->set_state(ConnectionState::Connecting);
70 : : // Event loop registration deferred — caller must verify connect and call
71 : : // setup_after_connect().
72 : 18 : return conn;
73 : : }
74 : :
75 : 0 : void WireFrameConnection::setup_after_connect(WireFrameConnectionPtr conn) {
76 : 0 : conn->set_state(ConnectionState::Connected);
77 : :
78 : 0 : HPACTOR_LOG_DEBUG(log::LogCategory::kNetwork, ActorId{0}, 0,
79 : : "connection opened");
80 : :
81 : 0 : auto* loop = conn->event_loop();
82 : 0 : int fd = conn->fd();
83 : 0 : if (loop && fd >= 0) {
84 : 0 : loop->add_fd(fd, EventLoop::Event::Read);
85 : 0 : if (loop->supports_read_handler()) {
86 : 0 : std::weak_ptr<WireFrameConnection> weak_conn = conn;
87 : 0 : loop->set_read_handler(fd, [weak_conn](int /*event_fd*/) {
88 : 0 : if (auto self = weak_conn.lock()) {
89 : 0 : self->handle_read();
90 : 0 : }
91 : 0 : });
92 : 0 : }
93 : : }
94 : 0 : }
95 : :
96 : : WireFrameConnectionPtr
97 : 2 : WireFrameConnection::create_as_server(int fd, EndPoint local_endpoint,
98 : : EndPoint remote_endpoint, EventLoop* loop) {
99 : : auto conn = std::shared_ptr<WireFrameConnection>(
100 : 2 : new WireFrameConnection(fd, local_endpoint, remote_endpoint, loop));
101 : 2 : conn->set_state(ConnectionState::Connected);
102 : :
103 : 2 : HPACTOR_LOG_DEBUG(log::LogCategory::kNetwork, ActorId{0}, 0,
104 : : "connection opened");
105 : :
106 : 2 : if (loop && fd >= 0) {
107 : 2 : loop->add_fd(fd, EventLoop::Event::Read);
108 : 2 : if (loop->supports_read_handler()) {
109 : 2 : std::weak_ptr<WireFrameConnection> weak_conn = conn;
110 : 2 : loop->set_read_handler(fd, [weak_conn](int /*event_fd*/) {
111 : 1 : if (auto self = weak_conn.lock()) {
112 : 1 : self->handle_read();
113 : 1 : }
114 : 1 : });
115 : 2 : }
116 : : }
117 : :
118 : 2 : return conn;
119 : : }
120 : :
121 : 18 : void WireFrameConnection::set_ready_handler(std::function<void(ConnectionPtr)> handler) {
122 : 18 : ready_handler_ = std::move(handler);
123 : 18 : }
124 : :
125 : 18 : void WireFrameConnection::set_frame_handler(frame_handler handler) {
126 : 18 : frame_handler_ = std::move(handler);
127 : 18 : }
128 : :
129 : 19 : void WireFrameConnection::set_error_handler(
130 : : std::function<void(ConnectionPtr, const error&)> handler) {
131 : 19 : error_handler_ = std::move(handler);
132 : 19 : }
133 : :
134 : 2 : void WireFrameConnection::set_send_completion_handler(
135 : : std::function<void(int result)> handler) {
136 : 2 : send_completion_handler_ = std::move(handler);
137 : 2 : }
138 : :
139 : 11 : void WireFrameConnection::send(const StreamBuffer& frame_data) {
140 : 11 : if (state_ != ConnectionState::Connected) {
141 : 0 : return;
142 : : }
143 : :
144 : 11 : HPACTOR_LOG_TRACE(
145 : : log::LogCategory::kNetwork, ActorId{0}, 0, "network frame sent",
146 : : log::field("bytes", static_cast<uint64_t>(frame_data.size())));
147 : :
148 : 11 : send_raw(frame_data);
149 : : }
150 : :
151 : 43 : void WireFrameConnection::close() {
152 : 43 : if (fd_ >= 0) {
153 : 23 : if (loop_) {
154 : 23 : loop_->clear_read_handler(fd_);
155 : 23 : loop_->remove_fd(fd_);
156 : : }
157 : 23 : ::close(fd_);
158 : 23 : fd_ = -1;
159 : : }
160 : 43 : set_state(ConnectionState::Disconnected);
161 : 43 : HPACTOR_LOG_DEBUG(log::LogCategory::kNetwork, ActorId{0}, 0,
162 : : "connection closed");
163 : 43 : }
164 : :
165 : 1 : void WireFrameConnection::handle_read() {
166 : : // Phase 1 — accumulate header bytes (8 bytes: magic + payload length).
167 : : // Read header first so we know the exact payload length before allocating
168 : : // buffer space for it.
169 : 2 : while (read_buffer_.size() < WireFrame::HeaderSize) {
170 : 2 : size_t remaining = WireFrame::HeaderSize - read_buffer_.size();
171 : 2 : uint8_t* area = read_buffer_.reserve_tail(remaining);
172 : 2 : ssize_t n = ::read(fd_, area, remaining);
173 : 2 : if (n > 0) {
174 : 1 : read_buffer_.commit_tail(static_cast<size_t>(n));
175 : 1 : } else if (n == 0) {
176 : 1 : read_buffer_.clear(); // EOF — discard incomplete header
177 : 1 : return;
178 : : } else {
179 : 0 : if (errno == EAGAIN || errno == EWOULDBLOCK)
180 : 0 : return;
181 : 0 : read_buffer_.clear(); // Error — discard partial data
182 : 0 : return;
183 : : }
184 : : }
185 : :
186 : : // Validate magic "HPAC"
187 : 0 : if (read_buffer_[0] != 'H' || read_buffer_[1] != 'P' ||
188 : 0 : read_buffer_[2] != 'A' || read_buffer_[3] != 'C') {
189 : : // Invalid magic — skip one byte and retry (re-sync stream)
190 : 0 : read_buffer_.consume(1);
191 : 0 : handle_read(); // Recurse to re-enter header-read phase
192 : 0 : return;
193 : : }
194 : :
195 : : // Parse remaining length (big-endian uint32_t from bytes 4-7)
196 : 0 : size_t payload_len = (static_cast<size_t>(read_buffer_[4]) << 24) |
197 : 0 : (static_cast<size_t>(read_buffer_[5]) << 16) |
198 : 0 : (static_cast<size_t>(read_buffer_[6]) << 8) |
199 : 0 : static_cast<size_t>(read_buffer_[7]);
200 : :
201 : 0 : size_t total_frame_size = WireFrame::HeaderSize + payload_len;
202 : :
203 : : // Phase 2 — accumulate exactly payload_len bytes.
204 : : // Reserve only the remaining bytes needed, not a fixed chunk.
205 : 0 : while (read_buffer_.size() < total_frame_size) {
206 : 0 : size_t remaining = total_frame_size - read_buffer_.size();
207 : 0 : uint8_t* area = read_buffer_.reserve_tail(remaining);
208 : 0 : ssize_t n = ::read(fd_, area, remaining);
209 : 0 : if (n > 0) {
210 : 0 : read_buffer_.commit_tail(static_cast<size_t>(n));
211 : 0 : } else if (n == 0) {
212 : 0 : read_buffer_.clear(); // EOF mid-frame — discard incomplete frame
213 : 0 : return;
214 : : } else {
215 : 0 : if (errno == EAGAIN || errno == EWOULDBLOCK)
216 : 0 : return;
217 : 0 : read_buffer_.clear(); // Error — discard partial data
218 : 0 : return;
219 : : }
220 : : }
221 : :
222 : : // Complete frame received — extract into StreamBuffer.
223 : : StreamBuffer frame_data(read_buffer_.begin(),
224 : 0 : read_buffer_.begin() + total_frame_size);
225 : 0 : read_buffer_.consume(total_frame_size);
226 : :
227 : 0 : if (frame_handler_) {
228 : 0 : frame_handler_(std::move(frame_data));
229 : : }
230 : :
231 : : // Process any additional frames already in the buffer.
232 : 0 : if (read_buffer_.size() >= WireFrame::HeaderSize) {
233 : 0 : handle_read();
234 : : }
235 : 0 : }
236 : :
237 : 11 : void WireFrameConnection::send_raw(const StreamBuffer& data) {
238 : 11 : if (fd_ < 0 || !loop_)
239 : 0 : return;
240 : :
241 : 11 : write_buffer_.append(data.data(), data.size());
242 : :
243 : 11 : if (is_sending_)
244 : 9 : return;
245 : :
246 : 2 : flush_write_buffer();
247 : : }
248 : :
249 : 2 : void WireFrameConnection::flush_write_buffer() {
250 : 2 : if (fd_ < 0 || loop_ == nullptr || write_buffer_.empty()) {
251 : 0 : return;
252 : : }
253 : :
254 : 2 : is_sending_ = true;
255 : :
256 : : struct iovec iov;
257 : 2 : iov.iov_base = write_buffer_.data();
258 : 2 : iov.iov_len = write_buffer_.size();
259 : :
260 : 2 : loop_->backend()->async_send(fd_, &iov, 1, ActorId(0),
261 : : static_cast<uint32_t>(OpType::Send));
262 : : }
263 : :
264 : 0 : void WireFrameConnection::handle_send_completion(int result) {
265 : 0 : if (send_completion_handler_) {
266 : 0 : send_completion_handler_(result);
267 : : }
268 : 0 : is_sending_ = false;
269 : :
270 : 0 : if (result < 0) {
271 : 0 : set_state(ConnectionState::Error);
272 : 0 : if (error_handler_) {
273 : 0 : error_handler_(
274 : 0 : std::enable_shared_from_this<WireFrameConnection>::shared_from_this(),
275 : 0 : error{});
276 : : }
277 : 0 : return;
278 : : }
279 : :
280 : 0 : if (static_cast<size_t>(result) >= write_buffer_.size()) {
281 : 0 : write_buffer_.clear();
282 : : } else {
283 : 0 : write_buffer_.consume(static_cast<size_t>(result));
284 : : }
285 : :
286 : 0 : if (!write_buffer_.empty()) {
287 : 0 : flush_write_buffer();
288 : : }
289 : : }
290 : :
291 : : } // namespace net
292 : : } // namespace hpactor
|