byteor_actions/
exec.rs

1//! Exec action stage.
2//!
3//! Stage identity syntax:
4//! - `exec:<program>`
5//! - `exec:<program>|<arg1>|<arg2>|...`
6//!
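//! This stage forwards the input bytes unchanged (truncated to the output buffer's length)
//! while also spawning `<program>` when policy allows side effects. The input bytes are
//! written to the child process's stdin; the child's stdout and stderr are discarded.
//!
//! Safety:
//! - Disabled by default.
//! - To enable, set `BYTEOR_EXEC_ALLOWLIST` to a comma-separated list of allowed program strings.
//!   The `<program>` in the stage key must exactly match one of the entries.
//! - Each attempt is bounded by a timeout (default 5s), overridable via `BYTEOR_EXEC_TIMEOUT`
//!   as `<n>` (seconds), `<n>ms`, `<n>s`, or `<n>m`.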
14
15use std::io::Write;
16use std::process::{Command, Stdio};
17use std::time::{Duration, Instant};
18
19use byteor_pipeline_exec::{StageError, StatefulStage};
20
21use crate::error::ActionError;
22use crate::retry::{run_with_retry, RetryConfig};
23
/// Stage-key prefix selecting the exec action (`exec:<program>` or `exec:<program>|<arg>|...`).
pub const EXEC_PREFIX: &str = "exec:";

/// Name of the environment variable holding the comma-separated program allowlist.
/// Exec stages cannot be constructed while it is unset or lists no programs.
pub const BYTEOR_EXEC_ALLOWLIST_ENV: &str = "BYTEOR_EXEC_ALLOWLIST";
/// Name of the environment variable that overrides the per-attempt exec timeout.
pub const BYTEOR_EXEC_TIMEOUT_ENV: &str = "BYTEOR_EXEC_TIMEOUT";

/// Per-attempt child timeout used when `BYTEOR_EXEC_TIMEOUT` is not set (five seconds).
const DEFAULT_EXEC_TIMEOUT: Duration = Duration::from_millis(5_000);
33
/// Parsed `exec:*` stage identity, borrowing from the stage key it was parsed from.
///
/// When produced by [`parse_exec_cmd`], `program` and every entry of `args` are
/// non-empty and free of NUL bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExecCmd<'a> {
    /// Program string (must be allowlisted to run).
    pub program: &'a str,
    /// Arguments to pass to the program, in order.
    pub args: Vec<&'a str>,
}
42
43/// Try to parse an `exec:<program>` or `exec:<program>|arg|...` stage key.
44pub fn parse_exec_cmd(stage_key: &str) -> Option<ExecCmd<'_>> {
45    let rest = stage_key.strip_prefix(EXEC_PREFIX)?;
46    if rest.is_empty() {
47        return None;
48    }
49    if rest.contains('\0') {
50        return None;
51    }
52
53    let mut parts = rest.split('|');
54    let program = parts.next()?;
55    if program.is_empty() {
56        return None;
57    }
58
59    let mut args = Vec::new();
60    for a in parts {
61        if a.is_empty() {
62            return None;
63        }
64        args.push(a);
65    }
66
67    Some(ExecCmd { program, args })
68}
69
70fn exec_allowlist() -> Vec<String> {
71    let s = match std::env::var(BYTEOR_EXEC_ALLOWLIST_ENV) {
72        Ok(s) => s,
73        Err(_) => return Vec::new(),
74    };
75
76    s.split(',')
77        .map(|x| x.trim())
78        .filter(|x| !x.is_empty())
79        .map(|x| x.to_string())
80        .collect()
81}
82
83fn parse_timeout_override(raw: &str) -> Result<Duration, StageError> {
84    let raw = raw.trim();
85    if raw.is_empty() {
86        return Err(ActionError::non_retryable(format!(
87            "exec: {BYTEOR_EXEC_TIMEOUT_ENV} must not be empty"
88        ))
89        .into());
90    }
91
92    let (num, unit) = if let Some(v) = raw.strip_suffix("ms") {
93        (v, "ms")
94    } else if let Some(v) = raw.strip_suffix('s') {
95        (v, "s")
96    } else if let Some(v) = raw.strip_suffix('m') {
97        (v, "m")
98    } else {
99        (raw, "s")
100    };
101
102    let value: u64 = num.trim().parse().map_err(|_| {
103        ActionError::non_retryable(format!(
104            "exec: invalid {BYTEOR_EXEC_TIMEOUT_ENV}={raw:?}; expected <n>, <n>ms, <n>s, or <n>m"
105        ))
106    })?;
107
108    match unit {
109        "ms" => Ok(Duration::from_millis(value)),
110        "s" => Ok(Duration::from_secs(value)),
111        "m" => Ok(Duration::from_secs(value.saturating_mul(60))),
112        _ => Err(ActionError::non_retryable("exec: unsupported timeout unit").into()),
113    }
114}
115
116fn resolve_timeout() -> Result<Duration, StageError> {
117    match std::env::var(BYTEOR_EXEC_TIMEOUT_ENV) {
118        Ok(raw) => parse_timeout_override(&raw),
119        Err(std::env::VarError::NotPresent) => Ok(DEFAULT_EXEC_TIMEOUT),
120        Err(std::env::VarError::NotUnicode(_)) => Err(ActionError::non_retryable(format!(
121            "exec: {BYTEOR_EXEC_TIMEOUT_ENV} must be valid unicode"
122        ))
123        .into()),
124    }
125}
126
127/// Exec action stage.
128pub struct ExecStage {
129    program: String,
130    args: Vec<String>,
131    timeout: Duration,
132    retry: RetryConfig,
133    enabled: bool,
134}
135
136impl ExecStage {
137    /// Create a new exec stage.
138    ///
139    /// This is disabled by default and requires `BYTEOR_EXEC_ALLOWLIST`.
140    pub fn new(
141        program: String,
142        args: Vec<String>,
143        policy_decision: byteor_policy::Decision,
144    ) -> Result<Self, StageError> {
145        let enabled = match policy_decision {
146            byteor_policy::Decision::Allow => true,
147            byteor_policy::Decision::DryRunOnly => false,
148            byteor_policy::Decision::RequireApproval(gate) => {
149                return Err(ActionError::non_retryable(format!(
150                    "exec: requires approval token name={}",
151                    gate.token_name
152                ))
153                .into());
154            }
155            byteor_policy::Decision::Deny(reason) => {
156                return Err(ActionError::non_retryable(format!(
157                    "exec: denied by policy reason={reason:?}"
158                ))
159                .into());
160            }
161        };
162
163        if program.is_empty() {
164            return Err(ActionError::non_retryable("exec: missing program").into());
165        }
166        if program.contains('\0') || args.iter().any(|a| a.contains('\0')) {
167            return Err(ActionError::non_retryable("exec: invalid NUL byte").into());
168        }
169
170        let allow = exec_allowlist();
171        if allow.is_empty() {
172            return Err(ActionError::non_retryable(format!(
173                "exec: disabled by default; set {BYTEOR_EXEC_ALLOWLIST_ENV}=<program[,program...]>"
174            ))
175            .into());
176        }
177        if !allow.iter().any(|a| a == &program) {
178            return Err(ActionError::non_retryable(format!(
179                "exec: program not allowlisted (program={program})"
180            ))
181            .into());
182        }
183
184        Ok(Self {
185            program,
186            args,
187            timeout: resolve_timeout()?,
188            retry: RetryConfig::from_env()?,
189            enabled,
190        })
191    }
192}
193
194impl StatefulStage for ExecStage {
195    fn call(&mut self, input: &[u8], output: &mut [u8]) -> Result<usize, StageError> {
196        // Always forward input unchanged.
197        let n = input.len().min(output.len());
198        output[..n].copy_from_slice(&input[..n]);
199
200        if !self.enabled {
201            return Ok(n);
202        }
203
204        run_with_retry(self.retry, || {
205            let mut child = Command::new(&self.program)
206                .args(&self.args)
207                .stdin(Stdio::piped())
208                .stdout(Stdio::null())
209                .stderr(Stdio::null())
210                .spawn()
211                .map_err(|e| ActionError::non_retryable(format!("exec: spawn: {e}")))?;
212
213            {
214                let Some(mut stdin) = child.stdin.take() else {
215                    return Err(ActionError::non_retryable("exec: missing child stdin").into());
216                };
217                stdin
218                    .write_all(input)
219                    .map_err(|e| ActionError::retryable(format!("exec: write stdin: {e}")))?;
220            }
221
222            let start = Instant::now();
223            loop {
224                match child.try_wait() {
225                    Ok(Some(status)) => {
226                        if !status.success() {
227                            return Err(ActionError::non_retryable(format!(
228                                "exec: status={status}"
229                            ))
230                            .into());
231                        }
232                        break;
233                    }
234                    Ok(None) => {
235                        if start.elapsed() >= self.timeout {
236                            let _ = child.kill();
237                            return Err(ActionError::retryable("exec: timeout").into());
238                        }
239                        std::thread::sleep(Duration::from_millis(10));
240                    }
241                    Err(e) => {
242                        return Err(ActionError::retryable(format!("exec: wait: {e}")).into());
243                    }
244                }
245            }
246
247            Ok(())
248        })?;
249
250        Ok(n)
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, OnceLock, PoisonError};

    /// Process-wide lock serializing tests that mutate environment variables.
    fn env_lock() -> &'static Mutex<()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
    }

    /// RAII guard that holds the env lock and clears the exec env vars on entry and on
    /// drop, so a failing assertion cannot leak configuration into later tests.
    struct EnvGuard {
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        fn acquire() -> Self {
            // A test that panics while holding the lock poisons it. The guarded data is
            // `()`, so recovering the guard is sound. It also keeps one failure from
            // cascading into every other env-dependent test.
            let lock = env_lock().lock().unwrap_or_else(PoisonError::into_inner);
            clear_exec_env();
            Self { _lock: lock }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Runs before the `_lock` field is dropped, so cleanup is still serialized.
            clear_exec_env();
        }
    }

    fn clear_exec_env() {
        std::env::remove_var(BYTEOR_EXEC_TIMEOUT_ENV);
        std::env::remove_var(BYTEOR_EXEC_ALLOWLIST_ENV);
    }

    fn set_minimal_allowlist() {
        std::env::set_var(BYTEOR_EXEC_ALLOWLIST_ENV, "echo");
    }

    fn new_echo_stage() -> Result<ExecStage, StageError> {
        ExecStage::new(
            "echo".to_string(),
            Vec::new(),
            byteor_policy::Decision::Allow,
        )
    }

    #[test]
    fn exec_uses_default_timeout_when_env_unset() {
        let _env = EnvGuard::acquire();
        set_minimal_allowlist();

        let stage = new_echo_stage().unwrap();
        assert_eq!(stage.timeout, DEFAULT_EXEC_TIMEOUT);
    }

    #[test]
    fn exec_timeout_env_overrides_default() {
        let _env = EnvGuard::acquire();
        set_minimal_allowlist();
        std::env::set_var(BYTEOR_EXEC_TIMEOUT_ENV, "250ms");

        let stage = new_echo_stage().unwrap();
        assert_eq!(stage.timeout, Duration::from_millis(250));
    }

    #[test]
    fn exec_rejects_invalid_timeout_env() {
        let _env = EnvGuard::acquire();
        set_minimal_allowlist();
        std::env::set_var(BYTEOR_EXEC_TIMEOUT_ENV, "abc");

        let err = new_echo_stage()
            .err()
            .expect("invalid timeout must be rejected");
        assert!(err.to_string().contains(BYTEOR_EXEC_TIMEOUT_ENV));
    }

    #[test]
    fn exec_requires_allowlist() {
        let _env = EnvGuard::acquire();

        let err = new_echo_stage()
            .err()
            .expect("missing allowlist must be rejected");
        assert!(err.to_string().contains(BYTEOR_EXEC_ALLOWLIST_ENV));
    }

    #[test]
    fn dry_run_forwards_without_spawning() {
        let _env = EnvGuard::acquire();
        set_minimal_allowlist();

        let mut stage = ExecStage::new(
            "echo".to_string(),
            Vec::new(),
            byteor_policy::Decision::DryRunOnly,
        )
        .unwrap();
        assert!(!stage.enabled);

        // Output shorter than input: forwarding truncates to the output length.
        let mut out = [0u8; 2];
        assert_eq!(stage.call(b"abc", &mut out).unwrap(), 2);
        assert_eq!(&out, b"ab");
    }

    #[test]
    fn parse_exec_cmd_splits_program_and_args() {
        let cmd = parse_exec_cmd("exec:echo|hello|world").unwrap();
        assert_eq!(cmd.program, "echo");
        assert_eq!(cmd.args, vec!["hello", "world"]);

        let cmd = parse_exec_cmd("exec:true").unwrap();
        assert_eq!(cmd.program, "true");
        assert!(cmd.args.is_empty());
    }

    #[test]
    fn parse_exec_cmd_rejects_malformed_keys() {
        for key in ["exec:", "exec:|a", "exec:echo|", "exec:a||b", "exec:a\0b", "run:echo"] {
            assert!(parse_exec_cmd(key).is_none(), "{key:?} should be rejected");
        }
    }

    #[test]
    fn parse_timeout_override_accepts_all_units() {
        assert_eq!(parse_timeout_override("7").unwrap(), Duration::from_secs(7));
        assert_eq!(parse_timeout_override(" 3s ").unwrap(), Duration::from_secs(3));
        assert_eq!(
            parse_timeout_override("250ms").unwrap(),
            Duration::from_millis(250)
        );
        assert_eq!(parse_timeout_override("2m").unwrap(), Duration::from_secs(120));
    }

    #[test]
    fn parse_timeout_override_rejects_malformed_values() {
        for raw in ["", "   ", "abc", "ms", "-1", "5h"] {
            let err = parse_timeout_override(raw).unwrap_err();
            assert!(err.to_string().contains(BYTEOR_EXEC_TIMEOUT_ENV), "{raw:?}");
        }
    }
}