byteor_pipeline_lower/
single_ring.rs1#[cfg(feature = "alloc")]
2use alloc::vec::Vec;
3
4#[cfg(feature = "alloc")]
5use byteor_pipeline_plan::{
6 ExecutionTargetV1, PlanBoundaryKind, PipelinePlanSourceV2, PipelinePlanV2,
7};
8use byteor_pipeline_spec::{SingleRingBuilder, StageOpV1};
9
10use crate::common::{reject_v2, validate_for_lowering_v2};
11use crate::LowerDiagnosticCategoryV2;
12use crate::{LowerOptionsV1, LowerReportV2, Result};
13
14#[cfg(feature = "alloc")]
16pub fn lower_single_ring_v2(
17 plan: &PipelinePlanV2,
18 _opts: &LowerOptionsV1,
19) -> Result<LowerReportV2> {
20 if plan.target != ExecutionTargetV1::SingleRing {
21 return Err(reject_v2(
22 ExecutionTargetV1::SingleRing,
23 LowerDiagnosticCategoryV2::TargetMismatch,
24 "explicit plan target does not match requested lowering target",
25 ));
26 }
27
28 let PipelinePlanSourceV2::SingleRing(source) = &plan.source else {
29 return Err(reject_v2(
30 ExecutionTargetV1::SingleRing,
31 LowerDiagnosticCategoryV2::SourcePayloadMismatch,
32 "single_ring lowering requires a SingleRing source payload",
33 ));
34 };
35
36 let plan_diagnostics = validate_for_lowering_v2(plan, ExecutionTargetV1::SingleRing)?;
37
38 let mut builder = SingleRingBuilder::new();
39 builder
40 .shards(source.constraints.shards)
41 .ordering(source.constraints.ordering)
42 .producer(source.constraints.producer)
43 .scheduling(source.constraints.scheduling)
44 .shard_key(source.constraints.shard_key);
45
46 let has_ingress = plan.boundaries.iter().any(|boundary| {
47 boundary.name == source.ingress_boundary && boundary.kind == PlanBoundaryKind::Ingress
48 });
49 if !has_ingress {
50 return Err(reject_v2(
51 ExecutionTargetV1::SingleRing,
52 LowerDiagnosticCategoryV2::BoundaryBinding,
53 "single_ring ingress boundary must reference an ingress boundary",
54 ));
55 }
56
57 let has_egress = plan.boundaries.iter().any(|boundary| {
58 boundary.name == source.egress_boundary && boundary.kind == PlanBoundaryKind::Egress
59 });
60 if !has_egress {
61 return Err(reject_v2(
62 ExecutionTargetV1::SingleRing,
63 LowerDiagnosticCategoryV2::BoundaryBinding,
64 "single_ring egress boundary must reference an egress boundary",
65 ));
66 }
67
68 let mut stage_catalog = Vec::<(&str, &str)>::with_capacity(plan.stages.len());
69 for stage in &plan.stages {
70 stage_catalog.push((stage.name.as_str(), stage.stage.as_str()));
71 }
72
73 let mut authored_indices = Vec::<(&str, u32)>::with_capacity(source.stages.len());
74 for authored_stage in &source.stages {
75 let stage_key = stage_catalog
76 .iter()
77 .find(|(name, _)| *name == authored_stage.stage.as_str())
78 .map(|(_, stage_key)| *stage_key)
79 .ok_or_else(|| {
80 reject_v2(
81 ExecutionTargetV1::SingleRing,
82 LowerDiagnosticCategoryV2::DagShape,
83 "single_ring stage entry must reference a known stage",
84 )
85 })?;
86
87 let mut depends_on = Vec::with_capacity(authored_stage.depends_on.len());
88 for dependency in &authored_stage.depends_on {
89 let dep_idx = authored_indices
90 .iter()
91 .find(|(name, _)| *name == dependency.as_str())
92 .map(|(_, idx)| *idx)
93 .ok_or_else(|| {
94 reject_v2(
95 ExecutionTargetV1::SingleRing,
96 LowerDiagnosticCategoryV2::DagShape,
97 "single_ring stage dependencies must reference earlier authored stages",
98 )
99 })?;
100 depends_on.push(dep_idx);
101 }
102
103 let stage_index = if depends_on.is_empty() {
104 builder
105 .push_source_stage(StageOpV1::ResolverKey {
106 stage: stage_key.into(),
107 })
108 .map_err(|_| {
109 reject_v2(
110 ExecutionTargetV1::SingleRing,
111 LowerDiagnosticCategoryV2::InternalInvariant,
112 "failed to add source stage to single_ring lowering",
113 )
114 })?
115 } else {
116 builder
117 .push_stage(
118 StageOpV1::ResolverKey {
119 stage: stage_key.into(),
120 },
121 &depends_on,
122 )
123 .map_err(|_| {
124 reject_v2(
125 ExecutionTargetV1::SingleRing,
126 LowerDiagnosticCategoryV2::InternalInvariant,
127 "failed to add dependent stage to single_ring lowering",
128 )
129 })?
130 };
131
132 authored_indices.push((authored_stage.stage.as_str(), stage_index));
133 }
134
135 let spec = builder
136 .build()
137 .map_err(|_| {
138 reject_v2(
139 ExecutionTargetV1::SingleRing,
140 LowerDiagnosticCategoryV2::InternalInvariant,
141 "failed to build single_ring spec",
142 )
143 })?;
144 byteor_pipeline_spec::validate_v1(&spec)
145 .map_err(|_| {
146 reject_v2(
147 ExecutionTargetV1::SingleRing,
148 LowerDiagnosticCategoryV2::SpecValidation,
149 "single_ring lowering produced an invalid v1 spec",
150 )
151 })?;
152
153 Ok(LowerReportV2 {
154 spec,
155 plan_diagnostics,
156 lower_diagnostics: Vec::new(),
157 })
158}