1use std::io::{BufRead, BufReader, Read, Write};
2use std::net::{SocketAddr, TcpListener, TcpStream};
3use std::time::Duration;
4
5use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
6
7pub(crate) fn write_http_response(
8 stream: &mut TcpStream,
9 status_line: &str,
10) -> Result<(), AdapterError> {
11 stream
12 .write_all(
13 format!("HTTP/1.1 {status_line}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
14 .as_bytes(),
15 )
16 .map_err(|source| AdapterError::Io {
17 op: "write_http_response",
18 source,
19 })
20}
21
22pub(crate) fn read_http_line(
23 reader: &mut BufReader<TcpStream>,
24 op: &'static str,
25) -> Result<String, AdapterError> {
26 let mut line = String::new();
27 let n = reader
28 .read_line(&mut line)
29 .map_err(|source| AdapterError::Io { op, source })?;
30 if n == 0 {
31 return Err(AdapterError::Http {
32 op,
33 detail: "unexpected EOF".into(),
34 });
35 }
36 Ok(line)
37}
38
39fn parse_request_line(line: &str) -> Option<(&str, &str, &str)> {
40 let trimmed = line.trim_end_matches(['\r', '\n']);
41 let mut parts = trimmed.split_whitespace();
42 let method = parts.next()?;
43 let path = parts.next()?;
44 let version = parts.next()?;
45 Some((method, path, version))
46}
47
48pub struct HttpIngress {
53 name: String,
54 listener: TcpListener,
55 path: String,
56 max_body_bytes: usize,
57}
58
59impl HttpIngress {
60 pub fn bind(
62 name: impl Into<String>,
63 bind: &str,
64 path: impl Into<String>,
65 max_body_bytes: usize,
66 ) -> Result<Self, AdapterError> {
67 let path = path.into();
68 if !path.starts_with('/') {
69 return Err(AdapterError::Config {
70 detail: "http ingress path must start with '/'".into(),
71 });
72 }
73 let listener = TcpListener::bind(bind).map_err(|source| AdapterError::Io {
74 op: "tcp_bind",
75 source,
76 })?;
77 Ok(Self {
78 name: name.into(),
79 listener,
80 path,
81 max_body_bytes,
82 })
83 }
84
85 pub fn local_addr(&self) -> Result<SocketAddr, AdapterError> {
87 self.listener
88 .local_addr()
89 .map_err(|source| AdapterError::Io {
90 op: "local_addr",
91 source,
92 })
93 }
94
95 fn read_request_body(
96 &self,
97 stream: TcpStream,
98 out: &mut [u8],
99 ) -> Result<Option<usize>, AdapterError> {
100 let mut reader = BufReader::new(stream);
101 let request_line = read_http_line(&mut reader, "read_request_line")?;
102 let Some((method, target, _version)) = parse_request_line(&request_line) else {
103 write_http_response(reader.get_mut(), "400 Bad Request")?;
104 return Ok(None);
105 };
106
107 let mut content_length = None;
108 loop {
109 let line = read_http_line(&mut reader, "read_header_line")?;
110 if line == "\r\n" {
111 break;
112 }
113 let Some((name, value)) = line.split_once(':') else {
114 write_http_response(reader.get_mut(), "400 Bad Request")?;
115 return Ok(None);
116 };
117 if name.trim().eq_ignore_ascii_case("content-length") {
118 let parsed = value
119 .trim()
120 .parse::<usize>()
121 .map_err(|_| AdapterError::Http {
122 op: "parse_content_length",
123 detail: format!("invalid content-length header: {value:?}"),
124 })?;
125 content_length = Some(parsed);
126 }
127 }
128
129 if method != "POST" {
130 write_http_response(reader.get_mut(), "405 Method Not Allowed")?;
131 return Ok(None);
132 }
133 if target != self.path {
134 write_http_response(reader.get_mut(), "404 Not Found")?;
135 return Ok(None);
136 }
137
138 let content_length = content_length.ok_or_else(|| AdapterError::Http {
139 op: "read_request_headers",
140 detail: "missing content-length header".into(),
141 })?;
142 if content_length > self.max_body_bytes || content_length > out.len() {
143 write_http_response(reader.get_mut(), "413 Payload Too Large")?;
144 return Err(AdapterError::Http {
145 op: "read_request_body",
146 detail: format!(
147 "payload too large len={content_length} max={} buf={}",
148 self.max_body_bytes,
149 out.len()
150 ),
151 });
152 }
153
154 reader
155 .read_exact(&mut out[..content_length])
156 .map_err(|source| AdapterError::Io {
157 op: "read_body",
158 source,
159 })?;
160 write_http_response(reader.get_mut(), "202 Accepted")?;
161 Ok(Some(content_length))
162 }
163}
164
165impl Adapter for HttpIngress {
166 fn name(&self) -> &str {
167 &self.name
168 }
169
170 fn transport_kind(&self) -> &str {
171 "http"
172 }
173}
174
175impl IngressAdapter for HttpIngress {
176 fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
177 loop {
178 let (stream, _) = self.listener.accept().map_err(|source| AdapterError::Io {
179 op: "accept",
180 source,
181 })?;
182
183 if let Some(n) = self.read_request_body(stream, out)? {
184 return Ok(Some(n));
185 }
186 }
187 }
188}
189
190pub struct HttpEgress {
194 name: String,
195 url: String,
196 method: String,
197 headers: Vec<(String, String)>,
198 timeout: Duration,
199 max_retries: usize,
200}
201
202impl HttpEgress {
203 pub fn new(
205 name: impl Into<String>,
206 url: impl Into<String>,
207 headers: Vec<(String, String)>,
208 timeout: Duration,
209 max_retries: usize,
210 ) -> Result<Self, AdapterError> {
211 Self::with_method(name, url, "POST", headers, timeout, max_retries)
212 }
213
214 pub fn with_method(
216 name: impl Into<String>,
217 url: impl Into<String>,
218 method: impl Into<String>,
219 headers: Vec<(String, String)>,
220 timeout: Duration,
221 max_retries: usize,
222 ) -> Result<Self, AdapterError> {
223 let url = url.into();
224 if !(url.starts_with("http://") || url.starts_with("https://")) {
225 return Err(AdapterError::Config {
226 detail: "http egress url must start with http:// or https://".into(),
227 });
228 }
229 let method = method.into().to_ascii_uppercase();
230 if !matches!(method.as_str(), "POST" | "PUT" | "PATCH") {
231 return Err(AdapterError::Config {
232 detail: format!("http egress method must be POST, PUT, or PATCH: {method}"),
233 });
234 }
235 Ok(Self {
236 name: name.into(),
237 url,
238 method,
239 headers,
240 timeout,
241 max_retries,
242 })
243 }
244}
245
246impl Adapter for HttpEgress {
247 fn name(&self) -> &str {
248 &self.name
249 }
250
251 fn transport_kind(&self) -> &str {
252 "http"
253 }
254}
255
256impl EgressAdapter for HttpEgress {
257 fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
258 let agent = ureq::AgentBuilder::new().timeout(self.timeout).build();
259 let mut attempt = 0usize;
260 loop {
261 attempt = attempt.saturating_add(1);
262 let mut req = agent
263 .request(self.method.as_str(), &self.url)
264 .set("Content-Type", "application/octet-stream");
265 for (name, value) in &self.headers {
266 req = req.set(name, value);
267 }
268
269 match req.send_bytes(msg) {
270 Ok(resp) => {
271 if (200..300).contains(&resp.status()) {
272 return Ok(());
273 }
274 if attempt <= self.max_retries && (resp.status() == 429 || resp.status() >= 500)
275 {
276 continue;
277 }
278 return Err(AdapterError::Http {
279 op: "http_post",
280 detail: format!("status={}", resp.status()),
281 });
282 }
283 Err(err) => {
284 if attempt <= self.max_retries {
285 continue;
286 }
287 return Err(AdapterError::Http {
288 op: "http_post",
289 detail: err.to_string(),
290 });
291 }
292 }
293 }
294 }
295
296 fn flush(&mut self) -> Result<(), AdapterError> {
297 Ok(())
298 }
299}