byteor_pipeline_spec/
validate.rs

1//! Spec validation.
2
3use crate::{
4    EndpointKindV1, LaneGraphV1, LaneKindV1, MergePolicyV1, OrderingV1, PipelineSpecV1, RoleCfgV1,
5    SingleRingProducerV1, SingleRingSchedulingV1, SpecError, StageOpV1, FANOUT_V1_MAX_CONSUMERS,
6    SEQUENCED_SLOTS_V1_GATING,
7};
8
9/// Validate the spec for v1.
10///
11/// This is a scaffold: real validation (cycles, join satisfiability, ordering
12/// constraints) will be implemented alongside the full v1 model.
13pub fn validate_v1(spec: &PipelineSpecV1) -> Result<(), SpecError> {
14    #[cfg(not(feature = "alloc"))]
15    {
16        let _ = spec;
17        return Err(SpecError::new("spec v1 requires alloc"));
18    }
19
20    #[cfg(feature = "alloc")]
21    match spec {
22        PipelineSpecV1::LaneGraph(lg) => validate_lane_graph_v1(lg),
23        PipelineSpecV1::SingleRing(ring) => {
24            if ring.shards == 0 {
25                return Err(SpecError::new("single_ring.shards must be >= 1"));
26            }
27
28            match ring.producer {
29                SingleRingProducerV1::Single => {}
30                SingleRingProducerV1::Mpmc => {
31                    return Err(SpecError::new(
32                        "single_ring producer mpmc is not supported in v1",
33                    ));
34                }
35            }
36
37            match ring.scheduling {
38                SingleRingSchedulingV1::Dedicated => {}
39                SingleRingSchedulingV1::WorkQueue => {
40                    return Err(SpecError::new(
41                        "single_ring scheduling work_queue is not supported in v1",
42                    ));
43                }
44            }
45
46            if ring.shards > 1 && ring.ordering == OrderingV1::Strict {
47                return Err(SpecError::new(
48                    "single_ring ordering strict is not compatible with shards>1 in v1",
49                ));
50            }
51
52            if ring.stages.is_empty() {
53                return Err(SpecError::new("single_ring requires at least one stage"));
54            }
55
56            // Current OSS backing uses `SequencedSlotsLayout4` with 4 gating sequences.
57            // v1 contract: the final stage is responsible for releasing slots (acts as the
58            // consumer), so we can use all 4 gating cells for stages.
59            if ring.stages.len() > SEQUENCED_SLOTS_V1_GATING as usize {
60                return Err(SpecError::new("single_ring stage count exceeds v1 limit"));
61            }
62
63            for (i, st) in ring.stages.iter().enumerate() {
64                if let StageOpV1::ResolverKey { stage } = &st.op {
65                    if stage.trim().is_empty() {
66                        return Err(SpecError::new(
67                            "single_ring stage identity must not be empty",
68                        ));
69                    }
70                    if stage.contains('\n') || stage.contains('\r') {
71                        return Err(SpecError::new(
72                            "single_ring stage identity must not contain newlines",
73                        ));
74                    }
75                }
76
77                // Executor-alignment: dependency lists must be stable and deduped.
78                // The shipped executors treat `depends_on` as a set of barrier indices.
79                let mut prev: Option<u32> = None;
80                for &d in &st.depends_on {
81                    if let Some(p) = prev {
82                        if d <= p {
83                            return Err(SpecError::new(
84                                "single_ring dependencies must be strictly increasing (sorted, unique)",
85                            ));
86                        }
87                    }
88                    prev = Some(d);
89                }
90
91                // Executor-alignment: stages with no dependencies run off the producer cursor.
92                // In v1, only `Identity` is explicitly read-only; all other ops may mutate.
93                if i > 0 && st.depends_on.is_empty() {
94                    match st.op {
95                        StageOpV1::Identity => {}
96                        _ => {
97                            return Err(SpecError::new(
98                                "single_ring non-source stages must declare dependencies in v1",
99                            ));
100                        }
101                    }
102                }
103
104                // Enforce topological order and prevent self-deps.
105                for &d in &st.depends_on {
106                    let di = d as usize;
107                    if di >= i {
108                        return Err(SpecError::new(
109                            "single_ring dependencies must reference earlier stages",
110                        ));
111                    }
112                }
113            }
114
115            Ok(())
116        }
117    }
118}
119
/// Journal lanes are accepted only when the `lane_graph_journal` feature is enabled.
#[cfg(feature = "alloc")]
const LANE_GRAPH_V1_JOURNAL_ENABLED: bool = cfg!(feature = "lane_graph_journal");

/// Fanout lanes and router roles are accepted only when `lane_graph_fanout` is enabled.
#[cfg(feature = "alloc")]
const LANE_GRAPH_V1_FANOUT_ENABLED: bool = cfg!(feature = "lane_graph_fanout");

/// Sequenced-slots lanes are always enabled in the v1 profile.
#[cfg(feature = "alloc")]
const LANE_GRAPH_V1_SEQUENCED_SLOTS_ENABLED: bool = true;

/// Lane kind keyed by lane name, borrowed from the spec being validated.
#[cfg(feature = "alloc")]
type LaneGraphV1KindsByName<'a> = alloc::collections::BTreeMap<&'a str, LaneKindV1>;

/// Validates a v1 lane-graph spec.
///
/// The checks run in a fixed order and the first failure is returned:
/// at least one role, unique lane names, endpoints, lane kinds, lane limits, role
/// names and lane references, stage identities, lane connectivity and cardinality,
/// router / stage / bridge / merge role constraints, and finally acyclicity of the
/// lane-to-lane graph implied by the roles.
///
/// # Errors
///
/// Returns a [`SpecError`] describing the first violated rule.
#[cfg(feature = "alloc")]
fn validate_lane_graph_v1(lg: &LaneGraphV1) -> Result<(), SpecError> {
    if lg.roles.is_empty() {
        return Err(SpecError::new("spec must contain at least one role"));
    }

    let lanes_by_name: LaneGraphV1KindsByName<'_> =
        lg.lanes.iter().map(|l| (l.name.as_str(), l.kind)).collect();

    // A duplicate name collapses into one map entry, so the sizes diverge.
    if lanes_by_name.len() != lg.lanes.len() {
        return Err(SpecError::new("lane names must be unique"));
    }

    lane_graph_v1_check_endpoints(lg, &lanes_by_name)?;
    lane_graph_v1_check_lane_kinds(lg)?;
    lane_graph_v1_check_lane_limits(lg)?;
    lane_graph_v1_check_role_refs(lg, &lanes_by_name)?;
    lane_graph_v1_check_stage_identities(lg)?;
    lane_graph_v1_check_connectivity(lg)?;
    lane_graph_v1_check_routers(lg, &lanes_by_name)?;
    lane_graph_v1_check_stages(lg, &lanes_by_name)?;
    lane_graph_v1_check_bridges(lg, &lanes_by_name)?;
    lane_graph_v1_check_merges(lg, &lanes_by_name)?;
    lane_graph_v1_check_acyclic(lg)
}

/// Endpoint names must be unique and every endpoint must reference a declared lane.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_endpoints(
    lg: &LaneGraphV1,
    lanes_by_name: &LaneGraphV1KindsByName<'_>,
) -> Result<(), SpecError> {
    use alloc::collections::BTreeSet;

    let mut seen = BTreeSet::<&str>::new();
    for ep in &lg.endpoints {
        if !seen.insert(ep.name.as_str()) {
            return Err(SpecError::new("endpoint names must be unique"));
        }
        if !lanes_by_name.contains_key(ep.lane.as_str()) {
            return Err(SpecError::new("endpoint references unknown lane"));
        }
    }
    Ok(())
}

/// Rejects lane kinds that are disabled in this build and sequenced-slots lanes whose
/// gating count differs from `SEQUENCED_SLOTS_V1_GATING`.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_lane_kinds(lg: &LaneGraphV1) -> Result<(), SpecError> {
    for lane in &lg.lanes {
        match lane.kind {
            LaneKindV1::Events => {}
            LaneKindV1::Journal => {
                if !LANE_GRAPH_V1_JOURNAL_ENABLED {
                    return Err(SpecError::new("journal lanes are disabled"));
                }
            }
            LaneKindV1::SequencedSlots { gating, .. } => {
                if !LANE_GRAPH_V1_SEQUENCED_SLOTS_ENABLED {
                    return Err(SpecError::new("sequenced slots lanes are disabled"));
                }
                if gating != SEQUENCED_SLOTS_V1_GATING {
                    return Err(SpecError::new(
                        "sequenced slots gating must match SEQUENCED_SLOTS_V1_GATING",
                    ));
                }
            }
            LaneKindV1::FanoutBroadcast { .. } => {
                if !LANE_GRAPH_V1_FANOUT_ENABLED {
                    return Err(SpecError::new("fanout lanes are disabled"));
                }
            }
        }
    }
    Ok(())
}

/// Per-kind size limits: fanout consumer count must be in `1..=FANOUT_V1_MAX_CONSUMERS`
/// and sequenced-slots capacity must be non-zero.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_lane_limits(lg: &LaneGraphV1) -> Result<(), SpecError> {
    for lane in &lg.lanes {
        match lane.kind {
            LaneKindV1::FanoutBroadcast { consumers } => {
                if consumers == 0 {
                    return Err(SpecError::new("fanout consumers must be > 0"));
                }
                if consumers > FANOUT_V1_MAX_CONSUMERS {
                    return Err(SpecError::new("fanout consumers exceeds max"));
                }
            }
            LaneKindV1::SequencedSlots { capacity, .. } => {
                if capacity == 0 {
                    return Err(SpecError::new("sequenced slots capacity must be > 0"));
                }
            }
            LaneKindV1::Events | LaneKindV1::Journal => {}
        }
    }
    Ok(())
}

/// Role names must be unique and every lane a role references must be declared.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_role_refs(
    lg: &LaneGraphV1,
    lanes_by_name: &LaneGraphV1KindsByName<'_>,
) -> Result<(), SpecError> {
    use alloc::collections::BTreeSet;

    let mut seen = BTreeSet::<&str>::new();
    for role in &lg.roles {
        if !seen.insert(role.name()) {
            return Err(SpecError::new("role names must be unique"));
        }
        for lane_ref in role.lane_refs() {
            if !lanes_by_name.contains_key(lane_ref) {
                return Err(SpecError::new("role references unknown lane"));
            }
        }
    }
    Ok(())
}

/// Stage roles must carry a non-empty stage identity.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_stage_identities(lg: &LaneGraphV1) -> Result<(), SpecError> {
    for role in &lg.roles {
        if let RoleCfgV1::Stage(cfg) = role {
            // NOTE(review): single-ring validation in `validate_v1` additionally rejects
            // whitespace-only identities and identities containing newlines; confirm
            // whether stage roles should apply the same rules.
            if cfg.stage.is_empty() {
                return Err(SpecError::new(
                    "stage role stage identity must not be empty",
                ));
            }
        }
    }
    Ok(())
}

/// Counts producers and consumers per lane and enforces connectivity and per-kind
/// cardinality.
///
/// Ingress endpoints and role `tx` lanes count as producers; egress endpoints and role
/// `rx` lanes count as consumers. Every lane needs at least one of each. Events lanes
/// allow at most one consumer; journal, sequenced-slots and fanout lanes allow at most
/// one producer and at most one consumer.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_connectivity(lg: &LaneGraphV1) -> Result<(), SpecError> {
    use alloc::collections::BTreeMap;

    #[derive(Clone, Copy, Default)]
    struct LaneStats {
        producers: u32,
        consumers: u32,
    }

    #[derive(Clone, Copy)]
    enum Side {
        Producer,
        Consumer,
    }

    let mut stats: BTreeMap<&str, LaneStats> = lg
        .lanes
        .iter()
        .map(|lane| (lane.name.as_str(), LaneStats::default()))
        .collect();

    // References to unknown lanes are ignored here; they are reported by earlier checks.
    let mut bump = |lane: &str, side: Side| {
        if let Some(s) = stats.get_mut(lane) {
            match side {
                Side::Producer => s.producers = s.producers.saturating_add(1),
                Side::Consumer => s.consumers = s.consumers.saturating_add(1),
            }
        }
    };

    for ep in &lg.endpoints {
        match ep.kind {
            EndpointKindV1::Ingress => bump(ep.lane.as_str(), Side::Producer),
            EndpointKindV1::Egress => bump(ep.lane.as_str(), Side::Consumer),
        }
    }

    for role in &lg.roles {
        match role {
            RoleCfgV1::Stage(cfg) => {
                bump(cfg.rx.as_str(), Side::Consumer);
                bump(cfg.tx.as_str(), Side::Producer);
            }
            RoleCfgV1::Bridge(cfg) => {
                bump(cfg.rx.as_str(), Side::Consumer);
                bump(cfg.tx.as_str(), Side::Producer);
            }
            RoleCfgV1::Router(cfg) => {
                bump(cfg.rx.as_str(), Side::Consumer);
                for tx in &cfg.tx {
                    bump(tx.as_str(), Side::Producer);
                }
            }
            RoleCfgV1::Merge(cfg) => {
                for rx in &cfg.rx {
                    bump(rx.as_str(), Side::Consumer);
                }
                bump(cfg.tx.as_str(), Side::Producer);
            }
        }
    }

    for lane in &lg.lanes {
        let s = stats.get(lane.name.as_str()).copied().unwrap_or_default();
        if s.producers == 0 {
            return Err(SpecError::new("lane must have at least one producer"));
        }
        if s.consumers == 0 {
            return Err(SpecError::new("lane must have at least one consumer"));
        }

        match lane.kind {
            LaneKindV1::Events => {
                if s.consumers > 1 {
                    return Err(SpecError::new("events lane must have at most one consumer"));
                }
            }
            LaneKindV1::Journal => {
                if s.producers > 1 {
                    return Err(SpecError::new(
                        "journal lane must have at most one producer",
                    ));
                }
                if s.consumers > 1 {
                    return Err(SpecError::new(
                        "journal lane must have at most one consumer",
                    ));
                }
            }
            LaneKindV1::SequencedSlots { .. } => {
                if s.producers > 1 {
                    return Err(SpecError::new(
                        "sequenced slots lane must have at most one producer",
                    ));
                }
                if s.consumers > 1 {
                    return Err(SpecError::new(
                        "sequenced slots lane must have at most one consumer",
                    ));
                }
            }
            LaneKindV1::FanoutBroadcast { .. } => {
                if s.producers > 1 {
                    return Err(SpecError::new("fanout lane must have at most one producer"));
                }
                if s.consumers > 1 {
                    return Err(SpecError::new("fanout lane must have at most one consumer"));
                }
            }
        }
    }
    Ok(())
}

/// Router roles require fanout support, must read from a `FanoutBroadcast` lane, and
/// must write to exactly `consumers` distinct `Events` lanes.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_routers(
    lg: &LaneGraphV1,
    lanes_by_name: &LaneGraphV1KindsByName<'_>,
) -> Result<(), SpecError> {
    use alloc::collections::BTreeSet;

    for role in &lg.roles {
        let RoleCfgV1::Router(router) = role else {
            continue;
        };

        if !LANE_GRAPH_V1_FANOUT_ENABLED {
            return Err(SpecError::new("router roles require fanout support"));
        }

        let Some(rx_kind) = lanes_by_name.get(router.rx.as_str()) else {
            continue;
        };
        let LaneKindV1::FanoutBroadcast { consumers } = *rx_kind else {
            return Err(SpecError::new(
                "router rx lane kind must be FanoutBroadcast in v1",
            ));
        };

        if router.tx.len() != consumers as usize {
            return Err(SpecError::new(
                "router tx lane count must equal fanout consumers in v1",
            ));
        }

        let mut seen = BTreeSet::<&str>::new();
        for tx in &router.tx {
            if !seen.insert(tx.as_str()) {
                return Err(SpecError::new("router tx lanes must be unique"));
            }
        }

        for tx_lane in &router.tx {
            let Some(tx_kind) = lanes_by_name.get(tx_lane.as_str()) else {
                continue;
            };
            if !matches!(tx_kind, LaneKindV1::Events) {
                return Err(SpecError::new("router tx lanes must be Events in v1"));
            }
        }
    }
    Ok(())
}

/// Stage roles must read and write lanes of the same kind and must not consume a
/// `FanoutBroadcast` lane.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_stages(
    lg: &LaneGraphV1,
    lanes_by_name: &LaneGraphV1KindsByName<'_>,
) -> Result<(), SpecError> {
    for role in &lg.roles {
        let RoleCfgV1::Stage(stage) = role else {
            continue;
        };
        let Some(rx_kind) = lanes_by_name.get(stage.rx.as_str()) else {
            continue;
        };
        let Some(tx_kind) = lanes_by_name.get(stage.tx.as_str()) else {
            continue;
        };

        if rx_kind != tx_kind {
            return Err(SpecError::new(
                "stage role rx lane kind must equal tx lane kind (bridges required)",
            ));
        }
        if matches!(rx_kind, LaneKindV1::FanoutBroadcast { .. }) {
            return Err(SpecError::new(
                "stage roles must not consume FanoutBroadcast lanes in v1 (router required)",
            ));
        }
    }
    Ok(())
}

/// Bridge roles must not consume a `FanoutBroadcast` lane.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_bridges(
    lg: &LaneGraphV1,
    lanes_by_name: &LaneGraphV1KindsByName<'_>,
) -> Result<(), SpecError> {
    for role in &lg.roles {
        let RoleCfgV1::Bridge(bridge) = role else {
            continue;
        };
        let Some(rx_kind) = lanes_by_name.get(bridge.rx.as_str()) else {
            continue;
        };
        if matches!(rx_kind, LaneKindV1::FanoutBroadcast { .. }) {
            return Err(SpecError::new(
                "bridge roles must not consume FanoutBroadcast lanes in v1 (router required)",
            ));
        }
    }
    Ok(())
}

/// Merge roles need a non-empty, duplicate-free `rx` list whose lanes all share the
/// `tx` lane's kind, and must not consume `FanoutBroadcast` lanes.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_merges(
    lg: &LaneGraphV1,
    lanes_by_name: &LaneGraphV1KindsByName<'_>,
) -> Result<(), SpecError> {
    use alloc::collections::BTreeSet;

    for role in &lg.roles {
        let RoleCfgV1::Merge(merge) = role else {
            continue;
        };

        if merge.rx.is_empty() {
            return Err(SpecError::new("merge role rx must not be empty"));
        }

        let mut seen = BTreeSet::<&str>::new();
        for rx in &merge.rx {
            if !seen.insert(rx.as_str()) {
                return Err(SpecError::new("merge rx lanes must be unique"));
            }
        }

        let Some(tx_kind) = lanes_by_name.get(merge.tx.as_str()) else {
            continue;
        };

        for rx_lane in &merge.rx {
            let Some(rx_kind) = lanes_by_name.get(rx_lane.as_str()) else {
                continue;
            };

            if rx_kind != tx_kind {
                return Err(SpecError::new(
                    "merge role rx lane kind must equal tx lane kind (bridges required)",
                ));
            }
            if matches!(rx_kind, LaneKindV1::FanoutBroadcast { .. }) {
                return Err(SpecError::new(
                    "merge roles must not consume FanoutBroadcast lanes in v1 (router required)",
                ));
            }
            if matches!(rx_kind, LaneKindV1::Journal) && !LANE_GRAPH_V1_JOURNAL_ENABLED {
                return Err(SpecError::new("journal lanes are disabled"));
            }
            if matches!(rx_kind, LaneKindV1::SequencedSlots { .. })
                && !LANE_GRAPH_V1_SEQUENCED_SLOTS_ENABLED
            {
                return Err(SpecError::new("sequenced slots lanes are disabled"));
            }
        }

        // Exhaustive on purpose: adding a merge policy must fail to compile here until
        // its validation is decided.
        match merge.policy {
            MergePolicyV1::RoundRobin => {}
        }
    }
    Ok(())
}

/// Rejects specs whose role wiring forms a cycle between lanes.
///
/// Each role contributes edges from its `rx` lane(s) to its `tx` lane(s). The search is
/// a depth-first traversal with an explicit stack, so stack usage does not grow with
/// the length of the lane chain in the spec.
#[cfg(feature = "alloc")]
fn lane_graph_v1_check_acyclic(lg: &LaneGraphV1) -> Result<(), SpecError> {
    use alloc::collections::BTreeMap;
    use alloc::vec;
    use alloc::vec::Vec;

    #[derive(Clone, Copy, PartialEq, Eq)]
    enum Mark {
        Unvisited,
        InProgress,
        Done,
    }

    let lane_idx: BTreeMap<&str, usize> = lg
        .lanes
        .iter()
        .enumerate()
        .map(|(i, lane)| (lane.name.as_str(), i))
        .collect();

    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); lg.lanes.len()];
    // Edges touching an undeclared lane are skipped; earlier checks report those.
    let mut add_edge = |from: &str, to: &str| {
        if let (Some(&a), Some(&b)) = (lane_idx.get(from), lane_idx.get(to)) {
            adj[a].push(b);
        }
    };

    for role in &lg.roles {
        match role {
            RoleCfgV1::Stage(cfg) => add_edge(cfg.rx.as_str(), cfg.tx.as_str()),
            RoleCfgV1::Bridge(cfg) => add_edge(cfg.rx.as_str(), cfg.tx.as_str()),
            RoleCfgV1::Router(cfg) => {
                for tx in &cfg.tx {
                    add_edge(cfg.rx.as_str(), tx.as_str());
                }
            }
            RoleCfgV1::Merge(cfg) => {
                for rx in &cfg.rx {
                    add_edge(rx.as_str(), cfg.tx.as_str());
                }
            }
        }
    }

    let mut marks = vec![Mark::Unvisited; adj.len()];
    // Each frame is (lane index, position of the next outgoing edge to follow).
    let mut stack: Vec<(usize, usize)> = Vec::new();

    for root in 0..adj.len() {
        if marks[root] != Mark::Unvisited {
            continue;
        }
        marks[root] = Mark::InProgress;
        stack.push((root, 0));

        while let Some(frame) = stack.last_mut() {
            let (node, next) = *frame;
            match adj[node].get(next).copied() {
                Some(child) => {
                    frame.1 += 1;
                    match marks[child] {
                        // Reaching a lane that is still on the current path closes a loop.
                        Mark::InProgress => {
                            return Err(SpecError::new("lane graph contains a cycle"));
                        }
                        Mark::Done => {}
                        Mark::Unvisited => {
                            marks[child] = Mark::InProgress;
                            stack.push((child, 0));
                        }
                    }
                }
                None => {
                    // All outgoing edges explored without finding a cycle through `node`.
                    marks[node] = Mark::Done;
                    stack.pop();
                }
            }
        }
    }

    Ok(())
}