1#![cfg_attr(not(feature = "std"), no_std)]
2#![deny(missing_docs)]
3#![deny(unreachable_pub, rust_2018_idioms)]
4#![forbid(unsafe_code)]
5
6#[cfg(feature = "alloc")]
9extern crate alloc;
10
11#[cfg(feature = "alloc")]
12use alloc::string::{String, ToString};
13#[cfg(feature = "alloc")]
14use alloc::vec::Vec;
15
16pub use byteor_pipeline_spec::{
17 OnFullV1, OrderingV1, ShardKeyV1, SingleRingProducerV1, SingleRingSchedulingV1,
18 FANOUT_V1_MAX_CONSUMERS, SEQUENCED_SLOTS_V1_GATING,
19};
20
21#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
23#[serde(rename_all = "snake_case")]
24pub enum ExecutionTargetV1 {
25 #[serde(alias = "SingleRing", alias = "singlering")]
27 SingleRing,
28 #[serde(alias = "LaneGraph", alias = "lanegraph")]
30 LaneGraph,
31}
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
35#[non_exhaustive]
36pub enum LaneIntent {
37 Events,
39 Journal,
41 SequencedSlots {
43 capacity: u32,
45 gating: u32,
47 },
48 FanoutBroadcast {
50 consumers: u32,
52 },
53}
54
55#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
57pub enum PlanBoundaryKind {
58 Ingress,
60 Egress,
62}
63
#[cfg(feature = "alloc")]
/// A stage declared by a plan.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PlanStage {
    /// Name of the stage within the plan. Must be non-blank and unique; other
    /// parts of the plan refer to the stage by this name.
    pub name: String,
    /// Stage identity (the tests in this file use values such as `"identity"`
    /// and `"add_u8"`). Must be non-blank.
    pub stage: String,
}
73
#[cfg(feature = "alloc")]
/// A named ingress or egress boundary declared by a plan.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PlanBoundary {
    /// Boundary name. Must be non-blank and unique among boundaries of the
    /// same [`PlanBoundaryKind`].
    pub name: String,
    /// Whether this is an ingress or an egress boundary.
    pub kind: PlanBoundaryKind,
}
83
#[cfg(feature = "alloc")]
/// Where a lane-graph connection starts.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum PlanConnectionSource {
    /// The connection starts at a stage.
    StageOut {
        /// Name of a stage declared in `PipelinePlanV2::stages`.
        stage: String,
    },
    /// The connection starts at an ingress boundary.
    BoundaryIngress {
        /// Name of an ingress boundary declared in `PipelinePlanV2::boundaries`.
        boundary: String,
    },
}
100
#[cfg(feature = "alloc")]
/// Where a lane-graph connection ends.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum PlanConnectionDestination {
    /// The connection ends at a stage.
    StageIn {
        /// Name of a stage declared in `PipelinePlanV2::stages`.
        stage: String,
    },
    /// The connection ends at an egress boundary.
    BoundaryEgress {
        /// Name of an egress boundary declared in `PipelinePlanV2::boundaries`.
        boundary: String,
    },
}
117
#[cfg(feature = "alloc")]
/// A single connection in a lane-graph plan.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PlanConnection {
    /// Source endpoint of the connection.
    pub from: PlanConnectionSource,
    /// Destination endpoint of the connection.
    pub to: PlanConnectionDestination,
    /// Lane intent for this connection; its parameters are range-checked by
    /// [`validate_plan_v2`].
    pub intent: LaneIntent,
}
129
#[cfg(feature = "alloc")]
/// Constraint set combining a boundary lane intent with shard, ordering,
/// producer, scheduling and shard-key settings.
///
/// NOTE(review): nothing else in this file references this type (the V2 plan
/// uses [`SingleRingConstraintSetV2`]); presumably it is kept for another plan
/// format — confirm against callers.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PlanConstraintSet {
    /// Lane intent for the boundary lane.
    pub boundary_lane: LaneIntent,
    /// Shard count.
    pub shards: u32,
    /// Ordering setting.
    pub ordering: OrderingV1,
    /// Producer setting.
    pub producer: SingleRingProducerV1,
    /// Scheduling setting.
    pub scheduling: SingleRingSchedulingV1,
    /// Shard-key setting.
    pub shard_key: ShardKeyV1,
}
147
#[cfg(feature = "alloc")]
impl Default for PlanConstraintSet {
    /// Returns the baseline constraint set: an `Events` boundary lane, one
    /// shard, strict ordering, a single producer, dedicated scheduling and
    /// first-byte shard keying.
    fn default() -> Self {
        let boundary_lane = LaneIntent::Events;
        let shards = 1;

        PlanConstraintSet {
            boundary_lane,
            shards,
            shard_key: ShardKeyV1::FirstByte,
            scheduling: SingleRingSchedulingV1::Dedicated,
            producer: SingleRingProducerV1::Single,
            ordering: OrderingV1::Strict,
        }
    }
}
161
#[cfg(feature = "alloc")]
/// Constraints attached to a single-ring plan source.
///
/// [`validate_plan_v2`] enforces the v1 limits noted on the fields below.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SingleRingConstraintSetV2 {
    /// Shard count. Must be at least `1`, and exactly `1` when `ordering` is
    /// `OrderingV1::Strict`.
    pub shards: u32,
    /// Ordering setting. `OrderingV1::Strict` is rejected when `shards > 1`.
    pub ordering: OrderingV1,
    /// Producer setting. `SingleRingProducerV1::Mpmc` is rejected as not
    /// supported in v1.
    pub producer: SingleRingProducerV1,
    /// Scheduling setting. `SingleRingSchedulingV1::WorkQueue` is rejected as
    /// not supported in v1.
    pub scheduling: SingleRingSchedulingV1,
    /// Shard-key setting; not inspected by [`validate_plan_v2`].
    pub shard_key: ShardKeyV1,
}
177
#[cfg(feature = "alloc")]
impl Default for SingleRingConstraintSetV2 {
    /// Returns the baseline single-ring constraints: one shard, strict
    /// ordering, a single producer, dedicated scheduling and first-byte shard
    /// keying.
    fn default() -> Self {
        let shards = 1;

        SingleRingConstraintSetV2 {
            shards,
            shard_key: ShardKeyV1::FirstByte,
            scheduling: SingleRingSchedulingV1::Dedicated,
            producer: SingleRingProducerV1::Single,
            ordering: OrderingV1::Strict,
        }
    }
}
190
#[cfg(feature = "alloc")]
/// One authored stage entry of a single-ring plan.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SingleRingStagePlanV2 {
    /// Name of a stage declared in `PipelinePlanV2::stages`. Each declared
    /// stage may be authored at most once.
    pub stage: String,
    /// Names of the stages this entry depends on. Each must be a declared
    /// stage, appear only once, differ from `stage`, and belong to an entry
    /// authored earlier in the list.
    pub depends_on: Vec<String>,
}
200
#[cfg(feature = "alloc")]
/// Source payload for plans targeting `ExecutionTargetV1::SingleRing`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SingleRingAuthoringV2 {
    /// Name of an ingress boundary declared in `PipelinePlanV2::boundaries`.
    pub ingress_boundary: String,
    /// Name of an egress boundary declared in `PipelinePlanV2::boundaries`.
    pub egress_boundary: String,
    /// Authored stage entries, listed so that dependencies come first. Must be
    /// non-empty and contain at most `SEQUENCED_SLOTS_V1_GATING` entries.
    pub stages: Vec<SingleRingStagePlanV2>,
    /// Constraints applied to the ring.
    pub constraints: SingleRingConstraintSetV2,
}
214
#[cfg(feature = "alloc")]
/// Source payload for plans targeting `ExecutionTargetV1::LaneGraph`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct LaneGraphAuthoringV2 {
    /// Lane intent for the boundary lane. When it is
    /// `LaneIntent::SequencedSlots`, [`validate_plan_v2`] checks its gating and
    /// capacity.
    pub boundary_lane: LaneIntent,
    /// Connections between boundaries and stages. An empty list is accepted by
    /// [`validate_plan_v2`].
    pub connections: Vec<PlanConnection>,
    /// On-full setting (`OnFullV1`); not inspected by [`validate_plan_v2`].
    pub on_full: OnFullV1,
}
226
#[cfg(feature = "alloc")]
/// Target-specific authoring payload of a [`PipelinePlanV2`].
///
/// [`validate_plan_v2`] reports a diagnostic when the variant does not match
/// `PipelinePlanV2::target`.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum PipelinePlanSourceV2 {
    /// Payload for `ExecutionTargetV1::SingleRing`.
    SingleRing(SingleRingAuthoringV2),
    /// Payload for `ExecutionTargetV1::LaneGraph`.
    LaneGraph(LaneGraphAuthoringV2),
}
236
#[cfg(feature = "alloc")]
/// A version-2 pipeline plan: declared stages and boundaries plus a
/// target-specific source payload.
///
/// Use [`validate_plan_v2`] to check a plan for structural problems.
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PipelinePlanV2 {
    /// Plan name; must be non-blank.
    pub name: String,
    /// Execution target; must agree with the variant of `source`.
    pub target: ExecutionTargetV1,
    /// Declared stages; must be non-empty and uniquely named.
    pub stages: Vec<PlanStage>,
    /// Declared boundaries; names must be unique per boundary kind.
    pub boundaries: Vec<PlanBoundary>,
    /// Target-specific authoring payload.
    pub source: PipelinePlanSourceV2,
}
256
#[cfg(feature = "alloc")]
/// A single problem reported by [`validate_plan_v2`].
#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PlanValidationDiagnosticV2 {
    /// Human-readable description of the problem.
    pub message: String,
}
264
#[cfg(feature = "alloc")]
impl PlanValidationDiagnosticV2 {
    /// Builds a diagnostic that owns a copy of `message`.
    fn new(message: &str) -> Self {
        let message = ToString::to_string(message);
        Self { message }
    }
}
273
#[cfg(feature = "alloc")]
/// Validates `plan` and returns one diagnostic per problem found.
///
/// An empty vector means no problem was detected. Validation never stops at
/// the first failure; diagnostics are appended in this order:
///
/// 1. plan name and declared stages (non-blank, unique stage names),
/// 2. declared boundaries (non-blank, unique per [`PlanBoundaryKind`]),
/// 3. the target-specific source payload, or a single mismatch diagnostic when
///    `plan.target` and `plan.source` disagree.
pub fn validate_plan_v2(plan: &PipelinePlanV2) -> Vec<PlanValidationDiagnosticV2> {
    let mut diagnostics = Vec::new();

    if plan.name.trim().is_empty() {
        push_diagnostic(&mut diagnostics, "plan.name must be non-empty");
    }

    if plan.stages.is_empty() {
        push_diagnostic(
            &mut diagnostics,
            "plan.stages must contain at least one stage",
        );
    }

    let stage_names = collect_stage_names(&plan.stages, &mut diagnostics);
    let (ingress_names, egress_names) =
        collect_boundary_names(&plan.boundaries, &mut diagnostics);

    match (&plan.target, &plan.source) {
        (ExecutionTargetV1::SingleRing, PipelinePlanSourceV2::SingleRing(single_ring)) => {
            validate_single_ring_source(
                single_ring,
                &stage_names,
                &ingress_names,
                &egress_names,
                &mut diagnostics,
            );
        }
        (ExecutionTargetV1::LaneGraph, PipelinePlanSourceV2::LaneGraph(lane_graph)) => {
            validate_lane_graph_source(
                lane_graph,
                &stage_names,
                &ingress_names,
                &egress_names,
                &mut diagnostics,
            );
        }
        (ExecutionTargetV1::SingleRing, PipelinePlanSourceV2::LaneGraph(_)) => push_diagnostic(
            &mut diagnostics,
            "plan.target single_ring must use a SingleRing source payload",
        ),
        (ExecutionTargetV1::LaneGraph, PipelinePlanSourceV2::SingleRing(_)) => push_diagnostic(
            &mut diagnostics,
            "plan.target lane_graph must use a LaneGraph source payload",
        ),
    }

    diagnostics
}

#[cfg(feature = "alloc")]
/// Appends a diagnostic carrying `message` to `diagnostics`.
fn push_diagnostic(diagnostics: &mut Vec<PlanValidationDiagnosticV2>, message: &str) {
    diagnostics.push(PlanValidationDiagnosticV2::new(message));
}

#[cfg(feature = "alloc")]
/// Checks the declared stages and returns the distinct stage names in
/// declaration order.
fn collect_stage_names<'a>(
    stages: &'a [PlanStage],
    diagnostics: &mut Vec<PlanValidationDiagnosticV2>,
) -> Vec<&'a str> {
    let mut stage_names: Vec<&'a str> = Vec::with_capacity(stages.len());

    for stage in stages {
        let name = stage.name.as_str();

        if name.trim().is_empty() {
            push_diagnostic(diagnostics, "plan stage name must be non-empty");
        }
        if stage.stage.trim().is_empty() {
            push_diagnostic(diagnostics, "plan stage identity must be non-empty");
        }
        // A blank name is still recorded (once), so later references to it
        // resolve the same way they always have.
        if stage_names.contains(&name) {
            push_diagnostic(diagnostics, "plan stage names must be unique");
        } else {
            stage_names.push(name);
        }
    }

    stage_names
}

#[cfg(feature = "alloc")]
/// Checks the declared boundaries and returns the distinct
/// `(ingress, egress)` boundary names in declaration order.
fn collect_boundary_names<'a>(
    boundaries: &'a [PlanBoundary],
    diagnostics: &mut Vec<PlanValidationDiagnosticV2>,
) -> (Vec<&'a str>, Vec<&'a str>) {
    let mut ingress_names: Vec<&'a str> = Vec::new();
    let mut egress_names: Vec<&'a str> = Vec::new();

    for boundary in boundaries {
        let name = boundary.name.as_str();

        // Blank boundary names are reported and never recorded.
        if name.trim().is_empty() {
            push_diagnostic(diagnostics, "plan boundary name must be non-empty");
            continue;
        }

        // Uniqueness is per kind: an ingress and an egress may share a name.
        let names = match boundary.kind {
            PlanBoundaryKind::Ingress => &mut ingress_names,
            PlanBoundaryKind::Egress => &mut egress_names,
        };
        if names.contains(&name) {
            push_diagnostic(diagnostics, "plan boundaries must be unique per kind");
        } else {
            names.push(name);
        }
    }

    (ingress_names, egress_names)
}

#[cfg(feature = "alloc")]
/// Validates a single-ring payload: constraints, boundary references, stage
/// count, and the authored stage list with its dependencies.
fn validate_single_ring_source(
    single_ring: &SingleRingAuthoringV2,
    stage_names: &[&str],
    ingress_names: &[&str],
    egress_names: &[&str],
    diagnostics: &mut Vec<PlanValidationDiagnosticV2>,
) {
    let constraints = &single_ring.constraints;

    if constraints.shards == 0 {
        push_diagnostic(diagnostics, "single_ring.constraints.shards must be >= 1");
    }

    if constraints.shards > 1 && constraints.ordering == OrderingV1::Strict {
        push_diagnostic(
            diagnostics,
            "single_ring strict ordering requires shards == 1",
        );
    }

    if constraints.producer == SingleRingProducerV1::Mpmc {
        push_diagnostic(
            diagnostics,
            "single_ring producer mpmc is not supported in v1",
        );
    }

    if constraints.scheduling == SingleRingSchedulingV1::WorkQueue {
        push_diagnostic(
            diagnostics,
            "single_ring scheduling work_queue is not supported in v1",
        );
    }

    if !ingress_names.contains(&single_ring.ingress_boundary.as_str()) {
        push_diagnostic(
            diagnostics,
            "single_ring.ingress_boundary must reference an ingress boundary",
        );
    }

    if !egress_names.contains(&single_ring.egress_boundary.as_str()) {
        push_diagnostic(
            diagnostics,
            "single_ring.egress_boundary must reference an egress boundary",
        );
    }

    if single_ring.stages.is_empty() {
        push_diagnostic(
            diagnostics,
            "single_ring.stages must contain at least one stage",
        );
    }

    // The authored stage count is capped by the sequenced-slots gating constant.
    if single_ring.stages.len() > SEQUENCED_SLOTS_V1_GATING as usize {
        push_diagnostic(diagnostics, "single_ring stage count exceeds v1 limit");
    }

    // Stages authored so far, including the entry currently being checked.
    let mut authored_stage_ids = Vec::<&str>::with_capacity(single_ring.stages.len());
    for stage in &single_ring.stages {
        let stage_id = stage.stage.as_str();

        if !stage_names.contains(&stage_id) {
            push_diagnostic(
                diagnostics,
                "single_ring stage entry must reference a known stage",
            );
        }
        if authored_stage_ids.contains(&stage_id) {
            push_diagnostic(diagnostics, "single_ring stages must be unique");
        } else {
            authored_stage_ids.push(stage_id);
        }

        let mut seen_dependencies = Vec::<&str>::with_capacity(stage.depends_on.len());
        for dependency in &stage.depends_on {
            let dependency_id = dependency.as_str();
            let is_known_stage = stage_names.contains(&dependency_id);

            if dependency == &stage.stage {
                push_diagnostic(diagnostics, "single_ring stage must not depend on itself");
            }
            if !is_known_stage {
                push_diagnostic(
                    diagnostics,
                    "single_ring dependency must reference a known stage",
                );
            }
            if seen_dependencies.contains(&dependency_id) {
                push_diagnostic(
                    diagnostics,
                    "single_ring dependencies must be unique per stage",
                );
            } else {
                seen_dependencies.push(dependency_id);
            }
            // Unknown stages were reported above; only known stages can be
            // "authored too late".
            if is_known_stage && !authored_stage_ids.contains(&dependency_id) {
                push_diagnostic(
                    diagnostics,
                    "single_ring dependencies must reference earlier authored stages",
                );
            }
        }
    }
}

#[cfg(feature = "alloc")]
/// Validates a lane-graph payload: the boundary lane intent, then each
/// connection's endpoints and lane-intent parameters.
fn validate_lane_graph_source(
    lane_graph: &LaneGraphAuthoringV2,
    stage_names: &[&str],
    ingress_names: &[&str],
    egress_names: &[&str],
    diagnostics: &mut Vec<PlanValidationDiagnosticV2>,
) {
    // For the boundary lane, gating is reported before capacity.
    if let LaneIntent::SequencedSlots { capacity, gating } = lane_graph.boundary_lane {
        if gating != SEQUENCED_SLOTS_V1_GATING {
            push_diagnostic(
                diagnostics,
                "lane_graph boundary SequencedSlots gating must match SEQUENCED_SLOTS_V1_GATING",
            );
        }
        if capacity == 0 {
            push_diagnostic(
                diagnostics,
                "lane_graph boundary SequencedSlots capacity must be > 0",
            );
        }
    }

    for connection in &lane_graph.connections {
        match &connection.from {
            PlanConnectionSource::StageOut { stage } => {
                if !stage_names.contains(&stage.as_str()) {
                    push_diagnostic(
                        diagnostics,
                        "lane_graph connection references unknown source stage",
                    );
                }
            }
            PlanConnectionSource::BoundaryIngress { boundary } => {
                if !ingress_names.contains(&boundary.as_str()) {
                    push_diagnostic(
                        diagnostics,
                        "lane_graph connection references unknown ingress boundary",
                    );
                }
            }
        }

        match &connection.to {
            PlanConnectionDestination::StageIn { stage } => {
                if !stage_names.contains(&stage.as_str()) {
                    push_diagnostic(
                        diagnostics,
                        "lane_graph connection references unknown destination stage",
                    );
                }
            }
            PlanConnectionDestination::BoundaryEgress { boundary } => {
                if !egress_names.contains(&boundary.as_str()) {
                    push_diagnostic(
                        diagnostics,
                        "lane_graph connection references unknown egress boundary",
                    );
                }
            }
        }

        match connection.intent {
            // For connections, capacity is reported before gating.
            LaneIntent::SequencedSlots { capacity, gating } => {
                if capacity == 0 {
                    push_diagnostic(
                        diagnostics,
                        "lane_graph SequencedSlots capacity must be > 0",
                    );
                }
                if gating != SEQUENCED_SLOTS_V1_GATING {
                    push_diagnostic(
                        diagnostics,
                        "lane_graph SequencedSlots gating must match SEQUENCED_SLOTS_V1_GATING",
                    );
                }
            }
            LaneIntent::FanoutBroadcast { consumers } => {
                if consumers == 0 {
                    push_diagnostic(diagnostics, "lane_graph fanout consumers must be > 0");
                }
                if consumers > FANOUT_V1_MAX_CONSUMERS {
                    push_diagnostic(diagnostics, "lane_graph fanout consumers exceeds v1 max");
                }
            }
            // These intents carry no parameters to check.
            LaneIntent::Events | LaneIntent::Journal => {}
        }
    }
}
526
#[cfg(all(test, feature = "alloc"))]
mod tests {
    use super::*;

    // ---- fixture helpers -------------------------------------------------

    /// Declares a plan stage from its name and stage identity.
    fn stage(name: &str, identity: &str) -> PlanStage {
        PlanStage {
            name: name.into(),
            stage: identity.into(),
        }
    }

    /// Declares a plan boundary of the given kind.
    fn boundary(name: &str, kind: PlanBoundaryKind) -> PlanBoundary {
        PlanBoundary {
            name: name.into(),
            kind,
        }
    }

    /// The `in` / `out` boundary pair shared by most plans below.
    fn in_out_boundaries() -> alloc::vec::Vec<PlanBoundary> {
        alloc::vec![
            boundary("in", PlanBoundaryKind::Ingress),
            boundary("out", PlanBoundaryKind::Egress),
        ]
    }

    /// Authors a single-ring stage entry with the given dependencies.
    fn ring_stage(stage: &str, depends_on: &[&str]) -> SingleRingStagePlanV2 {
        SingleRingStagePlanV2 {
            stage: stage.into(),
            depends_on: depends_on
                .iter()
                .map(|dependency| (*dependency).into())
                .collect(),
        }
    }

    /// Single-ring source wired to `in` / `out` with the given stages and constraints.
    fn single_ring_source(
        stages: alloc::vec::Vec<SingleRingStagePlanV2>,
        constraints: SingleRingConstraintSetV2,
    ) -> PipelinePlanSourceV2 {
        PipelinePlanSourceV2::SingleRing(SingleRingAuthoringV2 {
            ingress_boundary: "in".into(),
            egress_boundary: "out".into(),
            stages,
            constraints,
        })
    }

    fn from_ingress(boundary: &str) -> PlanConnectionSource {
        PlanConnectionSource::BoundaryIngress {
            boundary: boundary.into(),
        }
    }

    fn from_stage(stage: &str) -> PlanConnectionSource {
        PlanConnectionSource::StageOut {
            stage: stage.into(),
        }
    }

    fn to_stage(stage: &str) -> PlanConnectionDestination {
        PlanConnectionDestination::StageIn {
            stage: stage.into(),
        }
    }

    fn to_egress(boundary: &str) -> PlanConnectionDestination {
        PlanConnectionDestination::BoundaryEgress {
            boundary: boundary.into(),
        }
    }

    /// Builds one lane-graph connection.
    fn connect(
        from: PlanConnectionSource,
        to: PlanConnectionDestination,
        intent: LaneIntent,
    ) -> PlanConnection {
        PlanConnection { from, to, intent }
    }

    /// Runs the validator and keeps only the diagnostic messages.
    fn messages(plan: &PipelinePlanV2) -> alloc::vec::Vec<alloc::string::String> {
        validate_plan_v2(plan)
            .into_iter()
            .map(|diagnostic| diagnostic.message)
            .collect()
    }

    // ---- tests -----------------------------------------------------------

    #[test]
    fn execution_target_accepts_snake_case_and_legacy_pascal_case() {
        let accepted = [
            ("\"lane_graph\"", ExecutionTargetV1::LaneGraph),
            ("\"LaneGraph\"", ExecutionTargetV1::LaneGraph),
            ("\"single_ring\"", ExecutionTargetV1::SingleRing),
        ];
        for &(json, expected) in &accepted {
            let parsed: ExecutionTargetV1 = serde_json::from_str(json).unwrap();
            assert_eq!(parsed, expected);
        }

        // Serialization always emits the snake_case spelling.
        let emitted = [
            (ExecutionTargetV1::LaneGraph, "\"lane_graph\""),
            (ExecutionTargetV1::SingleRing, "\"single_ring\""),
        ];
        for &(target, json) in &emitted {
            assert_eq!(serde_json::to_string(&target).unwrap(), json);
        }
    }

    #[test]
    fn validates_basic_lane_graph_v2_plan() {
        let plan = PipelinePlanV2 {
            name: "p1".into(),
            target: ExecutionTargetV1::LaneGraph,
            stages: alloc::vec![stage("s0", "identity")],
            boundaries: in_out_boundaries(),
            source: PipelinePlanSourceV2::LaneGraph(LaneGraphAuthoringV2 {
                boundary_lane: LaneIntent::Events,
                connections: alloc::vec![
                    connect(from_ingress("in"), to_stage("s0"), LaneIntent::Events),
                    connect(from_stage("s0"), to_egress("out"), LaneIntent::Events),
                ],
                on_full: OnFullV1::Block,
            }),
        };

        assert!(validate_plan_v2(&plan).is_empty());
    }

    #[test]
    fn validates_basic_single_ring_v2_plan() {
        let plan = PipelinePlanV2 {
            name: "p2".into(),
            target: ExecutionTargetV1::SingleRing,
            stages: alloc::vec![stage("s0", "identity"), stage("s1", "add_u8")],
            boundaries: in_out_boundaries(),
            source: single_ring_source(
                alloc::vec![ring_stage("s0", &[]), ring_stage("s1", &["s0"])],
                SingleRingConstraintSetV2::default(),
            ),
        };

        assert!(validate_plan_v2(&plan).is_empty());
    }

    #[test]
    fn rejects_target_source_mismatch_in_v2_plan() {
        let plan = PipelinePlanV2 {
            name: "bad".into(),
            target: ExecutionTargetV1::SingleRing,
            stages: alloc::vec![stage("s0", "identity")],
            boundaries: in_out_boundaries(),
            source: PipelinePlanSourceV2::LaneGraph(LaneGraphAuthoringV2 {
                boundary_lane: LaneIntent::Events,
                connections: alloc::vec![],
                on_full: OnFullV1::Block,
            }),
        };

        assert_eq!(
            validate_plan_v2(&plan),
            alloc::vec![PlanValidationDiagnosticV2 {
                message: "plan.target single_ring must use a SingleRing source payload".into(),
            }]
        );
    }

    #[test]
    fn rejects_non_topological_single_ring_dependencies_in_v2_plan() {
        let plan = PipelinePlanV2 {
            name: "bad-dag".into(),
            target: ExecutionTargetV1::SingleRing,
            stages: alloc::vec![stage("s0", "identity"), stage("s1", "add_u8")],
            boundaries: in_out_boundaries(),
            source: single_ring_source(
                // `s0` depends on `s1`, which is only authored afterwards.
                alloc::vec![ring_stage("s0", &["s1"]), ring_stage("s1", &[])],
                SingleRingConstraintSetV2::default(),
            ),
        };

        assert_eq!(
            validate_plan_v2(&plan),
            alloc::vec![PlanValidationDiagnosticV2 {
                message: "single_ring dependencies must reference earlier authored stages".into(),
            }]
        );
    }

    #[test]
    fn validates_full_supported_single_ring_dag_shape_in_v2_plan() {
        let plan = PipelinePlanV2 {
            name: "supported-single-ring-dag".into(),
            target: ExecutionTargetV1::SingleRing,
            stages: alloc::vec![
                stage("s0", "identity"),
                stage("s1", "add_u8"),
                stage("s2", "identity"),
                stage("s3", "add_u8"),
            ],
            boundaries: in_out_boundaries(),
            source: single_ring_source(
                alloc::vec![
                    ring_stage("s0", &[]),
                    ring_stage("s1", &["s0"]),
                    ring_stage("s2", &["s0"]),
                    ring_stage("s3", &["s1", "s2"]),
                ],
                SingleRingConstraintSetV2::default(),
            ),
        };

        assert!(validate_plan_v2(&plan).is_empty());
    }

    #[test]
    fn validates_full_lane_graph_source_shape_in_v2_plan() {
        let plan = PipelinePlanV2 {
            name: "supported-lane-graph-source".into(),
            target: ExecutionTargetV1::LaneGraph,
            stages: alloc::vec![
                stage("s0", "identity"),
                stage("s1", "add_u8"),
                stage("s2", "identity"),
            ],
            boundaries: alloc::vec![
                boundary("in", PlanBoundaryKind::Ingress),
                boundary("out_primary", PlanBoundaryKind::Egress),
                boundary("out_secondary", PlanBoundaryKind::Egress),
            ],
            source: PipelinePlanSourceV2::LaneGraph(LaneGraphAuthoringV2 {
                boundary_lane: LaneIntent::Events,
                connections: alloc::vec![
                    connect(from_ingress("in"), to_stage("s0"), LaneIntent::Journal),
                    connect(
                        from_stage("s0"),
                        to_stage("s1"),
                        LaneIntent::SequencedSlots {
                            capacity: 8,
                            gating: SEQUENCED_SLOTS_V1_GATING,
                        },
                    ),
                    connect(
                        from_stage("s0"),
                        to_stage("s2"),
                        LaneIntent::FanoutBroadcast { consumers: 2 },
                    ),
                    connect(
                        from_stage("s1"),
                        to_egress("out_primary"),
                        LaneIntent::Events,
                    ),
                    connect(
                        from_stage("s2"),
                        to_egress("out_secondary"),
                        LaneIntent::Events,
                    ),
                ],
                on_full: OnFullV1::Drop,
            }),
        };

        assert!(validate_plan_v2(&plan).is_empty());
    }

    #[test]
    fn rejects_documented_single_ring_v1_limit_violations_in_v2_plan() {
        let plan = PipelinePlanV2 {
            name: "bad-single-ring-limits".into(),
            target: ExecutionTargetV1::SingleRing,
            stages: alloc::vec![
                stage("s0", "identity"),
                stage("s1", "add_u8"),
                stage("s2", "identity"),
                stage("s3", "add_u8"),
                stage("s4", "identity"),
            ],
            boundaries: in_out_boundaries(),
            source: single_ring_source(
                alloc::vec![
                    ring_stage("s0", &[]),
                    ring_stage("s1", &["s0"]),
                    ring_stage("s2", &["s1"]),
                    ring_stage("s3", &["s2"]),
                    ring_stage("s4", &["s3"]),
                ],
                SingleRingConstraintSetV2 {
                    shards: 2,
                    ordering: OrderingV1::Strict,
                    producer: SingleRingProducerV1::Mpmc,
                    scheduling: SingleRingSchedulingV1::WorkQueue,
                    shard_key: ShardKeyV1::FirstByte,
                },
            ),
        };

        assert_eq!(
            messages(&plan),
            [
                "single_ring strict ordering requires shards == 1",
                "single_ring producer mpmc is not supported in v1",
                "single_ring scheduling work_queue is not supported in v1",
                "single_ring stage count exceeds v1 limit",
            ]
        );
    }

    #[test]
    fn rejects_documented_lane_graph_limit_violations_in_v2_plan() {
        let plan = PipelinePlanV2 {
            name: "bad-lane-graph-limits".into(),
            target: ExecutionTargetV1::LaneGraph,
            stages: alloc::vec![stage("s0", "identity")],
            boundaries: in_out_boundaries(),
            source: PipelinePlanSourceV2::LaneGraph(LaneGraphAuthoringV2 {
                boundary_lane: LaneIntent::SequencedSlots {
                    capacity: 0,
                    gating: SEQUENCED_SLOTS_V1_GATING,
                },
                connections: alloc::vec![
                    connect(
                        from_ingress("in"),
                        to_stage("s0"),
                        LaneIntent::SequencedSlots {
                            capacity: 0,
                            gating: 7,
                        },
                    ),
                    connect(
                        from_stage("s0"),
                        to_egress("out"),
                        LaneIntent::FanoutBroadcast {
                            consumers: FANOUT_V1_MAX_CONSUMERS + 1,
                        },
                    ),
                ],
                on_full: OnFullV1::Block,
            }),
        };

        assert_eq!(
            messages(&plan),
            [
                "lane_graph boundary SequencedSlots capacity must be > 0",
                "lane_graph SequencedSlots capacity must be > 0",
                "lane_graph SequencedSlots gating must match SEQUENCED_SLOTS_V1_GATING",
                "lane_graph fanout consumers exceeds v1 max",
            ]
        );
    }
}