byteor_replay/
stages.rs

//! Stage resolution for replay: maps pipeline stage keys to the stages replay executes.
2
3use std::path::PathBuf;
4
5use byteor_dataguard_core::{DataGuardCtx, DataGuardStages, RedactPayloadMode};
6use byteor_pipeline_exec::{ResolvedStage, StageResolver};
7
/// Stage-key prefix for side-effecting HTTP POST stages (`http_post:<target>`).
const HTTP_POST_PREFIX: &str = "http_post:";
/// Stage-key prefix for side-effecting exec stages (`exec:<command>`).
const EXEC_PREFIX: &str = "exec:";
10
11/// Explicit DataGuard configuration for replay stage resolution.
12///
13/// Replay accepts this as a parameter rather than inferring it from the bundle manifest.
14/// That keeps replay deterministic and avoids silently constructing a policy/schema context from
15/// incomplete incident bundle metadata.
16#[derive(Clone, Debug)]
17pub struct ReplayDataGuardConfig {
18    /// Root directory of the file-backed schema registry.
19    pub registry_root: PathBuf,
20    /// Required envelope schema id.
21    pub schema_id: u64,
22    /// Required envelope message type.
23    pub msg_type: u32,
24    /// Required minimum envelope schema version.
25    pub required_version: u16,
26    /// Deterministic redaction policy used by DataGuard stages.
27    pub policy: byteor_redaction::RedactionPolicy,
28    /// Payload representation for redaction/fused stages.
29    pub mode: RedactPayloadMode,
30}
31
/// Pass-through stage: copies `input` into `output` unchanged.
///
/// Only the overlapping prefix is copied. If `output` is shorter than `input`, the excess input
/// is dropped. Returns the number of bytes written.
fn identity_stage(input: &[u8], output: &mut [u8]) -> usize {
    let len = usize::min(input.len(), output.len());
    let (dst, src) = (&mut output[..len], &input[..len]);
    dst.copy_from_slice(src);
    len
}
37
38fn is_http_post_stage(stage_key: &str) -> bool {
39    stage_key
40        .strip_prefix(HTTP_POST_PREFIX)
41        .is_some_and(|u| !u.is_empty())
42}
43
44fn is_exec_stage(stage_key: &str) -> bool {
45    let rest = stage_key.strip_prefix(EXEC_PREFIX);
46    match rest {
47        Some(r) if !r.is_empty() && !r.contains('\0') => true,
48        _ => false,
49    }
50}
51
52pub(crate) struct ReplayStages {
53    dry_run: bool,
54    dataguard: Option<DataGuardStages>,
55}
56
57impl ReplayStages {
58    pub(crate) fn new(dry_run: bool, dataguard: Option<ReplayDataGuardConfig>) -> Self {
59        let dataguard = dataguard.map(|cfg| {
60            let ctx = DataGuardCtx::new(cfg.registry_root);
61            DataGuardStages::new(
62                ctx,
63                cfg.schema_id,
64                cfg.msg_type,
65                cfg.required_version,
66                cfg.policy,
67                cfg.mode,
68            )
69        });
70
71        Self { dry_run, dataguard }
72    }
73}
74
75impl StageResolver for ReplayStages {
76    fn resolve(&self, stage: &str) -> Option<ResolvedStage> {
77        if stage == "identity" {
78            return Some(ResolvedStage::MapOk(identity_stage));
79        }
80
81        if let Some(dataguard) = &self.dataguard {
82            if let Some(stage) = dataguard.resolve(stage) {
83                return Some(stage);
84            }
85        }
86
87        // Side-effecting stages are identity in dry-run.
88        if self.dry_run && (is_http_post_stage(stage) || is_exec_stage(stage)) {
89            return Some(ResolvedStage::MapOk(identity_stage));
90        }
91
92        // For now, treat unknown stage keys as identity in dry-run so operators can at least
93        // reproduce the plumbing and capture an audit artifact without shipping product stages.
94        if self.dry_run {
95            return Some(ResolvedStage::MapOk(identity_stage));
96        }
97
98        None
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    use byteor_dataguard_core::{STAGE_KEY_CHECK, STAGE_KEY_FUSED, STAGE_KEY_REDACT};
    use byteor_pipeline_exec::StageResolver;
    use byteor_redaction::RedactionPolicy;
    use byteor_schema::{Field, FieldType, FileSchemaRegistry, Schema};

    /// Seeds `root` with a single-field `orders` schema (schema_id 7, msg_type 9, version 1).
    fn write_schema(root: &std::path::Path) {
        let registry = FileSchemaRegistry::new(root);
        let email = Field {
            name: "email".to_string(),
            ty: FieldType::String,
            optional: true,
        };
        let orders = Schema {
            schema_id: 7,
            msg_type: 9,
            name: "orders".to_string(),
            msg_version: 1,
            fields: vec![email],
        };
        registry.write(&orders).unwrap();
    }

    #[test]
    fn replay_resolves_dataguard_stages_when_configured() {
        let tmp = tempfile::tempdir().unwrap();
        write_schema(tmp.path());

        let config = ReplayDataGuardConfig {
            registry_root: tmp.path().to_path_buf(),
            schema_id: 7,
            msg_type: 9,
            required_version: 1,
            policy: RedactionPolicy::new(vec![]),
            mode: RedactPayloadMode::MapKv,
        };
        let resolver = ReplayStages::new(false, Some(config));

        for key in [STAGE_KEY_CHECK, STAGE_KEY_REDACT, STAGE_KEY_FUSED] {
            assert!(resolver.resolve(key).is_some());
        }
    }

    #[test]
    fn replay_dry_run_keeps_unknown_stage_identity() {
        let dry_run_without_dataguard = ReplayStages::new(true, None);
        let resolved = dry_run_without_dataguard.resolve("dataguard_fused");
        assert!(resolved.is_some());
    }
}
155}