1use std::io::Write;
16use std::process::{Command, Stdio};
17use std::time::{Duration, Instant};
18
19use byteor_pipeline_exec::{StageError, StatefulStage};
20
21use crate::error::ActionError;
22use crate::retry::{run_with_retry, RetryConfig};
23
/// Stage-key prefix that selects the exec stage; see [`parse_exec_cmd`].
pub const EXEC_PREFIX: &str = "exec:";

/// Environment variable listing the programs (comma-separated) that exec stages may run.
///
/// When it is unset or contains no entries, exec stages cannot be constructed.
pub const BYTEOR_EXEC_ALLOWLIST_ENV: &str = "BYTEOR_EXEC_ALLOWLIST";

/// Environment variable overriding the per-invocation timeout.
///
/// Accepted forms: `<n>` (seconds), `<n>ms`, `<n>s`, or `<n>m`.
pub const BYTEOR_EXEC_TIMEOUT_ENV: &str = "BYTEOR_EXEC_TIMEOUT";
/// Timeout used when [`BYTEOR_EXEC_TIMEOUT_ENV`] is not set.
const DEFAULT_EXEC_TIMEOUT: Duration = Duration::from_secs(5);
33
/// A command parsed from an `exec:` stage key by [`parse_exec_cmd`].
///
/// Both fields borrow from the stage key they were parsed from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExecCmd<'a> {
    /// Program to run: the first `|`-separated field after the `exec:` prefix (never empty).
    pub program: &'a str,
    /// Remaining `|`-separated fields, in order; each one is non-empty.
    pub args: Vec<&'a str>,
}
42
43pub fn parse_exec_cmd(stage_key: &str) -> Option<ExecCmd<'_>> {
45 let rest = stage_key.strip_prefix(EXEC_PREFIX)?;
46 if rest.is_empty() {
47 return None;
48 }
49 if rest.contains('\0') {
50 return None;
51 }
52
53 let mut parts = rest.split('|');
54 let program = parts.next()?;
55 if program.is_empty() {
56 return None;
57 }
58
59 let mut args = Vec::new();
60 for a in parts {
61 if a.is_empty() {
62 return None;
63 }
64 args.push(a);
65 }
66
67 Some(ExecCmd { program, args })
68}
69
70fn exec_allowlist() -> Vec<String> {
71 let s = match std::env::var(BYTEOR_EXEC_ALLOWLIST_ENV) {
72 Ok(s) => s,
73 Err(_) => return Vec::new(),
74 };
75
76 s.split(',')
77 .map(|x| x.trim())
78 .filter(|x| !x.is_empty())
79 .map(|x| x.to_string())
80 .collect()
81}
82
83fn parse_timeout_override(raw: &str) -> Result<Duration, StageError> {
84 let raw = raw.trim();
85 if raw.is_empty() {
86 return Err(ActionError::non_retryable(format!(
87 "exec: {BYTEOR_EXEC_TIMEOUT_ENV} must not be empty"
88 ))
89 .into());
90 }
91
92 let (num, unit) = if let Some(v) = raw.strip_suffix("ms") {
93 (v, "ms")
94 } else if let Some(v) = raw.strip_suffix('s') {
95 (v, "s")
96 } else if let Some(v) = raw.strip_suffix('m') {
97 (v, "m")
98 } else {
99 (raw, "s")
100 };
101
102 let value: u64 = num.trim().parse().map_err(|_| {
103 ActionError::non_retryable(format!(
104 "exec: invalid {BYTEOR_EXEC_TIMEOUT_ENV}={raw:?}; expected <n>, <n>ms, <n>s, or <n>m"
105 ))
106 })?;
107
108 match unit {
109 "ms" => Ok(Duration::from_millis(value)),
110 "s" => Ok(Duration::from_secs(value)),
111 "m" => Ok(Duration::from_secs(value.saturating_mul(60))),
112 _ => Err(ActionError::non_retryable("exec: unsupported timeout unit").into()),
113 }
114}
115
116fn resolve_timeout() -> Result<Duration, StageError> {
117 match std::env::var(BYTEOR_EXEC_TIMEOUT_ENV) {
118 Ok(raw) => parse_timeout_override(&raw),
119 Err(std::env::VarError::NotPresent) => Ok(DEFAULT_EXEC_TIMEOUT),
120 Err(std::env::VarError::NotUnicode(_)) => Err(ActionError::non_retryable(format!(
121 "exec: {BYTEOR_EXEC_TIMEOUT_ENV} must be valid unicode"
122 ))
123 .into()),
124 }
125}
126
127pub struct ExecStage {
129 program: String,
130 args: Vec<String>,
131 timeout: Duration,
132 retry: RetryConfig,
133 enabled: bool,
134}
135
136impl ExecStage {
137 pub fn new(
141 program: String,
142 args: Vec<String>,
143 policy_decision: byteor_policy::Decision,
144 ) -> Result<Self, StageError> {
145 let enabled = match policy_decision {
146 byteor_policy::Decision::Allow => true,
147 byteor_policy::Decision::DryRunOnly => false,
148 byteor_policy::Decision::RequireApproval(gate) => {
149 return Err(ActionError::non_retryable(format!(
150 "exec: requires approval token name={}",
151 gate.token_name
152 ))
153 .into());
154 }
155 byteor_policy::Decision::Deny(reason) => {
156 return Err(ActionError::non_retryable(format!(
157 "exec: denied by policy reason={reason:?}"
158 ))
159 .into());
160 }
161 };
162
163 if program.is_empty() {
164 return Err(ActionError::non_retryable("exec: missing program").into());
165 }
166 if program.contains('\0') || args.iter().any(|a| a.contains('\0')) {
167 return Err(ActionError::non_retryable("exec: invalid NUL byte").into());
168 }
169
170 let allow = exec_allowlist();
171 if allow.is_empty() {
172 return Err(ActionError::non_retryable(format!(
173 "exec: disabled by default; set {BYTEOR_EXEC_ALLOWLIST_ENV}=<program[,program...]>"
174 ))
175 .into());
176 }
177 if !allow.iter().any(|a| a == &program) {
178 return Err(ActionError::non_retryable(format!(
179 "exec: program not allowlisted (program={program})"
180 ))
181 .into());
182 }
183
184 Ok(Self {
185 program,
186 args,
187 timeout: resolve_timeout()?,
188 retry: RetryConfig::from_env()?,
189 enabled,
190 })
191 }
192}
193
194impl StatefulStage for ExecStage {
195 fn call(&mut self, input: &[u8], output: &mut [u8]) -> Result<usize, StageError> {
196 let n = input.len().min(output.len());
198 output[..n].copy_from_slice(&input[..n]);
199
200 if !self.enabled {
201 return Ok(n);
202 }
203
204 run_with_retry(self.retry, || {
205 let mut child = Command::new(&self.program)
206 .args(&self.args)
207 .stdin(Stdio::piped())
208 .stdout(Stdio::null())
209 .stderr(Stdio::null())
210 .spawn()
211 .map_err(|e| ActionError::non_retryable(format!("exec: spawn: {e}")))?;
212
213 {
214 let Some(mut stdin) = child.stdin.take() else {
215 return Err(ActionError::non_retryable("exec: missing child stdin").into());
216 };
217 stdin
218 .write_all(input)
219 .map_err(|e| ActionError::retryable(format!("exec: write stdin: {e}")))?;
220 }
221
222 let start = Instant::now();
223 loop {
224 match child.try_wait() {
225 Ok(Some(status)) => {
226 if !status.success() {
227 return Err(ActionError::non_retryable(format!(
228 "exec: status={status}"
229 ))
230 .into());
231 }
232 break;
233 }
234 Ok(None) => {
235 if start.elapsed() >= self.timeout {
236 let _ = child.kill();
237 return Err(ActionError::retryable("exec: timeout").into());
238 }
239 std::thread::sleep(Duration::from_millis(10));
240 }
241 Err(e) => {
242 return Err(ActionError::retryable(format!("exec: wait: {e}")).into());
243 }
244 }
245 }
246
247 Ok(())
248 })?;
249
250 Ok(n)
251 }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError};

    /// Holds the process-wide env lock for the duration of a test.
    ///
    /// Dropping it (including during unwinding after a failed assertion) clears the exec
    /// env vars before releasing the lock, so a failing test cannot leak state into others.
    struct EnvGuard {
        _lock: MutexGuard<'static, ()>,
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            std::env::remove_var(BYTEOR_EXEC_TIMEOUT_ENV);
            std::env::remove_var(BYTEOR_EXEC_ALLOWLIST_ENV);
        }
    }

    /// Serializes env-mutating tests, clears the timeout override, and allowlists `echo`.
    ///
    /// A poisoned lock (left by a test that panicked) is recovered instead of cascading
    /// into failures of every later test.
    fn env_with_minimal_allowlist() -> EnvGuard {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        let lock = LOCK
            .get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(PoisonError::into_inner);
        std::env::remove_var(BYTEOR_EXEC_TIMEOUT_ENV);
        std::env::set_var(BYTEOR_EXEC_ALLOWLIST_ENV, "echo");
        EnvGuard { _lock: lock }
    }

    fn new_echo_stage(decision: byteor_policy::Decision) -> Result<ExecStage, StageError> {
        ExecStage::new("echo".to_string(), Vec::new(), decision)
    }

    #[test]
    fn exec_uses_default_timeout_when_env_unset() {
        let _env = env_with_minimal_allowlist();

        let stage = new_echo_stage(byteor_policy::Decision::Allow).unwrap();

        assert_eq!(stage.timeout, DEFAULT_EXEC_TIMEOUT);
    }

    #[test]
    fn exec_timeout_env_overrides_default() {
        let _env = env_with_minimal_allowlist();
        std::env::set_var(BYTEOR_EXEC_TIMEOUT_ENV, "250ms");

        let stage = new_echo_stage(byteor_policy::Decision::Allow).unwrap();

        assert_eq!(stage.timeout, Duration::from_millis(250));
    }

    #[test]
    fn exec_rejects_invalid_timeout_env() {
        let _env = env_with_minimal_allowlist();
        std::env::set_var(BYTEOR_EXEC_TIMEOUT_ENV, "abc");

        let err = new_echo_stage(byteor_policy::Decision::Allow)
            .err()
            .expect("invalid timeout must be rejected");

        assert!(err.to_string().contains(BYTEOR_EXEC_TIMEOUT_ENV));
    }

    #[test]
    fn exec_is_disabled_without_allowlist() {
        let _env = env_with_minimal_allowlist();
        std::env::remove_var(BYTEOR_EXEC_ALLOWLIST_ENV);

        let err = new_echo_stage(byteor_policy::Decision::Allow)
            .err()
            .expect("exec must be disabled without an allowlist");

        assert!(err.to_string().contains(BYTEOR_EXEC_ALLOWLIST_ENV));
    }

    #[test]
    fn exec_rejects_program_missing_from_allowlist() {
        let _env = env_with_minimal_allowlist();

        let err = ExecStage::new(
            "ls".to_string(),
            Vec::new(),
            byteor_policy::Decision::Allow,
        )
        .err()
        .expect("`ls` is not allowlisted");

        assert!(err.to_string().contains("not allowlisted"));
    }

    #[test]
    fn dry_run_stage_forwards_input_without_spawning() {
        let _env = env_with_minimal_allowlist();
        let mut stage = new_echo_stage(byteor_policy::Decision::DryRunOnly).unwrap();

        let mut output = [0u8; 3];
        let written = stage.call(b"hello", &mut output).unwrap();

        assert_eq!(written, 3);
        assert_eq!(&output, b"hel");
    }

    #[test]
    fn parse_exec_cmd_splits_program_and_args() {
        let cmd = parse_exec_cmd("exec:echo|a|b").expect("valid exec key");

        assert_eq!(cmd.program, "echo");
        assert_eq!(cmd.args, vec!["a", "b"]);
    }

    #[test]
    fn parse_exec_cmd_rejects_malformed_keys() {
        for key in ["echo", "exec:", "exec:|a", "exec:echo||a", "exec:echo|", "exec:ec\0ho"] {
            assert!(parse_exec_cmd(key).is_none(), "{key:?} should be rejected");
        }
    }

    #[test]
    fn parse_timeout_override_accepts_supported_units() {
        assert_eq!(
            parse_timeout_override("250ms").unwrap(),
            Duration::from_millis(250)
        );
        assert_eq!(parse_timeout_override("2s").unwrap(), Duration::from_secs(2));
        assert_eq!(parse_timeout_override("3m").unwrap(), Duration::from_secs(180));
        assert_eq!(parse_timeout_override(" 7 ").unwrap(), Duration::from_secs(7));
    }

    #[test]
    fn parse_timeout_override_rejects_malformed_values() {
        for raw in ["", "   ", "ms", "abc", "-1s", "1h"] {
            assert!(
                parse_timeout_override(raw).is_err(),
                "{raw:?} should be rejected"
            );
        }
    }
}