Branch data Line data Source code
1 : : // Copyright 2026 HPActor Contributors
2 : : //
3 : : // Licensed under the Apache License, Version 2.0 (the "License");
4 : : // you may not use this file except in compliance with the License.
5 : : // You may obtain a copy of the License at
6 : : //
7 : : // http://www.apache.org/licenses/LICENSE-2.0
8 : : //
9 : : // Unless required by applicable law or agreed to in writing, software
10 : : // distributed under the License is distributed on an "AS IS" BASIS,
11 : : // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 : : // See the License for the specific language governing permissions and
13 : : // limitations under the License.
14 : :
15 : : #include <hpactor/net/event_loop.hpp>
16 : : #include <hpactor/net/http_client.hpp>
17 : : #include <hpactor/net/http_parser.hpp>
18 : : #include <hpactor/net/http_types.hpp>
19 : : #include <hpactor/rpc/rpc_channel.hpp>
20 : : #include <hpactor/types/types.hpp>
21 : :
22 : : #include <arpa/inet.h>
23 : : #include <fcntl.h>
24 : : #include <sys/socket.h>
25 : : #include <unistd.h>
26 : :
27 : : #include <chrono>
28 : : #include <cstring>
29 : : #include <future>
30 : : #include <string>
31 : : #include <unordered_map>
32 : : #include <vector>
33 : :
34 : : namespace hpactor {
35 : : namespace net {
36 : :
37 : : // =============================================================================
38 : : // URL Parsing
39 : : // =============================================================================
40 : : struct ParsedUrl {
41 : : std::string scheme;
42 : : std::string host;
43 : : uint16_t port;
44 : : std::string path;
45 : : };
46 : :
47 : : // Note: uses std::stoi for port — assumes well-formed numeric ports.
48 : : // Validation of malformed URLs (e.g. "http://host:abc/path") is out of scope
49 : : // for this phase.
50 : 1 : static ParsedUrl parse_url(const std::string& url) {
51 : 1 : ParsedUrl result;
52 : :
53 : 1 : auto scheme_end = url.find("://");
54 : 1 : if (scheme_end == std::string::npos) {
55 : 0 : result.scheme = "http";
56 : 0 : scheme_end = 0;
57 : : } else {
58 : 1 : result.scheme = url.substr(0, scheme_end);
59 : 1 : scheme_end += 3;
60 : : }
61 : :
62 : 1 : auto host_end = url.find_first_of(":/", scheme_end);
63 : 1 : result.host = url.substr(scheme_end, host_end - scheme_end);
64 : :
65 : 1 : if (host_end != std::string::npos && url[host_end] == ':') {
66 : 1 : auto port_end = url.find('/', host_end);
67 : : auto port_str = url.substr(host_end + 1, port_end == std::string::npos
68 : : ? std::string::npos
69 : 1 : : port_end - host_end - 1);
70 : 1 : result.port = static_cast<uint16_t>(std::stoi(port_str));
71 : 1 : host_end = port_end;
72 : 1 : } else {
73 : 0 : result.port = (result.scheme == "https") ? 443 : 80;
74 : : }
75 : :
76 : 1 : result.path = (host_end != std::string::npos) ? url.substr(host_end) : "/";
77 : 1 : return result;
78 : : }
79 : :
80 : : // =============================================================================
81 : : // HttpClient Implementation — blocking TCP (for test usage)
82 : : // =============================================================================
83 : :
84 : 1 : HttpClient::HttpClient(EventLoop* loop) : loop_(loop) {}
85 : :
86 : 1 : HttpClient::~HttpClient() {
87 : 1 : abort();
88 : 1 : }
89 : :
90 : : RpcFuture<StreamBuffer>
91 : 1 : HttpClient::request(HttpMethod method, const std::string& url,
92 : : std::vector<HttpHeader> headers, StreamBuffer body) {
93 : 1 : auto parsed = parse_url(url);
94 : :
95 : : // 1. Connect
96 : 1 : int fd = socket(AF_INET, SOCK_STREAM, 0);
97 : 1 : if (fd < 0) {
98 : 0 : std::promise<result<StreamBuffer>> p;
99 : 0 : p.set_value(
100 : 0 : result<StreamBuffer>::make(error(errors::http_connect_failed, "sock"
101 : : "et()"
102 : : " fai"
103 : : "le"
104 : 0 : "d")));
105 : 0 : return RpcFuture<StreamBuffer>(p.get_future(), default_timeout_);
106 : 0 : }
107 : :
108 : 1 : struct sockaddr_in addr{};
109 : 1 : addr.sin_family = AF_INET;
110 : 1 : addr.sin_port = htons(parsed.port);
111 : 1 : inet_pton(AF_INET, parsed.host.c_str(), &addr.sin_addr);
112 : :
113 : 1 : if (connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) < 0) {
114 : 0 : close(fd);
115 : 0 : std::promise<result<StreamBuffer>> p;
116 : 0 : p.set_value(
117 : 0 : result<StreamBuffer>::make(error(errors::http_connect_failed, "conn"
118 : : "ect("
119 : : ") "
120 : : "fail"
121 : : "e"
122 : 0 : "d")));
123 : 0 : return RpcFuture<StreamBuffer>(p.get_future(), default_timeout_);
124 : 0 : }
125 : :
126 : : // 2. Build HTTP/1.1 request wire bytes
127 : 1 : StreamBuffer wire;
128 : 1 : std::string request_line = to_string(method);
129 : 1 : request_line += " " + parsed.path + " HTTP/1.1\r\n";
130 : 1 : wire.append(reinterpret_cast<const uint8_t*>(request_line.data()),
131 : : request_line.size());
132 : :
133 : 1 : std::string host_header = "Host: " + parsed.host;
134 : 1 : if (parsed.port != 80) {
135 : 1 : host_header += ":" + std::to_string(parsed.port);
136 : : }
137 : 1 : host_header += "\r\n";
138 : 1 : wire.append(reinterpret_cast<const uint8_t*>(host_header.data()),
139 : : host_header.size());
140 : :
141 : 1 : for (const auto& h : headers) {
142 : 0 : std::string hdr = h.name + ": " + h.value + "\r\n";
143 : 0 : wire.append(reinterpret_cast<const uint8_t*>(hdr.data()), hdr.size());
144 : 0 : }
145 : :
146 : 1 : if (!body.empty()) {
147 : 1 : std::string cl = "Content-Length: " + std::to_string(body.size()) + "\r\n";
148 : 1 : wire.append(reinterpret_cast<const uint8_t*>(cl.data()), cl.size());
149 : 1 : }
150 : 1 : const uint8_t crlf[] = {'\r', '\n'};
151 : 1 : wire.append(crlf, 2);
152 : 1 : wire.append(body.data(), body.size());
153 : :
154 : : // 3. Send
155 : 1 : ssize_t sent = write(fd, wire.data(), wire.size());
156 : 1 : if (sent < 0) {
157 : 0 : close(fd);
158 : 0 : std::promise<result<StreamBuffer>> p;
159 : 0 : p.set_value(
160 : 0 : result<StreamBuffer>::make(error(errors::http_connect_failed, "writ"
161 : : "e() "
162 : : "fail"
163 : : "e"
164 : 0 : "d")));
165 : 0 : return RpcFuture<StreamBuffer>(p.get_future(), default_timeout_);
166 : 0 : }
167 : :
168 : : // 4. Read response via llhttp in HTTP_RESPONSE mode
169 : 1 : HttpParser parser(HttpParserMode::Response);
170 : 1 : int response_status = 0;
171 : 1 : StreamBuffer response_body;
172 : 1 : bool complete = false;
173 : :
174 : 1 : parser.set_on_response([&](int status,
175 : : const std::vector<HttpHeader>& /*resp_headers*/,
176 : : const StreamBuffer& resp_body) {
177 : 1 : response_status = status;
178 : 1 : response_body = resp_body;
179 : 1 : complete = true;
180 : 1 : });
181 : :
182 : 1 : StreamBuffer read_buf;
183 : : ssize_t n;
184 : 2 : while (!complete && (n = read(fd, read_buf.reserve_tail(4096), 4096)) > 0) {
185 : 1 : read_buf.commit_tail(static_cast<size_t>(n));
186 : 1 : size_t consumed = parser.execute(read_buf);
187 : 1 : if (consumed > 0) {
188 : 1 : read_buf.consume(consumed);
189 : : }
190 : : }
191 : 1 : close(fd);
192 : :
193 : : // 5. Fulfill promise
194 : 1 : std::promise<result<StreamBuffer>> p;
195 : 1 : if (response_status >= 200 && response_status < 300) {
196 : 1 : p.set_value(result<StreamBuffer>::make(std::move(response_body)));
197 : 0 : } else if (response_status == 0) {
198 : 0 : p.set_value(
199 : 0 : result<StreamBuffer>::make(error(errors::http_parse_error, "No "
200 : : "respons"
201 : : "e "
202 : : "receive"
203 : 0 : "d")));
204 : : } else {
205 : 0 : p.set_value(result<StreamBuffer>::make(error(
206 : 0 : errors::http_parse_error, "HTTP " + std::to_string(response_status))));
207 : : }
208 : 1 : return RpcFuture<StreamBuffer>(p.get_future(), default_timeout_);
209 : 1 : }
210 : :
211 : : RpcFuture<StreamBuffer>
212 : 0 : HttpClient::get(const std::string& url, std::vector<HttpHeader> headers) {
213 : 0 : return request(HttpMethod::GET, url, std::move(headers), {});
214 : : }
215 : :
216 : 1 : RpcFuture<StreamBuffer> HttpClient::post(const std::string& url, StreamBuffer body,
217 : : std::vector<HttpHeader> headers) {
218 : 1 : return request(HttpMethod::POST, url, std::move(headers), std::move(body));
219 : : }
220 : :
221 : 0 : RpcFuture<StreamBuffer> HttpClient::put(const std::string& url, StreamBuffer body,
222 : : std::vector<HttpHeader> headers) {
223 : 0 : return request(HttpMethod::PUT, url, std::move(headers), std::move(body));
224 : : }
225 : :
226 : : RpcFuture<StreamBuffer>
227 : 0 : HttpClient::del(const std::string& url, std::vector<HttpHeader> headers) {
228 : 0 : return request(HttpMethod::DELETE, url, std::move(headers), {});
229 : : }
230 : :
231 : 1 : void HttpClient::abort() {
232 : : // Full implementation deferred — blocking I/O mode has no in-flight state.
233 : 1 : }
234 : :
235 : : } // namespace net
236 : : } // namespace hpactor
|