byteor_actions/
http_post.rs

1//! HTTP POST action stage.
2//!
3//! Stage identity syntax:
4//! - `http_post:<url>`
5//!
6//! This stage forwards the input bytes unchanged while also POSTing the same bytes to `<url>`
7//! when policy allows side effects. It includes an `Idempotency-Key` header computed as
8//! `sha256:<hex>` of the input bytes.
9
10use std::time::Duration;
11
12use byteor_pipeline_exec::{StageError, StatefulStage};
13
14use crate::error::ActionError;
15use crate::retry::{run_with_retry, RetryConfig};
16
17/// Prefix for HTTP POST stage keys.
18pub const HTTP_POST_PREFIX: &str = "http_post:";
19/// Environment variable containing a comma-separated URL prefix allowlist.
20pub const BYTEOR_HTTP_POST_ALLOWLIST_ENV: &str = "BYTEOR_HTTP_POST_ALLOWLIST";
21/// Environment variable overriding the HTTP POST timeout.
22pub const BYTEOR_HTTP_POST_TIMEOUT_ENV: &str = "BYTEOR_HTTP_POST_TIMEOUT";
23
24const DEFAULT_HTTP_POST_TIMEOUT: Duration = Duration::from_secs(5);
25
26/// Try to parse an `http_post:<url>` stage key.
27pub fn parse_http_post_url(stage_key: &str) -> Option<&str> {
28    let url = stage_key.strip_prefix(HTTP_POST_PREFIX)?;
29    if url.is_empty() {
30        return None;
31    }
32    Some(url)
33}
34
35fn parse_timeout_override(raw: &str) -> Result<Duration, StageError> {
36    let raw = raw.trim();
37    if raw.is_empty() {
38        return Err(ActionError::non_retryable(format!(
39            "http_post: {BYTEOR_HTTP_POST_TIMEOUT_ENV} must not be empty"
40        ))
41        .into());
42    }
43
44    let (num, unit) = if let Some(v) = raw.strip_suffix("ms") {
45        (v, "ms")
46    } else if let Some(v) = raw.strip_suffix('s') {
47        (v, "s")
48    } else if let Some(v) = raw.strip_suffix('m') {
49        (v, "m")
50    } else {
51        (raw, "s")
52    };
53
54    let value: u64 = num.trim().parse().map_err(|_| {
55        ActionError::non_retryable(format!(
56            "http_post: invalid {BYTEOR_HTTP_POST_TIMEOUT_ENV}={raw:?}; expected <n>, <n>ms, <n>s, or <n>m"
57        ))
58    })?;
59
60    match unit {
61        "ms" => Ok(Duration::from_millis(value)),
62        "s" => Ok(Duration::from_secs(value)),
63        "m" => Ok(Duration::from_secs(value.saturating_mul(60))),
64        _ => Err(ActionError::non_retryable("http_post: unsupported timeout unit").into()),
65    }
66}
67
68fn resolve_timeout() -> Result<Duration, StageError> {
69    match std::env::var(BYTEOR_HTTP_POST_TIMEOUT_ENV) {
70        Ok(raw) => parse_timeout_override(&raw),
71        Err(std::env::VarError::NotPresent) => Ok(DEFAULT_HTTP_POST_TIMEOUT),
72        Err(std::env::VarError::NotUnicode(_)) => Err(ActionError::non_retryable(format!(
73            "http_post: {BYTEOR_HTTP_POST_TIMEOUT_ENV} must be valid unicode"
74        ))
75        .into()),
76    }
77}
78
79fn http_post_allowlist() -> Vec<String> {
80    let s = match std::env::var(BYTEOR_HTTP_POST_ALLOWLIST_ENV) {
81        Ok(s) => s,
82        Err(_) => return Vec::new(),
83    };
84
85    s.split(',')
86        .map(|x| x.trim())
87        .filter(|x| !x.is_empty())
88        .map(|x| x.to_string())
89        .collect()
90}
91
92/// HTTP POST action stage.
93pub struct HttpPostStage {
94    url: String,
95    timeout: Duration,
96    retry: RetryConfig,
97    enabled: bool,
98}
99
100impl HttpPostStage {
101    /// Create a new HTTP POST stage.
102    pub fn new(url: String, policy_decision: byteor_policy::Decision) -> Result<Self, StageError> {
103        let enabled = match policy_decision {
104            byteor_policy::Decision::Allow => true,
105            byteor_policy::Decision::DryRunOnly => false,
106            byteor_policy::Decision::RequireApproval(gate) => {
107                return Err(ActionError::non_retryable(format!(
108                    "http_post: requires approval token name={}",
109                    gate.token_name
110                ))
111                .into());
112            }
113            byteor_policy::Decision::Deny(reason) => {
114                return Err(ActionError::non_retryable(format!(
115                    "http_post: denied by policy reason={reason:?}"
116                ))
117                .into());
118            }
119        };
120
121        if url.is_empty() {
122            return Err(ActionError::non_retryable("http_post: missing url").into());
123        }
124
125        // Minimal sanity. (We avoid pulling in a full URL parser in the hot path.)
126        if !(url.starts_with("http://") || url.starts_with("https://")) {
127            return Err(ActionError::non_retryable(
128                "http_post: url must start with http:// or https://",
129            )
130            .into());
131        }
132
133        let allow = http_post_allowlist();
134        if !allow.is_empty() && !allow.iter().any(|prefix| url.starts_with(prefix)) {
135            return Err(ActionError::non_retryable(format!(
136                "http_post: url not allowlisted (url={url}); set {BYTEOR_HTTP_POST_ALLOWLIST_ENV}=<prefix[,prefix...]>"
137            ))
138            .into());
139        }
140
141        Ok(Self {
142            url,
143            timeout: resolve_timeout()?,
144            retry: RetryConfig::from_env()?,
145            enabled,
146        })
147    }
148}
149
150impl StatefulStage for HttpPostStage {
151    fn call(&mut self, input: &[u8], output: &mut [u8]) -> Result<usize, StageError> {
152        // Always forward input unchanged.
153        let n = input.len().min(output.len());
154        output[..n].copy_from_slice(&input[..n]);
155
156        if !self.enabled {
157            return Ok(n);
158        }
159
160        let mut hasher = sha2::Sha256::new();
161        use sha2::Digest;
162        hasher.update(input);
163        let digest = hasher.finalize();
164        let key = format!("sha256:{}", hex::encode(digest));
165
166        run_with_retry(self.retry, || {
167            let agent = ureq::AgentBuilder::new().timeout(self.timeout).build();
168            let resp = agent
169                .post(&self.url)
170                .set("Content-Type", "application/octet-stream")
171                .set("Idempotency-Key", &key)
172                .send_bytes(input);
173
174            match resp {
175                Ok(r) => {
176                    if r.status() >= 400 {
177                        let msg = format!("http_post: status={}", r.status());
178                        let err = if r.status() == 429 || r.status() >= 500 {
179                            ActionError::retryable(msg)
180                        } else {
181                            ActionError::non_retryable(msg)
182                        };
183                        return Err(err.into());
184                    }
185                }
186                Err(e) => {
187                    // Treat transport/timeouts as retryable.
188                    return Err(ActionError::retryable(format!("http_post: {e}")).into());
189                }
190            }
191            Ok(())
192        })?;
193
194        Ok(n)
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use super::*;
201    use std::sync::{Mutex, OnceLock};
202
203    fn env_lock() -> &'static Mutex<()> {
204        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
205        LOCK.get_or_init(|| Mutex::new(()))
206    }
207
208    fn clear_http_post_env() {
209        std::env::remove_var(BYTEOR_HTTP_POST_TIMEOUT_ENV);
210        std::env::remove_var(BYTEOR_HTTP_POST_ALLOWLIST_ENV);
211        std::env::remove_var(crate::retry::BYTEOR_ACTION_MAX_RETRIES_ENV);
212        std::env::remove_var(crate::retry::BYTEOR_ACTION_RETRY_BASE_MS_ENV);
213    }
214
215    #[test]
216    fn http_post_uses_default_timeout_when_env_unset() {
217        let _guard = env_lock().lock().unwrap();
218        clear_http_post_env();
219
220        let stage = HttpPostStage::new(
221            "https://example.com".to_string(),
222            byteor_policy::Decision::Allow,
223        )
224        .unwrap();
225
226        assert_eq!(stage.timeout, DEFAULT_HTTP_POST_TIMEOUT);
227    }
228
229    #[test]
230    fn http_post_timeout_env_overrides_default() {
231        let _guard = env_lock().lock().unwrap();
232        clear_http_post_env();
233        std::env::set_var(BYTEOR_HTTP_POST_TIMEOUT_ENV, "250ms");
234
235        let stage = HttpPostStage::new(
236            "https://example.com".to_string(),
237            byteor_policy::Decision::Allow,
238        )
239        .unwrap();
240
241        assert_eq!(stage.timeout, Duration::from_millis(250));
242
243        clear_http_post_env();
244    }
245
246    #[test]
247    fn http_post_rejects_invalid_timeout_env() {
248        let _guard = env_lock().lock().unwrap();
249        clear_http_post_env();
250        std::env::set_var(BYTEOR_HTTP_POST_TIMEOUT_ENV, "abc");
251
252        let err = HttpPostStage::new(
253            "https://example.com".to_string(),
254            byteor_policy::Decision::Allow,
255        )
256        .err()
257        .unwrap();
258
259        assert!(err.to_string().contains(BYTEOR_HTTP_POST_TIMEOUT_ENV));
260
261        clear_http_post_env();
262    }
263
264    #[test]
265    fn http_post_allows_all_urls_when_allowlist_unset() {
266        let _guard = env_lock().lock().unwrap();
267        clear_http_post_env();
268
269        let stage = HttpPostStage::new(
270            "https://unlisted.example.com/path".to_string(),
271            byteor_policy::Decision::Allow,
272        )
273        .unwrap();
274
275        assert_eq!(stage.url, "https://unlisted.example.com/path");
276    }
277
278    #[test]
279    fn http_post_rejects_url_outside_allowlist() {
280        let _guard = env_lock().lock().unwrap();
281        clear_http_post_env();
282        std::env::set_var(
283            BYTEOR_HTTP_POST_ALLOWLIST_ENV,
284            "https://allowed.example.com/api,https://other.example.com/",
285        );
286
287        let err = HttpPostStage::new(
288            "https://blocked.example.com/path".to_string(),
289            byteor_policy::Decision::Allow,
290        )
291        .err()
292        .unwrap();
293
294        assert!(err.to_string().contains("url not allowlisted"));
295
296        clear_http_post_env();
297    }
298
299    #[test]
300    fn http_post_accepts_url_matching_allowlist_prefix() {
301        let _guard = env_lock().lock().unwrap();
302        clear_http_post_env();
303        std::env::set_var(
304            BYTEOR_HTTP_POST_ALLOWLIST_ENV,
305            "https://allowed.example.com/api",
306        );
307
308        let stage = HttpPostStage::new(
309            "https://allowed.example.com/api/v1/orders".to_string(),
310            byteor_policy::Decision::Allow,
311        )
312        .unwrap();
313
314        assert_eq!(stage.url, "https://allowed.example.com/api/v1/orders");
315
316        clear_http_post_env();
317    }
318}