byteor_pipeline_lower/
single_ring.rs

1#[cfg(feature = "alloc")]
2use alloc::vec::Vec;
3
4#[cfg(feature = "alloc")]
5use byteor_pipeline_plan::{
6    ExecutionTargetV1, PlanBoundaryKind, PipelinePlanSourceV2, PipelinePlanV2,
7};
8use byteor_pipeline_spec::{SingleRingBuilder, StageOpV1};
9
10use crate::common::{reject_v2, validate_for_lowering_v2};
11use crate::LowerDiagnosticCategoryV2;
12use crate::{LowerOptionsV1, LowerReportV2, Result};
13
/// Lower a full-parity authoring document into `PipelineSpecV1::SingleRing`.
///
/// The plan is checked and translated in the following order:
///
/// 1. `plan.target` must be `ExecutionTargetV1::SingleRing`.
/// 2. `plan.source` must carry a `PipelinePlanSourceV2::SingleRing` payload.
/// 3. `validate_for_lowering_v2` is run; the diagnostics it yields are passed through in the
///    returned report as `plan_diagnostics`.
/// 4. The source's ingress and egress boundary names must each match an entry in
///    `plan.boundaries` of the corresponding kind.
/// 5. Every authored stage is resolved by name against `plan.stages` and appended to a
///    `SingleRingBuilder` as a `StageOpV1::ResolverKey`. Dependencies are looked up only among
///    the stages already appended, so an entry can depend solely on entries listed before it.
/// 6. The built spec is checked with `byteor_pipeline_spec::validate_v1`.
///
/// `_opts` is accepted but not consulted by this function.
///
/// # Errors
///
/// Returns a single-ring rejection with one of these categories:
///
/// - `TargetMismatch`: the plan declares a different execution target.
/// - `SourcePayloadMismatch`: the source payload is not the single-ring variant.
/// - `BoundaryBinding`: the ingress or egress boundary name has no matching declaration.
/// - `DagShape`: an authored stage names an unknown stage, or a dependency does not refer to an
///   earlier authored stage.
/// - `InternalInvariant`: the builder refused a stage or failed to build the spec.
/// - `SpecValidation`: the produced spec did not pass v1 validation.
///
/// Any error returned by `validate_for_lowering_v2` is propagated unchanged.
#[cfg(feature = "alloc")]
pub fn lower_single_ring_v2(
    plan: &PipelinePlanV2,
    _opts: &LowerOptionsV1,
) -> Result<LowerReportV2> {
    // Every rejection raised by this function is attributed to the single-ring target.
    let reject = |category: LowerDiagnosticCategoryV2, message: &'static str| {
        reject_v2(ExecutionTargetV1::SingleRing, category, message)
    };

    if plan.target != ExecutionTargetV1::SingleRing {
        return Err(reject(
            LowerDiagnosticCategoryV2::TargetMismatch,
            "explicit plan target does not match requested lowering target",
        ));
    }

    let PipelinePlanSourceV2::SingleRing(source) = &plan.source else {
        return Err(reject(
            LowerDiagnosticCategoryV2::SourcePayloadMismatch,
            "single_ring lowering requires a SingleRing source payload",
        ));
    };

    let plan_diagnostics = validate_for_lowering_v2(plan, ExecutionTargetV1::SingleRing)?;

    // Carry the authored ring constraints over to the spec builder.
    let mut builder = SingleRingBuilder::new();
    builder
        .shards(source.constraints.shards)
        .ordering(source.constraints.ordering)
        .producer(source.constraints.producer)
        .scheduling(source.constraints.scheduling)
        .shard_key(source.constraints.shard_key);

    // Both boundary names must resolve to a declared boundary of the matching kind.
    let ingress_declared = plan.boundaries.iter().any(|candidate| {
        candidate.kind == PlanBoundaryKind::Ingress && candidate.name == source.ingress_boundary
    });
    if !ingress_declared {
        return Err(reject(
            LowerDiagnosticCategoryV2::BoundaryBinding,
            "single_ring ingress boundary must reference an ingress boundary",
        ));
    }

    let egress_declared = plan.boundaries.iter().any(|candidate| {
        candidate.kind == PlanBoundaryKind::Egress && candidate.name == source.egress_boundary
    });
    if !egress_declared {
        return Err(reject(
            LowerDiagnosticCategoryV2::BoundaryBinding,
            "single_ring egress boundary must reference an egress boundary",
        ));
    }

    // (declared stage name, stage key) pairs, in declaration order.
    let known_stages: Vec<(&str, &str)> = plan
        .stages
        .iter()
        .map(|declared| (declared.name.as_str(), declared.stage.as_str()))
        .collect();

    // (authored stage name, builder index) pairs for the entries appended so far.
    let mut placed = Vec::<(&str, u32)>::with_capacity(source.stages.len());
    for entry in &source.stages {
        let entry_name = entry.stage.as_str();

        let resolver_key = known_stages
            .iter()
            .find_map(|&(name, key)| (name == entry_name).then_some(key))
            .ok_or_else(|| {
                reject(
                    LowerDiagnosticCategoryV2::DagShape,
                    "single_ring stage entry must reference a known stage",
                )
            })?;

        // Only already-placed entries are searched, so the first dependency that is not an
        // earlier entry aborts the lowering.
        let upstream = entry
            .depends_on
            .iter()
            .map(|wanted| {
                placed
                    .iter()
                    .find_map(|&(name, index)| (name == wanted.as_str()).then_some(index))
                    .ok_or_else(|| {
                        reject(
                            LowerDiagnosticCategoryV2::DagShape,
                            "single_ring stage dependencies must reference earlier authored stages",
                        )
                    })
            })
            .collect::<Result<Vec<u32>>>()?;

        let op = StageOpV1::ResolverKey {
            stage: resolver_key.into(),
        };

        // Entries without dependencies are added as source stages; all others are attached
        // to the builder indices of their dependencies.
        let placed_at = if upstream.is_empty() {
            builder.push_source_stage(op).map_err(|_| {
                reject(
                    LowerDiagnosticCategoryV2::InternalInvariant,
                    "failed to add source stage to single_ring lowering",
                )
            })?
        } else {
            builder.push_stage(op, &upstream).map_err(|_| {
                reject(
                    LowerDiagnosticCategoryV2::InternalInvariant,
                    "failed to add dependent stage to single_ring lowering",
                )
            })?
        };

        placed.push((entry_name, placed_at));
    }

    let spec = builder.build().map_err(|_| {
        reject(
            LowerDiagnosticCategoryV2::InternalInvariant,
            "failed to build single_ring spec",
        )
    })?;

    // Final guard: the lowered spec must itself pass v1 validation.
    byteor_pipeline_spec::validate_v1(&spec).map_err(|_| {
        reject(
            LowerDiagnosticCategoryV2::SpecValidation,
            "single_ring lowering produced an invalid v1 spec",
        )
    })?;

    Ok(LowerReportV2 {
        spec,
        plan_diagnostics,
        lower_diagnostics: Vec::new(),
    })
}