1use std::path::PathBuf;
4
5use byteor_dataguard_core::{DataGuardCtx, DataGuardStages, RedactPayloadMode};
6use byteor_pipeline_exec::{ResolvedStage, StageResolver};
7
8const HTTP_POST_PREFIX: &str = "http_post:";
9const EXEC_PREFIX: &str = "exec:";
10
11#[derive(Clone, Debug)]
17pub struct ReplayDataGuardConfig {
18 pub registry_root: PathBuf,
20 pub schema_id: u64,
22 pub msg_type: u32,
24 pub required_version: u16,
26 pub policy: byteor_redaction::RedactionPolicy,
28 pub mode: RedactPayloadMode,
30}
31
32fn identity_stage(input: &[u8], output: &mut [u8]) -> usize {
33 let n = input.len().min(output.len());
34 output[..n].copy_from_slice(&input[..n]);
35 n
36}
37
38fn is_http_post_stage(stage_key: &str) -> bool {
39 stage_key
40 .strip_prefix(HTTP_POST_PREFIX)
41 .is_some_and(|u| !u.is_empty())
42}
43
44fn is_exec_stage(stage_key: &str) -> bool {
45 let rest = stage_key.strip_prefix(EXEC_PREFIX);
46 match rest {
47 Some(r) if !r.is_empty() && !r.contains('\0') => true,
48 _ => false,
49 }
50}
51
52pub(crate) struct ReplayStages {
53 dry_run: bool,
54 dataguard: Option<DataGuardStages>,
55}
56
57impl ReplayStages {
58 pub(crate) fn new(dry_run: bool, dataguard: Option<ReplayDataGuardConfig>) -> Self {
59 let dataguard = dataguard.map(|cfg| {
60 let ctx = DataGuardCtx::new(cfg.registry_root);
61 DataGuardStages::new(
62 ctx,
63 cfg.schema_id,
64 cfg.msg_type,
65 cfg.required_version,
66 cfg.policy,
67 cfg.mode,
68 )
69 });
70
71 Self { dry_run, dataguard }
72 }
73}
74
75impl StageResolver for ReplayStages {
76 fn resolve(&self, stage: &str) -> Option<ResolvedStage> {
77 if stage == "identity" {
78 return Some(ResolvedStage::MapOk(identity_stage));
79 }
80
81 if let Some(dataguard) = &self.dataguard {
82 if let Some(stage) = dataguard.resolve(stage) {
83 return Some(stage);
84 }
85 }
86
87 if self.dry_run && (is_http_post_stage(stage) || is_exec_stage(stage)) {
89 return Some(ResolvedStage::MapOk(identity_stage));
90 }
91
92 if self.dry_run {
95 return Some(ResolvedStage::MapOk(identity_stage));
96 }
97
98 None
99 }
100}
101
102#[cfg(test)]
103mod tests {
104 use super::*;
105
106 use byteor_dataguard_core::{STAGE_KEY_CHECK, STAGE_KEY_FUSED, STAGE_KEY_REDACT};
107 use byteor_pipeline_exec::StageResolver;
108 use byteor_redaction::RedactionPolicy;
109 use byteor_schema::{Field, FieldType, FileSchemaRegistry, Schema};
110
111 fn write_schema(root: &std::path::Path) {
112 let registry = FileSchemaRegistry::new(root);
113 registry
114 .write(&Schema {
115 schema_id: 7,
116 msg_type: 9,
117 name: "orders".to_string(),
118 msg_version: 1,
119 fields: vec![Field {
120 name: "email".to_string(),
121 ty: FieldType::String,
122 optional: true,
123 }],
124 })
125 .unwrap();
126 }
127
128 #[test]
129 fn replay_resolves_dataguard_stages_when_configured() {
130 let tmp = tempfile::tempdir().unwrap();
131 write_schema(tmp.path());
132
133 let resolver = ReplayStages::new(
134 false,
135 Some(ReplayDataGuardConfig {
136 registry_root: tmp.path().to_path_buf(),
137 schema_id: 7,
138 msg_type: 9,
139 required_version: 1,
140 policy: RedactionPolicy::new(vec![]),
141 mode: RedactPayloadMode::MapKv,
142 }),
143 );
144
145 assert!(resolver.resolve(STAGE_KEY_CHECK).is_some());
146 assert!(resolver.resolve(STAGE_KEY_REDACT).is_some());
147 assert!(resolver.resolve(STAGE_KEY_FUSED).is_some());
148 }
149
150 #[test]
151 fn replay_dry_run_keeps_unknown_stage_identity() {
152 let resolver = ReplayStages::new(true, None);
153 assert!(resolver.resolve("dataguard_fused").is_some());
154 }
155}