byteor_pipeline_plan/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2#![deny(missing_docs)]
3#![deny(unreachable_pub, rust_2018_idioms)]
4#![forbid(unsafe_code)]
5
6//! Model-neutral planning types for ByteOr pipeline authoring.
7
8#[cfg(feature = "alloc")]
9extern crate alloc;
10
11#[cfg(feature = "alloc")]
12use alloc::string::{String, ToString};
13#[cfg(feature = "alloc")]
14use alloc::vec::Vec;
15
16pub use byteor_pipeline_spec::{
17    OnFullV1, OrderingV1, ShardKeyV1, SingleRingProducerV1, SingleRingSchedulingV1,
18    FANOUT_V1_MAX_CONSUMERS, SEQUENCED_SLOTS_V1_GATING,
19};
20
21/// Author-selected lowering target.
22#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
23#[serde(rename_all = "snake_case")]
24pub enum ExecutionTargetV1 {
25    /// Lower to `PipelineSpecV1::SingleRing`.
26    #[serde(alias = "SingleRing", alias = "singlering")]
27    SingleRing,
28    /// Lower to `PipelineSpecV1::LaneGraph`.
29    #[serde(alias = "LaneGraph", alias = "lanegraph")]
30    LaneGraph,
31}
32
33/// Model-neutral connection intent.
34#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
35#[non_exhaustive]
36pub enum LaneIntent {
37    /// Events lane.
38    Events,
39    /// Journal lane.
40    Journal,
41    /// Sequenced slots lane.
42    SequencedSlots {
43        /// Slot capacity.
44        capacity: u32,
45        /// Gating factor.
46        gating: u32,
47    },
48    /// Fanout broadcast boundary intent.
49    FanoutBroadcast {
50        /// Number of consumers.
51        consumers: u32,
52    },
53}
54
55/// Authoring boundary direction.
56#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
57pub enum PlanBoundaryKind {
58    /// External ingress into the plan.
59    Ingress,
60    /// External egress from the plan.
61    Egress,
62}
63
64/// Stage definition in a plan document.
65#[cfg(feature = "alloc")]
66#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
67pub struct PlanStage {
68    /// Stable stage node name used for references.
69    pub name: String,
70    /// Stable stage identity key used by lowering/runtime resolution.
71    pub stage: String,
72}
73
74/// Boundary definition in a plan document.
75#[cfg(feature = "alloc")]
76#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
77pub struct PlanBoundary {
78    /// Boundary name.
79    pub name: String,
80    /// Boundary direction.
81    pub kind: PlanBoundaryKind,
82}
83
84/// Source endpoint of a plan connection.
85#[cfg(feature = "alloc")]
86#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
87#[non_exhaustive]
88pub enum PlanConnectionSource {
89    /// Output of a stage.
90    StageOut {
91        /// Stage node name.
92        stage: String,
93    },
94    /// Output of an ingress boundary.
95    BoundaryIngress {
96        /// Boundary name.
97        boundary: String,
98    },
99}
100
101/// Destination endpoint of a plan connection.
102#[cfg(feature = "alloc")]
103#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
104#[non_exhaustive]
105pub enum PlanConnectionDestination {
106    /// Input to a stage.
107    StageIn {
108        /// Stage node name.
109        stage: String,
110    },
111    /// Input to an egress boundary.
112    BoundaryEgress {
113        /// Boundary name.
114        boundary: String,
115    },
116}
117
118/// A model-neutral plan connection.
119#[cfg(feature = "alloc")]
120#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
121pub struct PlanConnection {
122    /// Connection source.
123    pub from: PlanConnectionSource,
124    /// Connection destination.
125    pub to: PlanConnectionDestination,
126    /// Requested connection intent.
127    pub intent: LaneIntent,
128}
129
130/// Lowering constraints attached to a plan.
131#[cfg(feature = "alloc")]
132#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
133pub struct PlanConstraintSet {
134    /// Default boundary lane intent used for ingress and egress boundaries.
135    pub boundary_lane: LaneIntent,
136    /// Desired shard count for SingleRing lowering.
137    pub shards: u32,
138    /// Desired ordering for SingleRing lowering.
139    pub ordering: OrderingV1,
140    /// Desired producer model for SingleRing lowering.
141    pub producer: SingleRingProducerV1,
142    /// Desired scheduling model for SingleRing lowering.
143    pub scheduling: SingleRingSchedulingV1,
144    /// Desired shard-key extractor for SingleRing lowering.
145    pub shard_key: ShardKeyV1,
146}
147
148#[cfg(feature = "alloc")]
149impl Default for PlanConstraintSet {
150    fn default() -> Self {
151        Self {
152            boundary_lane: LaneIntent::Events,
153            shards: 1,
154            ordering: OrderingV1::Strict,
155            producer: SingleRingProducerV1::Single,
156            scheduling: SingleRingSchedulingV1::Dedicated,
157            shard_key: ShardKeyV1::FirstByte,
158        }
159    }
160}
161
162/// SingleRing-specific authoring constraints for the full-parity source contract.
163#[cfg(feature = "alloc")]
164#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
165pub struct SingleRingConstraintSetV2 {
166    /// Desired shard count.
167    pub shards: u32,
168    /// Desired ordering semantics.
169    pub ordering: OrderingV1,
170    /// Desired producer model.
171    pub producer: SingleRingProducerV1,
172    /// Desired scheduling model.
173    pub scheduling: SingleRingSchedulingV1,
174    /// Desired shard-key extractor.
175    pub shard_key: ShardKeyV1,
176}
177
178#[cfg(feature = "alloc")]
179impl Default for SingleRingConstraintSetV2 {
180    fn default() -> Self {
181        Self {
182            shards: 1,
183            ordering: OrderingV1::Strict,
184            producer: SingleRingProducerV1::Single,
185            scheduling: SingleRingSchedulingV1::Dedicated,
186            shard_key: ShardKeyV1::FirstByte,
187        }
188    }
189}
190
191/// One authored SingleRing stage instance in the full-parity source contract.
192#[cfg(feature = "alloc")]
193#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
194pub struct SingleRingStagePlanV2 {
195    /// Stage id referencing a shared `PlanStage` entry.
196    pub stage: String,
197    /// Upstream stage ids that this stage depends on.
198    pub depends_on: Vec<String>,
199}
200
201/// Full-parity SingleRing authoring payload.
202#[cfg(feature = "alloc")]
203#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
204pub struct SingleRingAuthoringV2 {
205    /// Boundary id that feeds the first producer-visible stage set.
206    pub ingress_boundary: String,
207    /// Boundary id that drains the final stage set.
208    pub egress_boundary: String,
209    /// Stages in topological order.
210    pub stages: Vec<SingleRingStagePlanV2>,
211    /// SingleRing execution constraints.
212    pub constraints: SingleRingConstraintSetV2,
213}
214
215/// Full-parity LaneGraph authoring payload.
216#[cfg(feature = "alloc")]
217#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
218pub struct LaneGraphAuthoringV2 {
219    /// Default boundary lane intent used for ingress and egress boundaries.
220    pub boundary_lane: LaneIntent,
221    /// Graph connections between boundaries and stages.
222    pub connections: Vec<PlanConnection>,
223    /// Default backpressure policy for lowered LaneGraph specs.
224    pub on_full: OnFullV1,
225}
226
227/// Target-specific authoring payload for the full-parity source contract.
228#[cfg(feature = "alloc")]
229#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
230pub enum PipelinePlanSourceV2 {
231    /// SingleRing authoring payload.
232    SingleRing(SingleRingAuthoringV2),
233    /// LaneGraph authoring payload.
234    LaneGraph(LaneGraphAuthoringV2),
235}
236
237/// Full-parity pipeline authoring document.
238///
239/// This is the supported replacement for the earlier narrower graph-shaped document.
240/// It keeps shared catalog metadata at the top level while allowing target-specific payloads to
241/// carry the semantics that cannot be represented safely in one fake model-neutral graph.
242#[cfg(feature = "alloc")]
243#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
244pub struct PipelinePlanV2 {
245    /// Plan name.
246    pub name: String,
247    /// Explicit author-selected target.
248    pub target: ExecutionTargetV1,
249    /// Shared stage catalog.
250    pub stages: Vec<PlanStage>,
251    /// Shared boundary catalog.
252    pub boundaries: Vec<PlanBoundary>,
253    /// Target-specific source payload.
254    pub source: PipelinePlanSourceV2,
255}
256
257/// Structured validation diagnostic for the full-parity source contract.
258#[cfg(feature = "alloc")]
259#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
260pub struct PlanValidationDiagnosticV2 {
261    /// Human-readable validation message.
262    pub message: String,
263}
264
265#[cfg(feature = "alloc")]
266impl PlanValidationDiagnosticV2 {
267    fn new(message: &str) -> Self {
268        Self {
269            message: message.to_string(),
270        }
271    }
272}
273
274/// Validate the full-parity authoring document and return all discovered diagnostics.
275#[cfg(feature = "alloc")]
276pub fn validate_plan_v2(plan: &PipelinePlanV2) -> Vec<PlanValidationDiagnosticV2> {
277    let mut diagnostics = Vec::new();
278
279    if plan.name.trim().is_empty() {
280        diagnostics.push(PlanValidationDiagnosticV2::new("plan.name must be non-empty"));
281    }
282
283    if plan.stages.is_empty() {
284        diagnostics.push(PlanValidationDiagnosticV2::new(
285            "plan.stages must contain at least one stage",
286        ));
287    }
288
289    let mut stage_names: Vec<&str> = Vec::with_capacity(plan.stages.len());
290    for stage in &plan.stages {
291        if stage.name.trim().is_empty() {
292            diagnostics.push(PlanValidationDiagnosticV2::new(
293                "plan stage name must be non-empty",
294            ));
295        }
296        if stage.stage.trim().is_empty() {
297            diagnostics.push(PlanValidationDiagnosticV2::new(
298                "plan stage identity must be non-empty",
299            ));
300        }
301        if stage_names.contains(&stage.name.as_str()) {
302            diagnostics.push(PlanValidationDiagnosticV2::new(
303                "plan stage names must be unique",
304            ));
305        } else {
306            stage_names.push(stage.name.as_str());
307        }
308    }
309
310    let mut ingress_names: Vec<&str> = Vec::new();
311    let mut egress_names: Vec<&str> = Vec::new();
312    for boundary in &plan.boundaries {
313        if boundary.name.trim().is_empty() {
314            diagnostics.push(PlanValidationDiagnosticV2::new(
315                "plan boundary name must be non-empty",
316            ));
317            continue;
318        }
319
320        let names = match boundary.kind {
321            PlanBoundaryKind::Ingress => &mut ingress_names,
322            PlanBoundaryKind::Egress => &mut egress_names,
323        };
324        if names.contains(&boundary.name.as_str()) {
325            diagnostics.push(PlanValidationDiagnosticV2::new(
326                "plan boundaries must be unique per kind",
327            ));
328        } else {
329            names.push(boundary.name.as_str());
330        }
331    }
332
333    match (&plan.target, &plan.source) {
334        (ExecutionTargetV1::SingleRing, PipelinePlanSourceV2::SingleRing(single_ring)) => {
335            if single_ring.constraints.shards == 0 {
336                diagnostics.push(PlanValidationDiagnosticV2::new(
337                    "single_ring.constraints.shards must be >= 1",
338                ));
339            }
340
341            if single_ring.constraints.shards > 1
342                && single_ring.constraints.ordering == OrderingV1::Strict
343            {
344                diagnostics.push(PlanValidationDiagnosticV2::new(
345                    "single_ring strict ordering requires shards == 1",
346                ));
347            }
348
349            if single_ring.constraints.producer == SingleRingProducerV1::Mpmc {
350                diagnostics.push(PlanValidationDiagnosticV2::new(
351                    "single_ring producer mpmc is not supported in v1",
352                ));
353            }
354
355            if single_ring.constraints.scheduling == SingleRingSchedulingV1::WorkQueue {
356                diagnostics.push(PlanValidationDiagnosticV2::new(
357                    "single_ring scheduling work_queue is not supported in v1",
358                ));
359            }
360
361            if !ingress_names.contains(&single_ring.ingress_boundary.as_str()) {
362                diagnostics.push(PlanValidationDiagnosticV2::new(
363                    "single_ring.ingress_boundary must reference an ingress boundary",
364                ));
365            }
366
367            if !egress_names.contains(&single_ring.egress_boundary.as_str()) {
368                diagnostics.push(PlanValidationDiagnosticV2::new(
369                    "single_ring.egress_boundary must reference an egress boundary",
370                ));
371            }
372
373            if single_ring.stages.is_empty() {
374                diagnostics.push(PlanValidationDiagnosticV2::new(
375                    "single_ring.stages must contain at least one stage",
376                ));
377            }
378
379            if single_ring.stages.len() > SEQUENCED_SLOTS_V1_GATING as usize {
380                diagnostics.push(PlanValidationDiagnosticV2::new(
381                    "single_ring stage count exceeds v1 limit",
382                ));
383            }
384
385            let mut authored_stage_ids = Vec::<&str>::with_capacity(single_ring.stages.len());
386            for stage in &single_ring.stages {
387                if !stage_names.contains(&stage.stage.as_str()) {
388                    diagnostics.push(PlanValidationDiagnosticV2::new(
389                        "single_ring stage entry must reference a known stage",
390                    ));
391                }
392                if authored_stage_ids.contains(&stage.stage.as_str()) {
393                    diagnostics.push(PlanValidationDiagnosticV2::new(
394                        "single_ring stages must be unique",
395                    ));
396                } else {
397                    authored_stage_ids.push(stage.stage.as_str());
398                }
399
400                let mut seen_dependencies = Vec::<&str>::with_capacity(stage.depends_on.len());
401                for dependency in &stage.depends_on {
402                    if dependency == &stage.stage {
403                        diagnostics.push(PlanValidationDiagnosticV2::new(
404                            "single_ring stage must not depend on itself",
405                        ));
406                    }
407                    if !stage_names.contains(&dependency.as_str()) {
408                        diagnostics.push(PlanValidationDiagnosticV2::new(
409                            "single_ring dependency must reference a known stage",
410                        ));
411                    }
412                    if seen_dependencies.contains(&dependency.as_str()) {
413                        diagnostics.push(PlanValidationDiagnosticV2::new(
414                            "single_ring dependencies must be unique per stage",
415                        ));
416                    } else {
417                        seen_dependencies.push(dependency.as_str());
418                    }
419                    if stage_names.contains(&dependency.as_str())
420                        && !authored_stage_ids.contains(&dependency.as_str())
421                    {
422                        diagnostics.push(PlanValidationDiagnosticV2::new(
423                            "single_ring dependencies must reference earlier authored stages",
424                        ));
425                    }
426                }
427            }
428        }
429        (ExecutionTargetV1::LaneGraph, PipelinePlanSourceV2::LaneGraph(lane_graph)) => {
430            if let LaneIntent::SequencedSlots { gating, .. } = lane_graph.boundary_lane {
431                if gating != SEQUENCED_SLOTS_V1_GATING {
432                    diagnostics.push(PlanValidationDiagnosticV2::new(
433                        "lane_graph boundary SequencedSlots gating must match SEQUENCED_SLOTS_V1_GATING",
434                    ));
435                }
436            }
437
438            if let LaneIntent::SequencedSlots { capacity, .. } = lane_graph.boundary_lane {
439                if capacity == 0 {
440                    diagnostics.push(PlanValidationDiagnosticV2::new(
441                        "lane_graph boundary SequencedSlots capacity must be > 0",
442                    ));
443                }
444            }
445
446            for connection in &lane_graph.connections {
447                match &connection.from {
448                    PlanConnectionSource::StageOut { stage } => {
449                        if !stage_names.contains(&stage.as_str()) {
450                            diagnostics.push(PlanValidationDiagnosticV2::new(
451                                "lane_graph connection references unknown source stage",
452                            ));
453                        }
454                    }
455                    PlanConnectionSource::BoundaryIngress { boundary } => {
456                        if !ingress_names.contains(&boundary.as_str()) {
457                            diagnostics.push(PlanValidationDiagnosticV2::new(
458                                "lane_graph connection references unknown ingress boundary",
459                            ));
460                        }
461                    }
462                }
463
464                match &connection.to {
465                    PlanConnectionDestination::StageIn { stage } => {
466                        if !stage_names.contains(&stage.as_str()) {
467                            diagnostics.push(PlanValidationDiagnosticV2::new(
468                                "lane_graph connection references unknown destination stage",
469                            ));
470                        }
471                    }
472                    PlanConnectionDestination::BoundaryEgress { boundary } => {
473                        if !egress_names.contains(&boundary.as_str()) {
474                            diagnostics.push(PlanValidationDiagnosticV2::new(
475                                "lane_graph connection references unknown egress boundary",
476                            ));
477                        }
478                    }
479                }
480
481                if let LaneIntent::SequencedSlots {
482                    capacity,
483                    gating,
484                } = connection.intent
485                {
486                    if capacity == 0 {
487                        diagnostics.push(PlanValidationDiagnosticV2::new(
488                            "lane_graph SequencedSlots capacity must be > 0",
489                        ));
490                    }
491                    if gating != SEQUENCED_SLOTS_V1_GATING {
492                        diagnostics.push(PlanValidationDiagnosticV2::new(
493                            "lane_graph SequencedSlots gating must match SEQUENCED_SLOTS_V1_GATING",
494                        ));
495                    }
496                }
497
498                if let LaneIntent::FanoutBroadcast { consumers } = connection.intent {
499                    if consumers == 0 {
500                        diagnostics.push(PlanValidationDiagnosticV2::new(
501                            "lane_graph fanout consumers must be > 0",
502                        ));
503                    }
504                    if consumers > FANOUT_V1_MAX_CONSUMERS {
505                        diagnostics.push(PlanValidationDiagnosticV2::new(
506                            "lane_graph fanout consumers exceeds v1 max",
507                        ));
508                    }
509                }
510            }
511        }
512        (ExecutionTargetV1::SingleRing, PipelinePlanSourceV2::LaneGraph(_)) => diagnostics.push(
513            PlanValidationDiagnosticV2::new(
514                "plan.target single_ring must use a SingleRing source payload",
515            ),
516        ),
517        (ExecutionTargetV1::LaneGraph, PipelinePlanSourceV2::SingleRing(_)) => diagnostics.push(
518            PlanValidationDiagnosticV2::new(
519                "plan.target lane_graph must use a LaneGraph source payload",
520            ),
521        ),
522    }
523
524    diagnostics
525}
526
527#[cfg(all(test, feature = "alloc"))]
528mod tests {
529    use super::*;
530
531    #[test]
532    fn execution_target_accepts_snake_case_and_legacy_pascal_case() {
533        let snake_case: ExecutionTargetV1 = serde_json::from_str("\"lane_graph\"").unwrap();
534        let pascal_case: ExecutionTargetV1 = serde_json::from_str("\"LaneGraph\"").unwrap();
535        let single_ring: ExecutionTargetV1 = serde_json::from_str("\"single_ring\"").unwrap();
536
537        assert_eq!(snake_case, ExecutionTargetV1::LaneGraph);
538        assert_eq!(pascal_case, ExecutionTargetV1::LaneGraph);
539        assert_eq!(single_ring, ExecutionTargetV1::SingleRing);
540        assert_eq!(serde_json::to_string(&ExecutionTargetV1::LaneGraph).unwrap(), "\"lane_graph\"");
541        assert_eq!(serde_json::to_string(&ExecutionTargetV1::SingleRing).unwrap(), "\"single_ring\"");
542    }
543
544    #[test]
545    fn validates_basic_lane_graph_v2_plan() {
546        let plan = PipelinePlanV2 {
547            name: "p1".into(),
548            target: ExecutionTargetV1::LaneGraph,
549            stages: alloc::vec![PlanStage {
550                name: "s0".into(),
551                stage: "identity".into(),
552            }],
553            boundaries: alloc::vec![
554                PlanBoundary {
555                    name: "in".into(),
556                    kind: PlanBoundaryKind::Ingress,
557                },
558                PlanBoundary {
559                    name: "out".into(),
560                    kind: PlanBoundaryKind::Egress,
561                },
562            ],
563            source: PipelinePlanSourceV2::LaneGraph(LaneGraphAuthoringV2 {
564                boundary_lane: LaneIntent::Events,
565                connections: alloc::vec![
566                    PlanConnection {
567                        from: PlanConnectionSource::BoundaryIngress {
568                            boundary: "in".into(),
569                        },
570                        to: PlanConnectionDestination::StageIn { stage: "s0".into() },
571                        intent: LaneIntent::Events,
572                    },
573                    PlanConnection {
574                        from: PlanConnectionSource::StageOut { stage: "s0".into() },
575                        to: PlanConnectionDestination::BoundaryEgress {
576                            boundary: "out".into(),
577                        },
578                        intent: LaneIntent::Events,
579                    },
580                ],
581                on_full: OnFullV1::Block,
582            }),
583        };
584
585        assert!(validate_plan_v2(&plan).is_empty());
586    }
587
588    #[test]
589    fn validates_basic_single_ring_v2_plan() {
590        let plan = PipelinePlanV2 {
591            name: "p2".into(),
592            target: ExecutionTargetV1::SingleRing,
593            stages: alloc::vec![
594                PlanStage {
595                    name: "s0".into(),
596                    stage: "identity".into(),
597                },
598                PlanStage {
599                    name: "s1".into(),
600                    stage: "add_u8".into(),
601                },
602            ],
603            boundaries: alloc::vec![
604                PlanBoundary {
605                    name: "in".into(),
606                    kind: PlanBoundaryKind::Ingress,
607                },
608                PlanBoundary {
609                    name: "out".into(),
610                    kind: PlanBoundaryKind::Egress,
611                },
612            ],
613            source: PipelinePlanSourceV2::SingleRing(SingleRingAuthoringV2 {
614                ingress_boundary: "in".into(),
615                egress_boundary: "out".into(),
616                stages: alloc::vec![
617                    SingleRingStagePlanV2 {
618                        stage: "s0".into(),
619                        depends_on: alloc::vec![],
620                    },
621                    SingleRingStagePlanV2 {
622                        stage: "s1".into(),
623                        depends_on: alloc::vec!["s0".into()],
624                    },
625                ],
626                constraints: SingleRingConstraintSetV2::default(),
627            }),
628        };
629
630        assert!(validate_plan_v2(&plan).is_empty());
631    }
632
633    #[test]
634    fn rejects_target_source_mismatch_in_v2_plan() {
635        let plan = PipelinePlanV2 {
636            name: "bad".into(),
637            target: ExecutionTargetV1::SingleRing,
638            stages: alloc::vec![PlanStage {
639                name: "s0".into(),
640                stage: "identity".into(),
641            }],
642            boundaries: alloc::vec![
643                PlanBoundary {
644                    name: "in".into(),
645                    kind: PlanBoundaryKind::Ingress,
646                },
647                PlanBoundary {
648                    name: "out".into(),
649                    kind: PlanBoundaryKind::Egress,
650                },
651            ],
652            source: PipelinePlanSourceV2::LaneGraph(LaneGraphAuthoringV2 {
653                boundary_lane: LaneIntent::Events,
654                connections: alloc::vec![],
655                on_full: OnFullV1::Block,
656            }),
657        };
658
659        assert_eq!(
660            validate_plan_v2(&plan),
661            alloc::vec![PlanValidationDiagnosticV2 {
662                message: "plan.target single_ring must use a SingleRing source payload".into(),
663            }]
664        );
665    }
666
667    #[test]
668    fn rejects_non_topological_single_ring_dependencies_in_v2_plan() {
669        let plan = PipelinePlanV2 {
670            name: "bad-dag".into(),
671            target: ExecutionTargetV1::SingleRing,
672            stages: alloc::vec![
673                PlanStage {
674                    name: "s0".into(),
675                    stage: "identity".into(),
676                },
677                PlanStage {
678                    name: "s1".into(),
679                    stage: "add_u8".into(),
680                },
681            ],
682            boundaries: alloc::vec![
683                PlanBoundary {
684                    name: "in".into(),
685                    kind: PlanBoundaryKind::Ingress,
686                },
687                PlanBoundary {
688                    name: "out".into(),
689                    kind: PlanBoundaryKind::Egress,
690                },
691            ],
692            source: PipelinePlanSourceV2::SingleRing(SingleRingAuthoringV2 {
693                ingress_boundary: "in".into(),
694                egress_boundary: "out".into(),
695                stages: alloc::vec![
696                    SingleRingStagePlanV2 {
697                        stage: "s0".into(),
698                        depends_on: alloc::vec!["s1".into()],
699                    },
700                    SingleRingStagePlanV2 {
701                        stage: "s1".into(),
702                        depends_on: alloc::vec![],
703                    },
704                ],
705                constraints: SingleRingConstraintSetV2::default(),
706            }),
707        };
708
709        assert_eq!(
710            validate_plan_v2(&plan),
711            alloc::vec![PlanValidationDiagnosticV2 {
712                message: "single_ring dependencies must reference earlier authored stages".into(),
713            }]
714        );
715    }
716
717    #[test]
718    fn validates_full_supported_single_ring_dag_shape_in_v2_plan() {
719        let plan = PipelinePlanV2 {
720            name: "supported-single-ring-dag".into(),
721            target: ExecutionTargetV1::SingleRing,
722            stages: alloc::vec![
723                PlanStage {
724                    name: "s0".into(),
725                    stage: "identity".into(),
726                },
727                PlanStage {
728                    name: "s1".into(),
729                    stage: "add_u8".into(),
730                },
731                PlanStage {
732                    name: "s2".into(),
733                    stage: "identity".into(),
734                },
735                PlanStage {
736                    name: "s3".into(),
737                    stage: "add_u8".into(),
738                },
739            ],
740            boundaries: alloc::vec![
741                PlanBoundary {
742                    name: "in".into(),
743                    kind: PlanBoundaryKind::Ingress,
744                },
745                PlanBoundary {
746                    name: "out".into(),
747                    kind: PlanBoundaryKind::Egress,
748                },
749            ],
750            source: PipelinePlanSourceV2::SingleRing(SingleRingAuthoringV2 {
751                ingress_boundary: "in".into(),
752                egress_boundary: "out".into(),
753                stages: alloc::vec![
754                    SingleRingStagePlanV2 {
755                        stage: "s0".into(),
756                        depends_on: alloc::vec![],
757                    },
758                    SingleRingStagePlanV2 {
759                        stage: "s1".into(),
760                        depends_on: alloc::vec!["s0".into()],
761                    },
762                    SingleRingStagePlanV2 {
763                        stage: "s2".into(),
764                        depends_on: alloc::vec!["s0".into()],
765                    },
766                    SingleRingStagePlanV2 {
767                        stage: "s3".into(),
768                        depends_on: alloc::vec!["s1".into(), "s2".into()],
769                    },
770                ],
771                constraints: SingleRingConstraintSetV2::default(),
772            }),
773        };
774
775        assert!(validate_plan_v2(&plan).is_empty());
776    }
777
778    #[test]
779    fn validates_full_lane_graph_source_shape_in_v2_plan() {
780        let plan = PipelinePlanV2 {
781            name: "supported-lane-graph-source".into(),
782            target: ExecutionTargetV1::LaneGraph,
783            stages: alloc::vec![
784                PlanStage {
785                    name: "s0".into(),
786                    stage: "identity".into(),
787                },
788                PlanStage {
789                    name: "s1".into(),
790                    stage: "add_u8".into(),
791                },
792                PlanStage {
793                    name: "s2".into(),
794                    stage: "identity".into(),
795                },
796            ],
797            boundaries: alloc::vec![
798                PlanBoundary {
799                    name: "in".into(),
800                    kind: PlanBoundaryKind::Ingress,
801                },
802                PlanBoundary {
803                    name: "out_primary".into(),
804                    kind: PlanBoundaryKind::Egress,
805                },
806                PlanBoundary {
807                    name: "out_secondary".into(),
808                    kind: PlanBoundaryKind::Egress,
809                },
810            ],
811            source: PipelinePlanSourceV2::LaneGraph(LaneGraphAuthoringV2 {
812                boundary_lane: LaneIntent::Events,
813                connections: alloc::vec![
814                    PlanConnection {
815                        from: PlanConnectionSource::BoundaryIngress {
816                            boundary: "in".into(),
817                        },
818                        to: PlanConnectionDestination::StageIn { stage: "s0".into() },
819                        intent: LaneIntent::Journal,
820                    },
821                    PlanConnection {
822                        from: PlanConnectionSource::StageOut { stage: "s0".into() },
823                        to: PlanConnectionDestination::StageIn { stage: "s1".into() },
824                        intent: LaneIntent::SequencedSlots {
825                            capacity: 8,
826                            gating: SEQUENCED_SLOTS_V1_GATING,
827                        },
828                    },
829                    PlanConnection {
830                        from: PlanConnectionSource::StageOut { stage: "s0".into() },
831                        to: PlanConnectionDestination::StageIn { stage: "s2".into() },
832                        intent: LaneIntent::FanoutBroadcast { consumers: 2 },
833                    },
834                    PlanConnection {
835                        from: PlanConnectionSource::StageOut { stage: "s1".into() },
836                        to: PlanConnectionDestination::BoundaryEgress {
837                            boundary: "out_primary".into(),
838                        },
839                        intent: LaneIntent::Events,
840                    },
841                    PlanConnection {
842                        from: PlanConnectionSource::StageOut { stage: "s2".into() },
843                        to: PlanConnectionDestination::BoundaryEgress {
844                            boundary: "out_secondary".into(),
845                        },
846                        intent: LaneIntent::Events,
847                    },
848                ],
849                on_full: OnFullV1::Drop,
850            }),
851        };
852
853        assert!(validate_plan_v2(&plan).is_empty());
854    }
855
856    #[test]
857    fn rejects_documented_single_ring_v1_limit_violations_in_v2_plan() {
858        let plan = PipelinePlanV2 {
859            name: "bad-single-ring-limits".into(),
860            target: ExecutionTargetV1::SingleRing,
861            stages: alloc::vec![
862                PlanStage {
863                    name: "s0".into(),
864                    stage: "identity".into(),
865                },
866                PlanStage {
867                    name: "s1".into(),
868                    stage: "add_u8".into(),
869                },
870                PlanStage {
871                    name: "s2".into(),
872                    stage: "identity".into(),
873                },
874                PlanStage {
875                    name: "s3".into(),
876                    stage: "add_u8".into(),
877                },
878                PlanStage {
879                    name: "s4".into(),
880                    stage: "identity".into(),
881                },
882            ],
883            boundaries: alloc::vec![
884                PlanBoundary {
885                    name: "in".into(),
886                    kind: PlanBoundaryKind::Ingress,
887                },
888                PlanBoundary {
889                    name: "out".into(),
890                    kind: PlanBoundaryKind::Egress,
891                },
892            ],
893            source: PipelinePlanSourceV2::SingleRing(SingleRingAuthoringV2 {
894                ingress_boundary: "in".into(),
895                egress_boundary: "out".into(),
896                stages: alloc::vec![
897                    SingleRingStagePlanV2 {
898                        stage: "s0".into(),
899                        depends_on: alloc::vec![],
900                    },
901                    SingleRingStagePlanV2 {
902                        stage: "s1".into(),
903                        depends_on: alloc::vec!["s0".into()],
904                    },
905                    SingleRingStagePlanV2 {
906                        stage: "s2".into(),
907                        depends_on: alloc::vec!["s1".into()],
908                    },
909                    SingleRingStagePlanV2 {
910                        stage: "s3".into(),
911                        depends_on: alloc::vec!["s2".into()],
912                    },
913                    SingleRingStagePlanV2 {
914                        stage: "s4".into(),
915                        depends_on: alloc::vec!["s3".into()],
916                    },
917                ],
918                constraints: SingleRingConstraintSetV2 {
919                    shards: 2,
920                    ordering: OrderingV1::Strict,
921                    producer: SingleRingProducerV1::Mpmc,
922                    scheduling: SingleRingSchedulingV1::WorkQueue,
923                    shard_key: ShardKeyV1::FirstByte,
924                },
925            }),
926        };
927
928        assert_eq!(
929            validate_plan_v2(&plan)
930                .into_iter()
931                .map(|diagnostic| diagnostic.message)
932                .collect::<alloc::vec::Vec<_>>(),
933            alloc::vec![
934                alloc::string::String::from("single_ring strict ordering requires shards == 1"),
935                alloc::string::String::from("single_ring producer mpmc is not supported in v1"),
936                alloc::string::String::from(
937                    "single_ring scheduling work_queue is not supported in v1",
938                ),
939                alloc::string::String::from("single_ring stage count exceeds v1 limit"),
940            ]
941        );
942    }
943
944    #[test]
945    fn rejects_documented_lane_graph_limit_violations_in_v2_plan() {
946        let plan = PipelinePlanV2 {
947            name: "bad-lane-graph-limits".into(),
948            target: ExecutionTargetV1::LaneGraph,
949            stages: alloc::vec![PlanStage {
950                name: "s0".into(),
951                stage: "identity".into(),
952            }],
953            boundaries: alloc::vec![
954                PlanBoundary {
955                    name: "in".into(),
956                    kind: PlanBoundaryKind::Ingress,
957                },
958                PlanBoundary {
959                    name: "out".into(),
960                    kind: PlanBoundaryKind::Egress,
961                },
962            ],
963            source: PipelinePlanSourceV2::LaneGraph(LaneGraphAuthoringV2 {
964                boundary_lane: LaneIntent::SequencedSlots {
965                    capacity: 0,
966                    gating: SEQUENCED_SLOTS_V1_GATING,
967                },
968                connections: alloc::vec![
969                    PlanConnection {
970                        from: PlanConnectionSource::BoundaryIngress {
971                            boundary: "in".into(),
972                        },
973                        to: PlanConnectionDestination::StageIn { stage: "s0".into() },
974                        intent: LaneIntent::SequencedSlots {
975                            capacity: 0,
976                            gating: 7,
977                        },
978                    },
979                    PlanConnection {
980                        from: PlanConnectionSource::StageOut { stage: "s0".into() },
981                        to: PlanConnectionDestination::BoundaryEgress {
982                            boundary: "out".into(),
983                        },
984                        intent: LaneIntent::FanoutBroadcast {
985                            consumers: FANOUT_V1_MAX_CONSUMERS + 1,
986                        },
987                    },
988                ],
989                on_full: OnFullV1::Block,
990            }),
991        };
992
993        assert_eq!(
994            validate_plan_v2(&plan)
995                .into_iter()
996                .map(|diagnostic| diagnostic.message)
997                .collect::<alloc::vec::Vec<_>>(),
998            alloc::vec![
999                alloc::string::String::from(
1000                    "lane_graph boundary SequencedSlots capacity must be > 0",
1001                ),
1002                alloc::string::String::from(
1003                    "lane_graph SequencedSlots capacity must be > 0",
1004                ),
1005                alloc::string::String::from(
1006                    "lane_graph SequencedSlots gating must match SEQUENCED_SLOTS_V1_GATING",
1007                ),
1008                alloc::string::String::from("lane_graph fanout consumers exceeds v1 max"),
1009            ]
1010        );
1011    }
1012}