1use std::path::Path;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use byteor_pipeline_exec::ExecError;
7
8use crate::audit::{ReplayAudit, ReplayAuditAction, ReplayAuditInput, ReplayAuditPolicy};
9use crate::bundle::load_bundle;
10use crate::execution::run_sp_replay;
11use crate::journal::read_last_n_records;
12use crate::options::{accumulate_replay_accounting, replay_journal_buffer_limit_bytes};
13use crate::options::{ReplayMode, ReplayOptions};
14use crate::policy::{decide_for_action, decision_to_string};
15use crate::side_effects::side_effect_stages;
16
17pub fn replay_bundle_last_n(opts: ReplayOptions) -> Result<ReplayAudit, String> {
19 if opts.last_n == 0 {
20 return Err("replay: --last must be > 0".to_string());
21 }
22
23 let bundle = load_bundle(&opts.bundle_dir)?;
24
25 let journal_lane = select_journal_lane(&bundle, opts.journal_lane.as_deref())?;
26 let slice = read_last_n_records(
27 &journal_lane.journal_path,
28 opts.last_n,
29 replay_journal_buffer_limit_bytes(&opts)?,
30 )
31 .map_err(|e| e.to_string())?;
32
33 let approval_present = matches!(opts.approval.as_deref(), Some(s) if !s.is_empty());
34 let dry_run = opts.mode == ReplayMode::DryRun;
35 let selected_bytes: u64 = slice.selected.iter().map(|r| r.bytes.len() as u64).sum();
36 let selected_actions = slice.selected.len() as u64;
37
38 let side_effect_actions = side_effect_stages(&bundle.spec);
39 let mut audit_actions = Vec::new();
40 let mut replay_accounting = byteor_policy::ReplayAccounting::default();
41 for a in side_effect_actions {
42 replay_accounting =
43 accumulate_replay_accounting(replay_accounting, selected_actions, selected_bytes);
44
45 let decision = decide_for_action(
46 opts.env,
47 approval_present,
48 dry_run,
49 &a.action,
50 replay_accounting,
51 );
52 audit_actions.push(ReplayAuditAction {
53 action: a.action.clone(),
54 role: a.role.clone(),
55 input_lane: a.rx_lane.clone(),
56 output_lane: a.tx_lane.clone(),
57 stage: a.stage_key.clone(),
58 target: a.target.clone(),
59 decision: decision_to_string(decision),
60 });
61
62 match decision {
63 byteor_policy::Decision::Allow => {}
64 byteor_policy::Decision::DryRunOnly => {
65 if !dry_run {
66 return Err("replay: policy allows replay only as dry-run".to_string());
67 }
68 }
69 byteor_policy::Decision::RequireApproval(g) => {
70 return Err(format!(
71 "replay: policy requires approval token {}",
72 g.token_name
73 ));
74 }
75 byteor_policy::Decision::Deny(r) => {
76 return Err(format!("replay: policy denied: {r:?}"));
77 }
78 }
79 }
80
81 run_sp_replay(&bundle.spec, opts.clean, dry_run, &opts, &slice)?;
82
83 let spec_sha256 = format!("sha256:{}", sha256_hex(&bundle.spec_kv_bytes));
84 let mut sample_sha256 = Vec::new();
85 for r in slice.selected.iter().take(3) {
86 sample_sha256.push(format!("sha256:{}", r.sha256_hex));
87 }
88
89 let now = SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .unwrap_or_default();
92
93 let audit = ReplayAudit {
94 audit_v: 1,
95 created_unix_ms: now.as_millis(),
96 bundle_dir: opts.bundle_dir.display().to_string(),
97 spec_sha256,
98 input: ReplayAuditInput {
99 journal_lane: journal_lane.lane_name.clone(),
100 journal_path: journal_lane.journal_path.display().to_string(),
101 last_n: opts.last_n as u64,
102 scanned_records: slice.scanned_records,
103 selected_bytes,
104 sample_sha256,
105 },
106 policy: ReplayAuditPolicy {
107 env: opts.env.as_str().to_string(),
108 approval_provided: approval_present,
109 mode: if dry_run { "dry_run" } else { "execute" }.to_string(),
110 },
111 actions: audit_actions,
112 };
113
114 write_audit_json(&opts.out_audit_path, &audit)?;
115 Ok(audit)
116}
117
118fn write_audit_json(path: &Path, audit: &ReplayAudit) -> Result<(), String> {
119 let bytes = serde_json::to_vec_pretty(audit).map_err(|e| format!("replay: audit json: {e}"))?;
120 if let Some(parent) = path.parent() {
121 if !parent.as_os_str().is_empty() {
122 std::fs::create_dir_all(parent)
123 .map_err(|e| format!("replay: create audit dir: {e}"))?;
124 }
125 }
126 std::fs::write(path, bytes).map_err(|e| format!("replay: write audit json: {e}"))?;
127 Ok(())
128}
129
130fn select_journal_lane(
131 bundle: &crate::bundle::Bundle,
132 wanted: Option<&str>,
133) -> Result<crate::bundle::BundleJournalLane, String> {
134 if bundle.journal_lanes.is_empty() {
135 return Err("replay: bundle contains no journal lane files (lanes/ missing)".to_string());
136 }
137
138 if let Some(wanted) = wanted {
139 for j in &bundle.journal_lanes {
140 if j.lane_name == wanted {
141 return Ok(j.clone());
142 }
143 }
144 return Err(format!("replay: unknown --journal-lane {wanted}"));
145 }
146
147 if bundle.journal_lanes.len() == 1 {
148 return Ok(bundle.journal_lanes[0].clone());
149 }
150
151 let names: Vec<_> = bundle
152 .journal_lanes
153 .iter()
154 .map(|j| j.lane_name.as_str())
155 .collect();
156 Err(format!(
157 "replay: multiple journal lanes in bundle; specify --journal-lane (available={names:?})"
158 ))
159}
160
161fn sha256_hex(bytes: &[u8]) -> String {
162 use sha2::Digest;
163 let mut hasher = sha2::Sha256::new();
164 hasher.update(bytes);
165 let digest = hasher.finalize();
166 hex::encode(digest)
167}
168
169fn _to_string_exec_error(e: ExecError) -> String {
170 e.to_string()
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176 use byteor_core::config::Environment;
177
178 #[test]
179 fn replay_writes_audit_single_ring_identity() {
180 let out = tempfile::tempdir().unwrap();
181 let bundle_dir = out.path().join("bundle");
182 std::fs::create_dir_all(&bundle_dir).unwrap();
183
184 let spec =
185 byteor_pipeline_spec::single_ring_chain(&[byteor_pipeline_spec::StageOpV1::Identity])
186 .unwrap();
187 let kv = byteor_pipeline_spec::encode_kv_v1(&spec);
188 std::fs::write(bundle_dir.join("spec.kv"), kv).unwrap();
189
190 let lanes_dir = bundle_dir.join("lanes");
191 std::fs::create_dir_all(&lanes_dir).unwrap();
192 let journal_path = lanes_dir.join("journal.mmap");
193 {
194 let mut region =
195 indexbus_transport_shm::JournalRegion4::open_path(&journal_path).unwrap();
196 let mut pubr = region
197 .publisher(indexbus_log::JournalPublisherConfig::default())
198 .unwrap();
199 pubr.try_append(b"hello").unwrap();
200 }
201
202 let audit_path = out.path().join("audit.json");
203 let lane_path = out.path().join("single_ring.mmap");
204
205 let audit = replay_bundle_last_n(ReplayOptions {
206 bundle_dir: bundle_dir.clone(),
207 lane_graph_shm_dir: None,
208 single_ring_lane_path: Some(lane_path),
209 journal_lane: None,
210 last_n: 1,
211 mode: ReplayMode::DryRun,
212 dataguard: None,
213 env: Environment::Dev,
214 approval: None,
215 out_audit_path: audit_path.clone(),
216 clean: true,
217 drain_timeout: None,
218 journal_buffer_limit_bytes: None,
219 })
220 .unwrap();
221
222 assert_eq!(audit.audit_v, 1);
223 let audit_bytes = std::fs::read_to_string(&audit_path).unwrap();
224 let v: serde_json::Value = serde_json::from_str(&audit_bytes).unwrap();
225 assert_eq!(v["policy"]["mode"], serde_json::json!("dry_run"));
226 assert_eq!(v["input"]["last_n"], serde_json::json!(1));
227 }
228
229 #[test]
230 fn replay_denies_when_projected_actions_exceed_quota() {
231 let mut accounting = byteor_policy::ReplayAccounting::default();
232 let mut decision = byteor_policy::Decision::Allow;
233
234 for _ in 0..10_001 {
235 accounting = accumulate_replay_accounting(accounting, 1, 1);
236 decision = decide_for_action(Environment::Dev, false, true, "http_post", accounting);
237 }
238
239 assert_eq!(
240 decision,
241 byteor_policy::Decision::Deny(byteor_policy::DenyReason::ReplayQuotaExceeded)
242 );
243 }
244}