byteor_actions/
http_post.rs1use std::time::Duration;
11
12use byteor_pipeline_exec::{StageError, StatefulStage};
13
14use crate::error::ActionError;
15use crate::retry::{run_with_retry, RetryConfig};
16
/// Stage-key prefix that selects the HTTP POST action; the text after it is the target URL.
pub const HTTP_POST_PREFIX: &str = "http_post:";
/// Name of the environment variable holding a comma-separated list of allowed URL prefixes.
pub const BYTEOR_HTTP_POST_ALLOWLIST_ENV: &str = "BYTEOR_HTTP_POST_ALLOWLIST";
/// Name of the environment variable that overrides the request timeout
/// (accepted forms: `<n>`, `<n>ms`, `<n>s`, `<n>m`).
pub const BYTEOR_HTTP_POST_TIMEOUT_ENV: &str = "BYTEOR_HTTP_POST_TIMEOUT";

/// Request timeout used when the timeout environment variable is not set.
const DEFAULT_HTTP_POST_TIMEOUT: Duration = Duration::from_secs(5);
25
26pub fn parse_http_post_url(stage_key: &str) -> Option<&str> {
28 let url = stage_key.strip_prefix(HTTP_POST_PREFIX)?;
29 if url.is_empty() {
30 return None;
31 }
32 Some(url)
33}
34
35fn parse_timeout_override(raw: &str) -> Result<Duration, StageError> {
36 let raw = raw.trim();
37 if raw.is_empty() {
38 return Err(ActionError::non_retryable(format!(
39 "http_post: {BYTEOR_HTTP_POST_TIMEOUT_ENV} must not be empty"
40 ))
41 .into());
42 }
43
44 let (num, unit) = if let Some(v) = raw.strip_suffix("ms") {
45 (v, "ms")
46 } else if let Some(v) = raw.strip_suffix('s') {
47 (v, "s")
48 } else if let Some(v) = raw.strip_suffix('m') {
49 (v, "m")
50 } else {
51 (raw, "s")
52 };
53
54 let value: u64 = num.trim().parse().map_err(|_| {
55 ActionError::non_retryable(format!(
56 "http_post: invalid {BYTEOR_HTTP_POST_TIMEOUT_ENV}={raw:?}; expected <n>, <n>ms, <n>s, or <n>m"
57 ))
58 })?;
59
60 match unit {
61 "ms" => Ok(Duration::from_millis(value)),
62 "s" => Ok(Duration::from_secs(value)),
63 "m" => Ok(Duration::from_secs(value.saturating_mul(60))),
64 _ => Err(ActionError::non_retryable("http_post: unsupported timeout unit").into()),
65 }
66}
67
68fn resolve_timeout() -> Result<Duration, StageError> {
69 match std::env::var(BYTEOR_HTTP_POST_TIMEOUT_ENV) {
70 Ok(raw) => parse_timeout_override(&raw),
71 Err(std::env::VarError::NotPresent) => Ok(DEFAULT_HTTP_POST_TIMEOUT),
72 Err(std::env::VarError::NotUnicode(_)) => Err(ActionError::non_retryable(format!(
73 "http_post: {BYTEOR_HTTP_POST_TIMEOUT_ENV} must be valid unicode"
74 ))
75 .into()),
76 }
77}
78
79fn http_post_allowlist() -> Vec<String> {
80 let s = match std::env::var(BYTEOR_HTTP_POST_ALLOWLIST_ENV) {
81 Ok(s) => s,
82 Err(_) => return Vec::new(),
83 };
84
85 s.split(',')
86 .map(|x| x.trim())
87 .filter(|x| !x.is_empty())
88 .map(|x| x.to_string())
89 .collect()
90}
91
/// Pipeline stage that passes its input through and POSTs it to a fixed URL.
pub struct HttpPostStage {
    /// Destination URL; checked for scheme and allowlist membership in `HttpPostStage::new`.
    url: String,
    /// Timeout configured on the HTTP agent used for each request.
    timeout: Duration,
    /// Retry settings handed to `run_with_retry` when sending.
    retry: RetryConfig,
    /// `false` when policy returned `DryRunOnly`: input is still copied to the
    /// output, but no request is sent.
    enabled: bool,
}
99
100impl HttpPostStage {
101 pub fn new(url: String, policy_decision: byteor_policy::Decision) -> Result<Self, StageError> {
103 let enabled = match policy_decision {
104 byteor_policy::Decision::Allow => true,
105 byteor_policy::Decision::DryRunOnly => false,
106 byteor_policy::Decision::RequireApproval(gate) => {
107 return Err(ActionError::non_retryable(format!(
108 "http_post: requires approval token name={}",
109 gate.token_name
110 ))
111 .into());
112 }
113 byteor_policy::Decision::Deny(reason) => {
114 return Err(ActionError::non_retryable(format!(
115 "http_post: denied by policy reason={reason:?}"
116 ))
117 .into());
118 }
119 };
120
121 if url.is_empty() {
122 return Err(ActionError::non_retryable("http_post: missing url").into());
123 }
124
125 if !(url.starts_with("http://") || url.starts_with("https://")) {
127 return Err(ActionError::non_retryable(
128 "http_post: url must start with http:// or https://",
129 )
130 .into());
131 }
132
133 let allow = http_post_allowlist();
134 if !allow.is_empty() && !allow.iter().any(|prefix| url.starts_with(prefix)) {
135 return Err(ActionError::non_retryable(format!(
136 "http_post: url not allowlisted (url={url}); set {BYTEOR_HTTP_POST_ALLOWLIST_ENV}=<prefix[,prefix...]>"
137 ))
138 .into());
139 }
140
141 Ok(Self {
142 url,
143 timeout: resolve_timeout()?,
144 retry: RetryConfig::from_env()?,
145 enabled,
146 })
147 }
148}
149
150impl StatefulStage for HttpPostStage {
151 fn call(&mut self, input: &[u8], output: &mut [u8]) -> Result<usize, StageError> {
152 let n = input.len().min(output.len());
154 output[..n].copy_from_slice(&input[..n]);
155
156 if !self.enabled {
157 return Ok(n);
158 }
159
160 let mut hasher = sha2::Sha256::new();
161 use sha2::Digest;
162 hasher.update(input);
163 let digest = hasher.finalize();
164 let key = format!("sha256:{}", hex::encode(digest));
165
166 run_with_retry(self.retry, || {
167 let agent = ureq::AgentBuilder::new().timeout(self.timeout).build();
168 let resp = agent
169 .post(&self.url)
170 .set("Content-Type", "application/octet-stream")
171 .set("Idempotency-Key", &key)
172 .send_bytes(input);
173
174 match resp {
175 Ok(r) => {
176 if r.status() >= 400 {
177 let msg = format!("http_post: status={}", r.status());
178 let err = if r.status() == 429 || r.status() >= 500 {
179 ActionError::retryable(msg)
180 } else {
181 ActionError::non_retryable(msg)
182 };
183 return Err(err.into());
184 }
185 }
186 Err(e) => {
187 return Err(ActionError::retryable(format!("http_post: {e}")).into());
189 }
190 }
191 Ok(())
192 })?;
193
194 Ok(n)
195 }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError};

    /// Process-wide lock serializing tests that mutate environment variables.
    fn env_lock() -> &'static Mutex<()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
    }

    /// Removes every environment variable that `HttpPostStage::new` consults.
    fn clear_http_post_env() {
        std::env::remove_var(BYTEOR_HTTP_POST_TIMEOUT_ENV);
        std::env::remove_var(BYTEOR_HTTP_POST_ALLOWLIST_ENV);
        std::env::remove_var(crate::retry::BYTEOR_ACTION_MAX_RETRIES_ENV);
        std::env::remove_var(crate::retry::BYTEOR_ACTION_RETRY_BASE_MS_ENV);
    }

    /// Holds the environment lock for the duration of a test and guarantees a
    /// clean environment both on entry and on exit, including when the test
    /// panics.
    struct EnvGuard {
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        fn acquire() -> Self {
            // A poisoned lock only means an earlier test failed; the protected
            // data is `()`, so it is safe to continue rather than cascade the
            // failure into every later test.
            let lock = env_lock()
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            clear_http_post_env();
            Self { _lock: lock }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Runs before the lock field is released, so no other test can
            // observe leftover variables.
            clear_http_post_env();
        }
    }

    #[test]
    fn parse_http_post_url_requires_prefix_and_url() {
        assert_eq!(
            parse_http_post_url("http_post:https://example.com"),
            Some("https://example.com")
        );
        assert_eq!(parse_http_post_url("http_post:"), None);
        assert_eq!(parse_http_post_url("other:https://example.com"), None);
    }

    #[test]
    fn parse_timeout_override_understands_units() {
        assert_eq!(
            parse_timeout_override("250ms").unwrap(),
            Duration::from_millis(250)
        );
        assert_eq!(parse_timeout_override("3s").unwrap(), Duration::from_secs(3));
        assert_eq!(parse_timeout_override("2m").unwrap(), Duration::from_secs(120));
        assert_eq!(parse_timeout_override(" 7 ").unwrap(), Duration::from_secs(7));
        assert!(parse_timeout_override("").is_err());
        assert!(parse_timeout_override("abc").is_err());
    }

    #[test]
    fn http_post_uses_default_timeout_when_env_unset() {
        let _env = EnvGuard::acquire();

        let stage = HttpPostStage::new(
            "https://example.com".to_string(),
            byteor_policy::Decision::Allow,
        )
        .unwrap();

        assert_eq!(stage.timeout, DEFAULT_HTTP_POST_TIMEOUT);
    }

    #[test]
    fn http_post_timeout_env_overrides_default() {
        let _env = EnvGuard::acquire();
        std::env::set_var(BYTEOR_HTTP_POST_TIMEOUT_ENV, "250ms");

        let stage = HttpPostStage::new(
            "https://example.com".to_string(),
            byteor_policy::Decision::Allow,
        )
        .unwrap();

        assert_eq!(stage.timeout, Duration::from_millis(250));
    }

    #[test]
    fn http_post_rejects_invalid_timeout_env() {
        let _env = EnvGuard::acquire();
        std::env::set_var(BYTEOR_HTTP_POST_TIMEOUT_ENV, "abc");

        let err = HttpPostStage::new(
            "https://example.com".to_string(),
            byteor_policy::Decision::Allow,
        )
        .err()
        .expect("an unparsable timeout must be rejected");

        assert!(err.to_string().contains(BYTEOR_HTTP_POST_TIMEOUT_ENV));
    }

    #[test]
    fn http_post_allows_all_urls_when_allowlist_unset() {
        let _env = EnvGuard::acquire();

        let stage = HttpPostStage::new(
            "https://unlisted.example.com/path".to_string(),
            byteor_policy::Decision::Allow,
        )
        .unwrap();

        assert_eq!(stage.url, "https://unlisted.example.com/path");
    }

    #[test]
    fn http_post_rejects_url_outside_allowlist() {
        let _env = EnvGuard::acquire();
        std::env::set_var(
            BYTEOR_HTTP_POST_ALLOWLIST_ENV,
            "https://allowed.example.com/api,https://other.example.com/",
        );

        let err = HttpPostStage::new(
            "https://blocked.example.com/path".to_string(),
            byteor_policy::Decision::Allow,
        )
        .err()
        .expect("a url outside the allowlist must be rejected");

        assert!(err.to_string().contains("url not allowlisted"));
    }

    #[test]
    fn http_post_accepts_url_matching_allowlist_prefix() {
        let _env = EnvGuard::acquire();
        std::env::set_var(
            BYTEOR_HTTP_POST_ALLOWLIST_ENV,
            "https://allowed.example.com/api",
        );

        let stage = HttpPostStage::new(
            "https://allowed.example.com/api/v1/orders".to_string(),
            byteor_policy::Decision::Allow,
        )
        .unwrap();

        assert_eq!(stage.url, "https://allowed.example.com/api/v1/orders");
    }
}