1use crate::{
4 EndpointKindV1, LaneGraphV1, LaneKindV1, MergePolicyV1, OrderingV1, PipelineSpecV1, RoleCfgV1,
5 SingleRingProducerV1, SingleRingSchedulingV1, SpecError, StageOpV1, FANOUT_V1_MAX_CONSUMERS,
6 SEQUENCED_SLOTS_V1_GATING,
7};
8
9pub fn validate_v1(spec: &PipelineSpecV1) -> Result<(), SpecError> {
14 #[cfg(not(feature = "alloc"))]
15 {
16 let _ = spec;
17 return Err(SpecError::new("spec v1 requires alloc"));
18 }
19
20 #[cfg(feature = "alloc")]
21 match spec {
22 PipelineSpecV1::LaneGraph(lg) => validate_lane_graph_v1(lg),
23 PipelineSpecV1::SingleRing(ring) => {
24 if ring.shards == 0 {
25 return Err(SpecError::new("single_ring.shards must be >= 1"));
26 }
27
28 match ring.producer {
29 SingleRingProducerV1::Single => {}
30 SingleRingProducerV1::Mpmc => {
31 return Err(SpecError::new(
32 "single_ring producer mpmc is not supported in v1",
33 ));
34 }
35 }
36
37 match ring.scheduling {
38 SingleRingSchedulingV1::Dedicated => {}
39 SingleRingSchedulingV1::WorkQueue => {
40 return Err(SpecError::new(
41 "single_ring scheduling work_queue is not supported in v1",
42 ));
43 }
44 }
45
46 if ring.shards > 1 && ring.ordering == OrderingV1::Strict {
47 return Err(SpecError::new(
48 "single_ring ordering strict is not compatible with shards>1 in v1",
49 ));
50 }
51
52 if ring.stages.is_empty() {
53 return Err(SpecError::new("single_ring requires at least one stage"));
54 }
55
56 if ring.stages.len() > SEQUENCED_SLOTS_V1_GATING as usize {
60 return Err(SpecError::new("single_ring stage count exceeds v1 limit"));
61 }
62
63 for (i, st) in ring.stages.iter().enumerate() {
64 if let StageOpV1::ResolverKey { stage } = &st.op {
65 if stage.trim().is_empty() {
66 return Err(SpecError::new(
67 "single_ring stage identity must not be empty",
68 ));
69 }
70 if stage.contains('\n') || stage.contains('\r') {
71 return Err(SpecError::new(
72 "single_ring stage identity must not contain newlines",
73 ));
74 }
75 }
76
77 let mut prev: Option<u32> = None;
80 for &d in &st.depends_on {
81 if let Some(p) = prev {
82 if d <= p {
83 return Err(SpecError::new(
84 "single_ring dependencies must be strictly increasing (sorted, unique)",
85 ));
86 }
87 }
88 prev = Some(d);
89 }
90
91 if i > 0 && st.depends_on.is_empty() {
94 match st.op {
95 StageOpV1::Identity => {}
96 _ => {
97 return Err(SpecError::new(
98 "single_ring non-source stages must declare dependencies in v1",
99 ));
100 }
101 }
102 }
103
104 for &d in &st.depends_on {
106 let di = d as usize;
107 if di >= i {
108 return Err(SpecError::new(
109 "single_ring dependencies must reference earlier stages",
110 ));
111 }
112 }
113 }
114
115 Ok(())
116 }
117 }
118}
119
/// Validates a v1 lane-graph spec and reports the first violated rule.
///
/// Checks run in a fixed order, so the error returned for a given spec is
/// deterministic:
///
/// 1. the spec has at least one role, lane names are unique, and endpoint
///    names are unique and reference known lanes;
/// 2. every lane kind is enabled in this build and its parameters are in range;
/// 3. role names are unique and every lane a role references exists;
/// 4. stage roles carry a non-empty identity, and each lane has a
///    producer/consumer count permitted by its kind;
/// 5. router, stage, bridge and merge roles are wired to permitted lane kinds;
/// 6. the lane-to-lane graph induced by the roles contains no cycle.
///
/// # Errors
///
/// Returns a [`SpecError`] carrying the message of the first failed check.
#[cfg(feature = "alloc")]
fn validate_lane_graph_v1(lg: &LaneGraphV1) -> Result<(), SpecError> {
    use alloc::collections::{BTreeMap, BTreeSet};
    use alloc::vec;
    use alloc::vec::Vec;

    /// Number of producers and consumers attached to a single lane.
    #[derive(Clone, Copy, Default)]
    struct LaneStats {
        producers: u32,
        consumers: u32,
    }

    /// Records one more producer on `lane`; names with no entry are ignored.
    fn count_producer(stats: &mut BTreeMap<&str, LaneStats>, lane: &str) {
        if let Some(s) = stats.get_mut(lane) {
            s.producers = s.producers.saturating_add(1);
        }
    }

    /// Records one more consumer on `lane`; names with no entry are ignored.
    fn count_consumer(stats: &mut BTreeMap<&str, LaneStats>, lane: &str) {
        if let Some(s) = stats.get_mut(lane) {
            s.consumers = s.consumers.saturating_add(1);
        }
    }

    /// Reports whether the directed graph `adj` (adjacency lists indexed by
    /// node) contains a cycle.
    ///
    /// Uses a depth-first search with an explicit stack instead of recursion,
    /// so a long chain of lanes cannot exhaust the call stack.
    fn has_cycle(adj: &[Vec<usize>]) -> bool {
        #[derive(Clone, Copy, PartialEq, Eq)]
        enum Mark {
            Unvisited,
            InProgress,
            Done,
        }

        let mut marks = vec![Mark::Unvisited; adj.len()];
        // Each frame is (node, index of the next outgoing edge to explore).
        let mut stack: Vec<(usize, usize)> = Vec::new();

        for root in 0..adj.len() {
            if marks[root] != Mark::Unvisited {
                continue;
            }
            marks[root] = Mark::InProgress;
            stack.push((root, 0));

            while let Some(frame) = stack.last_mut() {
                let (node, cursor) = *frame;
                frame.1 += 1;
                match adj[node].get(cursor).copied() {
                    // All outgoing edges explored: the node is finished.
                    None => {
                        marks[node] = Mark::Done;
                        stack.pop();
                    }
                    Some(next) => match marks[next] {
                        // Reaching a node that is still on the stack closes a cycle.
                        Mark::InProgress => return true,
                        Mark::Done => {}
                        Mark::Unvisited => {
                            marks[next] = Mark::InProgress;
                            stack.push((next, 0));
                        }
                    },
                }
            }
        }
        false
    }

    let profile_journal = cfg!(feature = "lane_graph_journal");
    let profile_fanout = cfg!(feature = "lane_graph_fanout");
    let profile_sequenced_slots = true;

    if lg.roles.is_empty() {
        return Err(SpecError::new("spec must contain at least one role"));
    }

    let lanes_by_name: BTreeMap<&str, LaneKindV1> =
        lg.lanes.iter().map(|l| (l.name.as_str(), l.kind)).collect();

    // Duplicate names collapse into one map entry, so a size mismatch means a duplicate.
    if lanes_by_name.len() != lg.lanes.len() {
        return Err(SpecError::new("lane names must be unique"));
    }

    let mut endpoint_names = BTreeSet::<&str>::new();
    for ep in &lg.endpoints {
        if !endpoint_names.insert(ep.name.as_str()) {
            return Err(SpecError::new("endpoint names must be unique"));
        }
        if !lanes_by_name.contains_key(ep.lane.as_str()) {
            return Err(SpecError::new("endpoint references unknown lane"));
        }
    }

    // Pass 1 over lanes: reject lane kinds that are not enabled in this build.
    for lane in &lg.lanes {
        match lane.kind {
            LaneKindV1::Events => {}
            LaneKindV1::Journal => {
                if !profile_journal {
                    return Err(SpecError::new("journal lanes are disabled"));
                }
            }
            LaneKindV1::SequencedSlots { gating, .. } => {
                if !profile_sequenced_slots {
                    return Err(SpecError::new("sequenced slots lanes are disabled"));
                }
                if gating != SEQUENCED_SLOTS_V1_GATING {
                    return Err(SpecError::new(
                        "sequenced slots gating must match SEQUENCED_SLOTS_V1_GATING",
                    ));
                }
            }
            LaneKindV1::FanoutBroadcast { .. } => {
                if !profile_fanout {
                    return Err(SpecError::new("fanout lanes are disabled"));
                }
            }
        }
    }

    // Pass 2 over lanes: range-check kind parameters. Kept as a separate pass
    // so that a disabled kind on any lane is reported before a parameter error.
    for lane in &lg.lanes {
        match lane.kind {
            LaneKindV1::FanoutBroadcast { consumers } => {
                if consumers == 0 {
                    return Err(SpecError::new("fanout consumers must be > 0"));
                }
                if consumers > FANOUT_V1_MAX_CONSUMERS {
                    return Err(SpecError::new("fanout consumers exceeds max"));
                }
            }
            LaneKindV1::SequencedSlots { capacity, .. } => {
                if capacity == 0 {
                    return Err(SpecError::new("sequenced slots capacity must be > 0"));
                }
            }
            LaneKindV1::Events | LaneKindV1::Journal => {}
        }
    }

    let mut role_names = BTreeSet::<&str>::new();
    for role in &lg.roles {
        if !role_names.insert(role.name()) {
            return Err(SpecError::new("role names must be unique"));
        }
        for lane_ref in role.lane_refs() {
            if !lanes_by_name.contains_key(lane_ref) {
                return Err(SpecError::new("role references unknown lane"));
            }
        }
    }

    // Tally how many producers and consumers attach to each lane.
    let mut stats: BTreeMap<&str, LaneStats> = lg
        .lanes
        .iter()
        .map(|lane| (lane.name.as_str(), LaneStats::default()))
        .collect();

    for ep in &lg.endpoints {
        match ep.kind {
            EndpointKindV1::Ingress => count_producer(&mut stats, ep.lane.as_str()),
            EndpointKindV1::Egress => count_consumer(&mut stats, ep.lane.as_str()),
        }
    }

    for role in &lg.roles {
        match role {
            RoleCfgV1::Stage(cfg) => {
                // NOTE(review): `validate_v1` also rejects whitespace-only and
                // multi-line identities for single-ring stages; confirm whether
                // stage roles should apply the same rules.
                if cfg.stage.is_empty() {
                    return Err(SpecError::new(
                        "stage role stage identity must not be empty",
                    ));
                }
                count_consumer(&mut stats, cfg.rx.as_str());
                count_producer(&mut stats, cfg.tx.as_str());
            }
            RoleCfgV1::Bridge(cfg) => {
                count_consumer(&mut stats, cfg.rx.as_str());
                count_producer(&mut stats, cfg.tx.as_str());
            }
            RoleCfgV1::Router(cfg) => {
                count_consumer(&mut stats, cfg.rx.as_str());
                for tx in &cfg.tx {
                    count_producer(&mut stats, tx.as_str());
                }
            }
            RoleCfgV1::Merge(cfg) => {
                for rx in &cfg.rx {
                    count_consumer(&mut stats, rx.as_str());
                }
                count_producer(&mut stats, cfg.tx.as_str());
            }
        }
    }

    // Every lane needs both ends attached, within the limits of its kind.
    for lane in &lg.lanes {
        let s = stats.get(lane.name.as_str()).copied().unwrap_or_default();
        if s.producers == 0 {
            return Err(SpecError::new("lane must have at least one producer"));
        }
        if s.consumers == 0 {
            return Err(SpecError::new("lane must have at least one consumer"));
        }

        match lane.kind {
            LaneKindV1::Events => {
                if s.consumers > 1 {
                    return Err(SpecError::new("events lane must have at most one consumer"));
                }
            }
            LaneKindV1::Journal => {
                if s.producers > 1 {
                    return Err(SpecError::new(
                        "journal lane must have at most one producer",
                    ));
                }
                if s.consumers > 1 {
                    return Err(SpecError::new(
                        "journal lane must have at most one consumer",
                    ));
                }
            }
            LaneKindV1::SequencedSlots { .. } => {
                if s.producers > 1 {
                    return Err(SpecError::new(
                        "sequenced slots lane must have at most one producer",
                    ));
                }
                if s.consumers > 1 {
                    return Err(SpecError::new(
                        "sequenced slots lane must have at most one consumer",
                    ));
                }
            }
            LaneKindV1::FanoutBroadcast { .. } => {
                if s.producers > 1 {
                    return Err(SpecError::new("fanout lane must have at most one producer"));
                }
                if s.consumers > 1 {
                    return Err(SpecError::new("fanout lane must have at most one consumer"));
                }
            }
        }
    }

    // Routers: read one FanoutBroadcast lane and write one distinct Events lane
    // per declared fanout consumer.
    for role in &lg.roles {
        let RoleCfgV1::Router(router) = role else {
            continue;
        };

        if !profile_fanout {
            return Err(SpecError::new("router roles require fanout support"));
        }

        let Some(rx_kind) = lanes_by_name.get(router.rx.as_str()) else {
            continue;
        };
        let LaneKindV1::FanoutBroadcast { consumers } = *rx_kind else {
            return Err(SpecError::new(
                "router rx lane kind must be FanoutBroadcast in v1",
            ));
        };

        if router.tx.len() != consumers as usize {
            return Err(SpecError::new(
                "router tx lane count must equal fanout consumers in v1",
            ));
        }

        let mut seen_tx = BTreeSet::<&str>::new();
        for tx in &router.tx {
            if !seen_tx.insert(tx.as_str()) {
                return Err(SpecError::new("router tx lanes must be unique"));
            }
        }

        for tx_lane in &router.tx {
            let Some(tx_kind) = lanes_by_name.get(tx_lane.as_str()) else {
                continue;
            };
            if !matches!(tx_kind, LaneKindV1::Events) {
                return Err(SpecError::new("router tx lanes must be Events in v1"));
            }
        }
    }

    // Stages: rx and tx lane kinds must match, and rx must not be FanoutBroadcast.
    for role in &lg.roles {
        let RoleCfgV1::Stage(stage) = role else {
            continue;
        };
        let Some(rx_kind) = lanes_by_name.get(stage.rx.as_str()) else {
            continue;
        };
        let Some(tx_kind) = lanes_by_name.get(stage.tx.as_str()) else {
            continue;
        };

        if rx_kind != tx_kind {
            return Err(SpecError::new(
                "stage role rx lane kind must equal tx lane kind (bridges required)",
            ));
        }
        if matches!(rx_kind, LaneKindV1::FanoutBroadcast { .. }) {
            return Err(SpecError::new(
                "stage roles must not consume FanoutBroadcast lanes in v1 (router required)",
            ));
        }
    }

    // Bridges: may change lane kind, but must not read a FanoutBroadcast lane.
    for role in &lg.roles {
        let RoleCfgV1::Bridge(bridge) = role else {
            continue;
        };
        let Some(rx_kind) = lanes_by_name.get(bridge.rx.as_str()) else {
            continue;
        };
        if matches!(rx_kind, LaneKindV1::FanoutBroadcast { .. }) {
            return Err(SpecError::new(
                "bridge roles must not consume FanoutBroadcast lanes in v1 (router required)",
            ));
        }
    }

    // Merges: a non-empty set of distinct rx lanes, all of the tx lane's kind.
    for role in &lg.roles {
        let RoleCfgV1::Merge(merge) = role else {
            continue;
        };

        if merge.rx.is_empty() {
            return Err(SpecError::new("merge role rx must not be empty"));
        }

        let mut seen_rx = BTreeSet::<&str>::new();
        for rx in &merge.rx {
            if !seen_rx.insert(rx.as_str()) {
                return Err(SpecError::new("merge rx lanes must be unique"));
            }
        }

        let Some(tx_kind) = lanes_by_name.get(merge.tx.as_str()) else {
            continue;
        };

        // Disabled lane kinds were already rejected in the first lane pass, so
        // the kinds seen here need no further feature check.
        for rx_lane in &merge.rx {
            let Some(rx_kind) = lanes_by_name.get(rx_lane.as_str()) else {
                continue;
            };

            if rx_kind != tx_kind {
                return Err(SpecError::new(
                    "merge role rx lane kind must equal tx lane kind (bridges required)",
                ));
            }
            if matches!(rx_kind, LaneKindV1::FanoutBroadcast { .. }) {
                return Err(SpecError::new(
                    "merge roles must not consume FanoutBroadcast lanes in v1 (router required)",
                ));
            }
        }

        // Exhaustive on purpose: adding a policy variant forces this to be revisited.
        match merge.policy {
            MergePolicyV1::RoundRobin => {}
        }
    }

    // Build the lane graph: each role contributes an edge from every lane it
    // reads to every lane it writes.
    let lane_idx: BTreeMap<&str, usize> = lg
        .lanes
        .iter()
        .enumerate()
        .map(|(i, lane)| (lane.name.as_str(), i))
        .collect();

    let mut adj: Vec<Vec<usize>> = vec![Vec::new(); lg.lanes.len()];
    let mut add_edge = |from: &str, to: &str| {
        if let (Some(&a), Some(&b)) = (lane_idx.get(from), lane_idx.get(to)) {
            adj[a].push(b);
        }
    };

    for role in &lg.roles {
        match role {
            RoleCfgV1::Stage(cfg) => add_edge(cfg.rx.as_str(), cfg.tx.as_str()),
            RoleCfgV1::Bridge(cfg) => add_edge(cfg.rx.as_str(), cfg.tx.as_str()),
            RoleCfgV1::Router(cfg) => {
                for tx in &cfg.tx {
                    add_edge(cfg.rx.as_str(), tx.as_str());
                }
            }
            RoleCfgV1::Merge(cfg) => {
                for rx in &cfg.rx {
                    add_edge(rx.as_str(), cfg.tx.as_str());
                }
            }
        }
    }

    if has_cycle(&adj) {
        return Err(SpecError::new("lane graph contains a cycle"));
    }

    Ok(())
}