Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/net/http_connection.hpp>
16 : :
17 : : #include <hpactor/net/event_loop.hpp>
18 : :
19 : : #include <cstdio>
20 : : #include <cstring>
21 : : #include <sys/socket.h>
22 : : #include <sys/uio.h>
23 : : #include <unistd.h>
24 : :
25 : : namespace hpactor {
26 : : namespace net {
27 : :
28 : 4 : HTTPConnection::HTTPConnection(int fd, EndPoint local_endpoint,
29 : : EndPoint remote_endpoint, EventLoop* loop,
30 : 4 : HTTPConnectionMode mode)
31 : : : Connection(fd, local_endpoint, remote_endpoint, loop),
32 : 4 : is_sending_(false),
33 : 4 : mode_(mode) {
34 : 4 : write_buffer_.reserve(kWriteChunkSize);
35 : :
36 : 4 : auto parser_mode = (mode == HTTPConnectionMode::Server)
37 : 4 : ? HttpParserMode::Request
38 : : : HttpParserMode::Response;
39 : 4 : parser_ = std::make_unique<HttpParser>(parser_mode);
40 : :
41 : : // Wire up parser callbacks based on mode.
42 : 4 : if (mode == HTTPConnectionMode::Server) {
43 : 6 : parser_->set_on_message(
44 : 6 : [this](HttpRequest&& req) {
45 : 3 : if (request_handler_) {
46 : 3 : request_handler_(this, std::move(req));
47 : : }
48 : 3 : });
49 : : } else {
50 : 2 : parser_->set_on_response(
51 : 2 : [this](int status_code, const std::vector<HttpHeader>& headers,
52 : : const StreamBuffer& body) {
53 : 1 : if (response_handler_) {
54 : 1 : response_handler_(this, status_code, headers, body);
55 : : }
56 : 1 : });
57 : : }
58 : :
59 : 4 : parser_->set_on_error([this](llhttp_errno_t /*err*/, const char* msg) {
60 : 0 : if (error_handler_) {
61 : : error handler_err(errors::http_parse_error,
62 : 0 : std::string(msg ? msg : "parse error"));
63 : 0 : error_handler_(this, handler_err);
64 : 0 : }
65 : 0 : });
66 : 4 : }
67 : :
68 : 8 : HTTPConnection::~HTTPConnection() {
69 : 4 : close();
70 : 8 : }
71 : :
72 : : HTTPConnectionPtr
73 : 4 : HTTPConnection::create(int fd, EndPoint local_endpoint,
74 : : EndPoint remote_endpoint, EventLoop* loop,
75 : : HTTPConnectionMode mode) {
76 : : auto conn = std::shared_ptr<HTTPConnection>(
77 : 4 : new HTTPConnection(fd, local_endpoint, remote_endpoint, loop, mode));
78 : 4 : conn->set_state(ConnectionState::Connected);
79 : :
80 : 4 : if (loop && fd >= 0) {
81 : 4 : loop->add_fd(fd, EventLoop::Event::Read);
82 : 4 : if (loop->supports_read_handler()) {
83 : 4 : std::weak_ptr<HTTPConnection> weak_conn = conn;
84 : 4 : loop->set_read_handler(fd, [weak_conn](int /*event_fd*/) {
85 : 5 : if (auto self = weak_conn.lock()) {
86 : 5 : self->handle_read();
87 : 5 : }
88 : 5 : });
89 : 4 : }
90 : : }
91 : :
92 : 4 : return conn;
93 : : }
94 : :
95 : 0 : void HTTPConnection::send(const StreamBuffer& data) {
96 : 0 : if (state_ != ConnectionState::Connected) {
97 : 0 : return;
98 : : }
99 : 0 : send_raw(data);
100 : : }
101 : :
102 : 2 : void HTTPConnection::send_raw(const StreamBuffer& data) {
103 : 2 : if (fd_ < 0 || !loop_)
104 : 0 : return;
105 : :
106 : 2 : write_buffer_.append(data.data(), data.size());
107 : :
108 : 2 : if (is_sending_)
109 : 0 : return;
110 : :
111 : 2 : flush_write_buffer();
112 : : }
113 : :
114 : 2 : void HTTPConnection::send_response(HttpStatusCode code,
115 : : std::vector<HttpHeader> headers,
116 : : StreamBuffer body) {
117 : : // Build HTTP/1.1 status line.
118 : : char status_line[64];
119 : 2 : int status_line_len = snprintf(status_line, sizeof(status_line),
120 : : "HTTP/1.1 %d %s\r\n",
121 : : static_cast<int>(code),
122 : : reason_phrase(code));
123 : :
124 : 2 : StreamBuffer response;
125 : 2 : response.append(reinterpret_cast<const uint8_t*>(status_line),
126 : : static_cast<size_t>(status_line_len));
127 : :
128 : : // Add Content-Length header.
129 : : char content_length[32];
130 : 2 : int cl_len = snprintf(content_length, sizeof(content_length),
131 : : "Content-Length: %zu\r\n", body.size());
132 : 2 : response.append(reinterpret_cast<const uint8_t*>(content_length),
133 : : static_cast<size_t>(cl_len));
134 : :
135 : : // Add caller-supplied headers.
136 : 4 : for (const auto& h : headers) {
137 : 2 : response.append(reinterpret_cast<const uint8_t*>(h.name.data()),
138 : : h.name.size());
139 : 2 : response.append(reinterpret_cast<const uint8_t*>(": "), 2);
140 : 2 : response.append(reinterpret_cast<const uint8_t*>(h.value.data()),
141 : : h.value.size());
142 : 2 : response.append(reinterpret_cast<const uint8_t*>("\r\n"), 2);
143 : : }
144 : :
145 : : // End of headers.
146 : 2 : response.append(reinterpret_cast<const uint8_t*>("\r\n"), 2);
147 : :
148 : : // Append body.
149 : 2 : if (!body.empty()) {
150 : 2 : response.append(body.data(), body.size());
151 : : }
152 : :
153 : 2 : send_raw(response);
154 : 2 : }
155 : :
156 : 2 : void HTTPConnection::flush_write_buffer() {
157 : 2 : if (fd_ < 0 || loop_ == nullptr || write_buffer_.empty()) {
158 : 0 : return;
159 : : }
160 : :
161 : 2 : is_sending_ = true;
162 : :
163 : : struct iovec iov;
164 : 2 : iov.iov_base = write_buffer_.data();
165 : 2 : iov.iov_len = write_buffer_.size();
166 : :
167 : 2 : loop_->backend()->async_send(fd_, &iov, 1, ActorId(0),
168 : : static_cast<uint32_t>(OpType::Send));
169 : : }
170 : :
171 : 0 : void HTTPConnection::handle_send_completion(int result) {
172 : 0 : is_sending_ = false;
173 : :
174 : 0 : if (result < 0) {
175 : 0 : set_state(ConnectionState::Error);
176 : 0 : if (error_handler_) {
177 : 0 : error_handler_(this, error{});
178 : : }
179 : 0 : return;
180 : : }
181 : :
182 : 0 : if (static_cast<size_t>(result) >= write_buffer_.size()) {
183 : 0 : write_buffer_.clear();
184 : : } else {
185 : 0 : write_buffer_.consume(static_cast<size_t>(result));
186 : : }
187 : :
188 : 0 : if (!write_buffer_.empty()) {
189 : 0 : flush_write_buffer();
190 : : }
191 : : }
192 : :
193 : 8 : void HTTPConnection::close() {
194 : 8 : if (fd_ >= 0) {
195 : 4 : if (loop_) {
196 : 4 : loop_->clear_read_handler(fd_);
197 : 4 : loop_->remove_fd(fd_);
198 : : }
199 : 4 : ::close(fd_);
200 : 4 : fd_ = -1;
201 : : }
202 : 8 : set_state(ConnectionState::Disconnected);
203 : 8 : }
204 : :
205 : 5 : void HTTPConnection::handle_read() {
206 : : // Read up to 8KB per call.
207 : 5 : constexpr size_t kReadChunkSize = 8192;
208 : 5 : uint8_t* area = read_buffer_.reserve_tail(kReadChunkSize);
209 : 5 : ssize_t n = ::read(fd_, area, kReadChunkSize);
210 : 5 : if (n > 0) {
211 : 4 : read_buffer_.commit_tail(static_cast<size_t>(n));
212 : : // Feed to parser.
213 : 4 : size_t consumed = parser_->execute(read_buffer_);
214 : : // Consume parsed bytes from read buffer.
215 : 4 : if (consumed > 0) {
216 : 4 : read_buffer_.consume(consumed);
217 : : }
218 : 1 : } else if (n == 0) {
219 : : // EOF — connection closed by peer.
220 : 1 : read_buffer_.clear();
221 : 1 : set_state(ConnectionState::Disconnected);
222 : : } else {
223 : : // n < 0: error.
224 : 0 : if (errno == EAGAIN || errno == EWOULDBLOCK)
225 : 0 : return;
226 : 0 : read_buffer_.clear();
227 : : }
228 : : }
229 : :
230 : 2 : bool HTTPConnection::should_keep_alive() const {
231 : 2 : return parser_ && parser_->should_keep_alive();
232 : : }
233 : :
234 : : } // namespace net
235 : : } // namespace hpactor
|