byteor_replay/
replay.rs

1//! Replay orchestration entrypoints.
2
3use std::path::Path;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use byteor_pipeline_exec::ExecError;
7
8use crate::audit::{ReplayAudit, ReplayAuditAction, ReplayAuditInput, ReplayAuditPolicy};
9use crate::bundle::load_bundle;
10use crate::execution::run_sp_replay;
11use crate::journal::read_last_n_records;
12use crate::options::{accumulate_replay_accounting, replay_journal_buffer_limit_bytes};
13use crate::options::{ReplayMode, ReplayOptions};
14use crate::policy::{decide_for_action, decision_to_string};
15use crate::side_effects::side_effect_stages;
16
17/// Replay a selected journal slice into the spec from an incident bundle.
18pub fn replay_bundle_last_n(opts: ReplayOptions) -> Result<ReplayAudit, String> {
19    if opts.last_n == 0 {
20        return Err("replay: --last must be > 0".to_string());
21    }
22
23    let bundle = load_bundle(&opts.bundle_dir)?;
24
25    let journal_lane = select_journal_lane(&bundle, opts.journal_lane.as_deref())?;
26    let slice = read_last_n_records(
27        &journal_lane.journal_path,
28        opts.last_n,
29        replay_journal_buffer_limit_bytes(&opts)?,
30    )
31    .map_err(|e| e.to_string())?;
32
33    let approval_present = matches!(opts.approval.as_deref(), Some(s) if !s.is_empty());
34    let dry_run = opts.mode == ReplayMode::DryRun;
35    let selected_bytes: u64 = slice.selected.iter().map(|r| r.bytes.len() as u64).sum();
36    let selected_actions = slice.selected.len() as u64;
37
38    let side_effect_actions = side_effect_stages(&bundle.spec);
39    let mut audit_actions = Vec::new();
40    let mut replay_accounting = byteor_policy::ReplayAccounting::default();
41    for a in side_effect_actions {
42        replay_accounting =
43            accumulate_replay_accounting(replay_accounting, selected_actions, selected_bytes);
44
45        let decision = decide_for_action(
46            opts.env,
47            approval_present,
48            dry_run,
49            &a.action,
50            replay_accounting,
51        );
52        audit_actions.push(ReplayAuditAction {
53            action: a.action.clone(),
54            role: a.role.clone(),
55            input_lane: a.rx_lane.clone(),
56            output_lane: a.tx_lane.clone(),
57            stage: a.stage_key.clone(),
58            target: a.target.clone(),
59            decision: decision_to_string(decision),
60        });
61
62        match decision {
63            byteor_policy::Decision::Allow => {}
64            byteor_policy::Decision::DryRunOnly => {
65                if !dry_run {
66                    return Err("replay: policy allows replay only as dry-run".to_string());
67                }
68            }
69            byteor_policy::Decision::RequireApproval(g) => {
70                return Err(format!(
71                    "replay: policy requires approval token {}",
72                    g.token_name
73                ));
74            }
75            byteor_policy::Decision::Deny(r) => {
76                return Err(format!("replay: policy denied: {r:?}"));
77            }
78        }
79    }
80
81    run_sp_replay(&bundle.spec, opts.clean, dry_run, &opts, &slice)?;
82
83    let spec_sha256 = format!("sha256:{}", sha256_hex(&bundle.spec_kv_bytes));
84    let mut sample_sha256 = Vec::new();
85    for r in slice.selected.iter().take(3) {
86        sample_sha256.push(format!("sha256:{}", r.sha256_hex));
87    }
88
89    let now = SystemTime::now()
90        .duration_since(UNIX_EPOCH)
91        .unwrap_or_default();
92
93    let audit = ReplayAudit {
94        audit_v: 1,
95        created_unix_ms: now.as_millis(),
96        bundle_dir: opts.bundle_dir.display().to_string(),
97        spec_sha256,
98        input: ReplayAuditInput {
99            journal_lane: journal_lane.lane_name.clone(),
100            journal_path: journal_lane.journal_path.display().to_string(),
101            last_n: opts.last_n as u64,
102            scanned_records: slice.scanned_records,
103            selected_bytes,
104            sample_sha256,
105        },
106        policy: ReplayAuditPolicy {
107            env: opts.env.as_str().to_string(),
108            approval_provided: approval_present,
109            mode: if dry_run { "dry_run" } else { "execute" }.to_string(),
110        },
111        actions: audit_actions,
112    };
113
114    write_audit_json(&opts.out_audit_path, &audit)?;
115    Ok(audit)
116}
117
118fn write_audit_json(path: &Path, audit: &ReplayAudit) -> Result<(), String> {
119    let bytes = serde_json::to_vec_pretty(audit).map_err(|e| format!("replay: audit json: {e}"))?;
120    if let Some(parent) = path.parent() {
121        if !parent.as_os_str().is_empty() {
122            std::fs::create_dir_all(parent)
123                .map_err(|e| format!("replay: create audit dir: {e}"))?;
124        }
125    }
126    std::fs::write(path, bytes).map_err(|e| format!("replay: write audit json: {e}"))?;
127    Ok(())
128}
129
130fn select_journal_lane(
131    bundle: &crate::bundle::Bundle,
132    wanted: Option<&str>,
133) -> Result<crate::bundle::BundleJournalLane, String> {
134    if bundle.journal_lanes.is_empty() {
135        return Err("replay: bundle contains no journal lane files (lanes/ missing)".to_string());
136    }
137
138    if let Some(wanted) = wanted {
139        for j in &bundle.journal_lanes {
140            if j.lane_name == wanted {
141                return Ok(j.clone());
142            }
143        }
144        return Err(format!("replay: unknown --journal-lane {wanted}"));
145    }
146
147    if bundle.journal_lanes.len() == 1 {
148        return Ok(bundle.journal_lanes[0].clone());
149    }
150
151    let names: Vec<_> = bundle
152        .journal_lanes
153        .iter()
154        .map(|j| j.lane_name.as_str())
155        .collect();
156    Err(format!(
157        "replay: multiple journal lanes in bundle; specify --journal-lane (available={names:?})"
158    ))
159}
160
161fn sha256_hex(bytes: &[u8]) -> String {
162    use sha2::Digest;
163    let mut hasher = sha2::Sha256::new();
164    hasher.update(bytes);
165    let digest = hasher.finalize();
166    hex::encode(digest)
167}
168
169fn _to_string_exec_error(e: ExecError) -> String {
170    e.to_string()
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use byteor_core::config::Environment;

    /// A dry-run replay of a one-record journal through a single-ring identity spec succeeds
    /// and leaves an audit file whose `policy.mode` is `dry_run` and `input.last_n` is `1`.
    #[test]
    fn replay_writes_audit_single_ring_identity() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Bundle layout: <root>/bundle/spec.kv and <root>/bundle/lanes/journal.mmap.
        let bundle_dir = root.join("bundle");
        std::fs::create_dir_all(&bundle_dir).unwrap();

        let identity_spec =
            byteor_pipeline_spec::single_ring_chain(&[byteor_pipeline_spec::StageOpV1::Identity])
                .unwrap();
        std::fs::write(
            bundle_dir.join("spec.kv"),
            byteor_pipeline_spec::encode_kv_v1(&identity_spec),
        )
        .unwrap();

        let lanes_dir = bundle_dir.join("lanes");
        std::fs::create_dir_all(&lanes_dir).unwrap();
        let journal_path = lanes_dir.join("journal.mmap");
        {
            // Scoped so the region and publisher are dropped before the replay reads the file.
            let mut journal =
                indexbus_transport_shm::JournalRegion4::open_path(&journal_path).unwrap();
            let mut publisher = journal
                .publisher(indexbus_log::JournalPublisherConfig::default())
                .unwrap();
            publisher.try_append(b"hello").unwrap();
        }

        let audit_path = root.join("audit.json");
        let ring_path = root.join("single_ring.mmap");

        let options = ReplayOptions {
            bundle_dir: bundle_dir.clone(),
            lane_graph_shm_dir: None,
            single_ring_lane_path: Some(ring_path),
            journal_lane: None,
            last_n: 1,
            mode: ReplayMode::DryRun,
            dataguard: None,
            env: Environment::Dev,
            approval: None,
            out_audit_path: audit_path.clone(),
            clean: true,
            drain_timeout: None,
            journal_buffer_limit_bytes: None,
        };
        let audit = replay_bundle_last_n(options).unwrap();
        assert_eq!(audit.audit_v, 1);

        // The file on disk must parse as JSON and carry the same policy and input details.
        let written = std::fs::read_to_string(&audit_path).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&written).unwrap();
        assert_eq!(parsed["policy"]["mode"].as_str(), Some("dry_run"));
        assert_eq!(parsed["input"]["last_n"].as_u64(), Some(1));
    }

    /// After 10,001 accumulations of one action and one byte each, the policy decision for
    /// `http_post` in the dev environment is a quota denial.
    #[test]
    fn replay_denies_when_projected_actions_exceed_quota() {
        let (_, decision) = (0..10_001).fold(
            (
                byteor_policy::ReplayAccounting::default(),
                byteor_policy::Decision::Allow,
            ),
            |(accounting, _), _| {
                let accounting = accumulate_replay_accounting(accounting, 1, 1);
                let decision =
                    decide_for_action(Environment::Dev, false, true, "http_post", accounting);
                (accounting, decision)
            },
        );

        assert_eq!(
            decision,
            byteor_policy::Decision::Deny(byteor_policy::DenyReason::ReplayQuotaExceeded)
        );
    }
}