byteor_adapters/
http.rs

1use std::io::{BufRead, BufReader, Read, Write};
2use std::net::{SocketAddr, TcpListener, TcpStream};
3use std::time::Duration;
4
5use crate::{Adapter, AdapterError, EgressAdapter, IngressAdapter};
6
7pub(crate) fn write_http_response(
8    stream: &mut TcpStream,
9    status_line: &str,
10) -> Result<(), AdapterError> {
11    stream
12        .write_all(
13            format!("HTTP/1.1 {status_line}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n")
14                .as_bytes(),
15        )
16        .map_err(|source| AdapterError::Io {
17            op: "write_http_response",
18            source,
19        })
20}
21
22pub(crate) fn read_http_line(
23    reader: &mut BufReader<TcpStream>,
24    op: &'static str,
25) -> Result<String, AdapterError> {
26    let mut line = String::new();
27    let n = reader
28        .read_line(&mut line)
29        .map_err(|source| AdapterError::Io { op, source })?;
30    if n == 0 {
31        return Err(AdapterError::Http {
32            op,
33            detail: "unexpected EOF".into(),
34        });
35    }
36    Ok(line)
37}
38
39fn parse_request_line(line: &str) -> Option<(&str, &str, &str)> {
40    let trimmed = line.trim_end_matches(['\r', '\n']);
41    let mut parts = trimmed.split_whitespace();
42    let method = parts.next()?;
43    let path = parts.next()?;
44    let version = parts.next()?;
45    Some((method, path, version))
46}
47
48/// Minimal blocking HTTP ingress adapter.
49///
50/// This adapter accepts `POST` requests on the configured path and exposes each request body as
51/// one ingress message.
52pub struct HttpIngress {
53    name: String,
54    listener: TcpListener,
55    path: String,
56    max_body_bytes: usize,
57}
58
59impl HttpIngress {
60    /// Bind a blocking HTTP ingress adapter.
61    pub fn bind(
62        name: impl Into<String>,
63        bind: &str,
64        path: impl Into<String>,
65        max_body_bytes: usize,
66    ) -> Result<Self, AdapterError> {
67        let path = path.into();
68        if !path.starts_with('/') {
69            return Err(AdapterError::Config {
70                detail: "http ingress path must start with '/'".into(),
71            });
72        }
73        let listener = TcpListener::bind(bind).map_err(|source| AdapterError::Io {
74            op: "tcp_bind",
75            source,
76        })?;
77        Ok(Self {
78            name: name.into(),
79            listener,
80            path,
81            max_body_bytes,
82        })
83    }
84
85    /// Return the bound socket address.
86    pub fn local_addr(&self) -> Result<SocketAddr, AdapterError> {
87        self.listener
88            .local_addr()
89            .map_err(|source| AdapterError::Io {
90                op: "local_addr",
91                source,
92            })
93    }
94
95    fn read_request_body(
96        &self,
97        stream: TcpStream,
98        out: &mut [u8],
99    ) -> Result<Option<usize>, AdapterError> {
100        let mut reader = BufReader::new(stream);
101        let request_line = read_http_line(&mut reader, "read_request_line")?;
102        let Some((method, target, _version)) = parse_request_line(&request_line) else {
103            write_http_response(reader.get_mut(), "400 Bad Request")?;
104            return Ok(None);
105        };
106
107        let mut content_length = None;
108        loop {
109            let line = read_http_line(&mut reader, "read_header_line")?;
110            if line == "\r\n" {
111                break;
112            }
113            let Some((name, value)) = line.split_once(':') else {
114                write_http_response(reader.get_mut(), "400 Bad Request")?;
115                return Ok(None);
116            };
117            if name.trim().eq_ignore_ascii_case("content-length") {
118                let parsed = value
119                    .trim()
120                    .parse::<usize>()
121                    .map_err(|_| AdapterError::Http {
122                        op: "parse_content_length",
123                        detail: format!("invalid content-length header: {value:?}"),
124                    })?;
125                content_length = Some(parsed);
126            }
127        }
128
129        if method != "POST" {
130            write_http_response(reader.get_mut(), "405 Method Not Allowed")?;
131            return Ok(None);
132        }
133        if target != self.path {
134            write_http_response(reader.get_mut(), "404 Not Found")?;
135            return Ok(None);
136        }
137
138        let content_length = content_length.ok_or_else(|| AdapterError::Http {
139            op: "read_request_headers",
140            detail: "missing content-length header".into(),
141        })?;
142        if content_length > self.max_body_bytes || content_length > out.len() {
143            write_http_response(reader.get_mut(), "413 Payload Too Large")?;
144            return Err(AdapterError::Http {
145                op: "read_request_body",
146                detail: format!(
147                    "payload too large len={content_length} max={} buf={}",
148                    self.max_body_bytes,
149                    out.len()
150                ),
151            });
152        }
153
154        reader
155            .read_exact(&mut out[..content_length])
156            .map_err(|source| AdapterError::Io {
157                op: "read_body",
158                source,
159            })?;
160        write_http_response(reader.get_mut(), "202 Accepted")?;
161        Ok(Some(content_length))
162    }
163}
164
165impl Adapter for HttpIngress {
166    fn name(&self) -> &str {
167        &self.name
168    }
169
170    fn transport_kind(&self) -> &str {
171        "http"
172    }
173}
174
175impl IngressAdapter for HttpIngress {
176    fn read_next(&mut self, out: &mut [u8]) -> Result<Option<usize>, AdapterError> {
177        loop {
178            let (stream, _) = self.listener.accept().map_err(|source| AdapterError::Io {
179                op: "accept",
180                source,
181            })?;
182
183            if let Some(n) = self.read_request_body(stream, out)? {
184                return Ok(Some(n));
185            }
186        }
187    }
188}
189
190/// Minimal blocking HTTP egress adapter.
191///
192/// Each `write_msg()` performs a `POST` with `application/octet-stream` body bytes.
193pub struct HttpEgress {
194    name: String,
195    url: String,
196    method: String,
197    headers: Vec<(String, String)>,
198    timeout: Duration,
199    max_retries: usize,
200}
201
202impl HttpEgress {
203    /// Create a blocking HTTP egress adapter.
204    pub fn new(
205        name: impl Into<String>,
206        url: impl Into<String>,
207        headers: Vec<(String, String)>,
208        timeout: Duration,
209        max_retries: usize,
210    ) -> Result<Self, AdapterError> {
211        Self::with_method(name, url, "POST", headers, timeout, max_retries)
212    }
213
214    /// Create a blocking HTTP egress adapter with an explicit method.
215    pub fn with_method(
216        name: impl Into<String>,
217        url: impl Into<String>,
218        method: impl Into<String>,
219        headers: Vec<(String, String)>,
220        timeout: Duration,
221        max_retries: usize,
222    ) -> Result<Self, AdapterError> {
223        let url = url.into();
224        if !(url.starts_with("http://") || url.starts_with("https://")) {
225            return Err(AdapterError::Config {
226                detail: "http egress url must start with http:// or https://".into(),
227            });
228        }
229        let method = method.into().to_ascii_uppercase();
230        if !matches!(method.as_str(), "POST" | "PUT" | "PATCH") {
231            return Err(AdapterError::Config {
232                detail: format!("http egress method must be POST, PUT, or PATCH: {method}"),
233            });
234        }
235        Ok(Self {
236            name: name.into(),
237            url,
238            method,
239            headers,
240            timeout,
241            max_retries,
242        })
243    }
244}
245
246impl Adapter for HttpEgress {
247    fn name(&self) -> &str {
248        &self.name
249    }
250
251    fn transport_kind(&self) -> &str {
252        "http"
253    }
254}
255
256impl EgressAdapter for HttpEgress {
257    fn write_msg(&mut self, msg: &[u8]) -> Result<(), AdapterError> {
258        let agent = ureq::AgentBuilder::new().timeout(self.timeout).build();
259        let mut attempt = 0usize;
260        loop {
261            attempt = attempt.saturating_add(1);
262            let mut req = agent
263                .request(self.method.as_str(), &self.url)
264                .set("Content-Type", "application/octet-stream");
265            for (name, value) in &self.headers {
266                req = req.set(name, value);
267            }
268
269            match req.send_bytes(msg) {
270                Ok(resp) => {
271                    if (200..300).contains(&resp.status()) {
272                        return Ok(());
273                    }
274                    if attempt <= self.max_retries && (resp.status() == 429 || resp.status() >= 500)
275                    {
276                        continue;
277                    }
278                    return Err(AdapterError::Http {
279                        op: "http_post",
280                        detail: format!("status={}", resp.status()),
281                    });
282                }
283                Err(err) => {
284                    if attempt <= self.max_retries {
285                        continue;
286                    }
287                    return Err(AdapterError::Http {
288                        op: "http_post",
289                        detail: err.to_string(),
290                    });
291                }
292            }
293        }
294    }
295
296    fn flush(&mut self) -> Result<(), AdapterError> {
297        Ok(())
298    }
299}