// byteor_pipeline_lower/lane_graph.rs

1#[cfg(feature = "alloc")]
2use alloc::format;
3#[cfg(feature = "alloc")]
4use alloc::string::{String, ToString};
5#[cfg(feature = "alloc")]
6use alloc::vec;
7#[cfg(feature = "alloc")]
8use alloc::vec::Vec;
9
10#[cfg(feature = "alloc")]
11use byteor_pipeline_plan::{
12    ExecutionTargetV1, LaneGraphAuthoringV2, LaneIntent, PlanBoundary, PlanBoundaryKind,
13    PlanConnectionDestination, PlanConnectionSource, PlanStage,
14    PipelinePlanSourceV2, PipelinePlanV2,
15};
16use byteor_pipeline_spec::{
17    BridgeRoleCfgV1, EndpointCfgV1, EndpointKindV1, LaneCfgV1, LaneGraphV1, LaneKindV1,
18    MergePolicyV1, MergeRoleCfgV1, PipelineSpecV1, RoleCfgV1, RouterRoleCfgV1,
19    StageRoleCfgV1,
20};
21
22use crate::common::{
23    collect_stage_names, edge_required_kind, has_egress_boundary, has_ingress_boundary,
24    inlet_key, intent_to_kind, outlet_key, reclassify_lower_error_v2, reject_v2, sanitize_id,
25    validate_for_lowering_v2,
26};
27use crate::{
28    LowerDiagnosticCategoryV2, LowerDiagnosticV2, LowerError, LowerOptionsV1, LowerReportV2,
29    Result,
30};
31
/// Lower a full-parity authoring document into `PipelineSpecV1::LaneGraph`.
///
/// The plan is rejected up front when its explicit target is not
/// `ExecutionTargetV1::LaneGraph` or when its source payload is not the
/// `LaneGraph` variant. After plan-level validation the lane graph is lowered
/// with the caller's options, except that `on_full` is always taken from the
/// authoring source. The resulting spec is validated once more before it is
/// returned together with the plan and lowering diagnostics.
///
/// # Errors
///
/// Returns an error when the target or source payload does not match, when
/// plan validation fails, when lowering fails (reclassified for the
/// lane-graph target), or when the produced spec does not pass `validate_v1`.
#[cfg(feature = "alloc")]
pub fn lower_lane_graph_v2(
    plan: &PipelinePlanV2,
    opts: &LowerOptionsV1,
) -> Result<LowerReportV2> {
    // Every diagnostic produced here is attributed to this lowering target.
    const TARGET: ExecutionTargetV1 = ExecutionTargetV1::LaneGraph;

    if plan.target != TARGET {
        return Err(reject_v2(
            TARGET,
            LowerDiagnosticCategoryV2::TargetMismatch,
            "explicit plan target does not match requested lowering target",
        ));
    }

    let source = match &plan.source {
        PipelinePlanSourceV2::LaneGraph(source) => source,
        _ => {
            return Err(reject_v2(
                TARGET,
                LowerDiagnosticCategoryV2::SourcePayloadMismatch,
                "lane_graph lowering requires a LaneGraph source payload",
            ));
        }
    };

    let plan_diagnostics = validate_for_lowering_v2(plan, TARGET)?;

    // The authoring document, not the caller, decides the on-full policy.
    let effective_opts = LowerOptionsV1 {
        on_full: source.on_full,
        ..opts.clone()
    };

    let mut lower_diagnostics = Vec::new();
    let spec = lower_lane_graph_inner(
        &plan.name,
        &plan.stages,
        &plan.boundaries,
        source,
        &effective_opts,
        &mut lower_diagnostics,
    )
    .map_err(|err| reclassify_lower_error_v2(TARGET, err))?;

    if byteor_pipeline_spec::validate_v1(&spec).is_err() {
        return Err(reject_v2(
            TARGET,
            LowerDiagnosticCategoryV2::SpecValidation,
            "lane_graph lowering produced an invalid v1 spec",
        ));
    }

    Ok(LowerReportV2 {
        spec,
        plan_diagnostics,
        lower_diagnostics,
    })
}
82
/// Append `value` to the group stored under `key`, creating the group on first use.
///
/// Groups keep first-insertion order, so everything derived from them (lane,
/// role and diagnostic order) is deterministic.
#[cfg(feature = "alloc")]
fn group_push<T>(groups: &mut Vec<(String, Vec<T>)>, key: String, value: T) {
    match groups.iter_mut().find(|(current, _)| current == &key) {
        Some((_, entries)) => entries.push(value),
        None => groups.push((key, vec![value])),
    }
}

/// Return the value stored under `key` in an insertion-ordered association list.
#[cfg(feature = "alloc")]
fn assoc_get<'a, T>(entries: &'a [(String, T)], key: &str) -> Option<&'a T> {
    entries
        .iter()
        .find(|(name, _)| name.as_str() == key)
        .map(|(_, value)| value)
}

/// Check that every boundary has a non-blank name and that no `(kind, name)`
/// pair occurs twice.
#[cfg(feature = "alloc")]
fn ensure_unique_boundaries(boundaries: &[PlanBoundary]) -> Result<()> {
    for (index, boundary) in boundaries.iter().enumerate() {
        if boundary.name.trim().is_empty() {
            return Err(LowerError::Invalid("boundary name must be non-empty"));
        }
        let duplicate = boundaries[..index]
            .iter()
            .any(|earlier| earlier.kind == boundary.kind && earlier.name == boundary.name);
        if duplicate {
            return Err(LowerError::Invalid("boundaries must be unique per kind"));
        }
    }
    Ok(())
}

/// Resolve the single lane kind that all edges entering `inlet` agree on.
///
/// Fails with `missing` when the inlet has no incoming edge and with
/// `mismatch` when two incoming edges resolve to different lane kinds.
#[cfg(feature = "alloc")]
fn required_inlet_kind(
    source: &LaneGraphAuthoringV2,
    incoming_by_inlet: &[(String, Vec<usize>)],
    inlet: &str,
    boundary_kind: LaneKindV1,
    missing: &'static str,
    mismatch: &'static str,
) -> Result<LaneKindV1> {
    let edge_indices = assoc_get(incoming_by_inlet, inlet)
        .map(|entries| entries.as_slice())
        .unwrap_or(&[]);
    let (&first, rest) = edge_indices
        .split_first()
        .ok_or(LowerError::Invalid(missing))?;

    let kind = edge_required_kind(&source.connections[first], boundary_kind)?;
    for &edge_index in rest {
        if edge_required_kind(&source.connections[edge_index], boundary_kind)? != kind {
            return Err(LowerError::Invalid(mismatch));
        }
    }
    Ok(kind)
}

/// Pick the lane an inlet reads from, inserting a round-robin merge role when
/// more than one lane is delivered to it.
///
/// With exactly one delivered lane that lane is returned unchanged. Otherwise
/// a `lane_merge_{scope}_{id}` lane of `kind` and a `role_merge_{scope}_{id}`
/// merge role are appended, an `InsertedMerge` diagnostic is recorded, and the
/// merge output lane is returned.
#[cfg(feature = "alloc")]
fn merged_inlet_lane(
    scope: &str,
    name: &str,
    kind: LaneKindV1,
    incoming: &[(String, LaneKindV1)],
    lanes: &mut Vec<LaneCfgV1>,
    roles: &mut Vec<RoleCfgV1>,
    diagnostics: &mut Vec<LowerDiagnosticV2>,
) -> String {
    if let [(only, _)] = incoming {
        return only.clone();
    }

    let id = sanitize_id(name);
    let tx_lane = format!("lane_merge_{scope}_{id}");
    lanes.push(LaneCfgV1 {
        name: tx_lane.clone(),
        kind,
    });

    let rx_lanes: Vec<String> = incoming.iter().map(|(lane, _)| lane.clone()).collect();
    let role = format!("role_merge_{scope}_{id}");
    diagnostics.push(LowerDiagnosticV2::InsertedMerge {
        role: role.clone(),
        rx: rx_lanes.clone(),
        tx: tx_lane.clone(),
        policy: MergePolicyV1::RoundRobin,
    });
    roles.push(RoleCfgV1::Merge(MergeRoleCfgV1 {
        name: role,
        rx: rx_lanes,
        tx: tx_lane.clone(),
        policy: MergePolicyV1::RoundRobin,
    }));
    tx_lane
}

/// Lower the authored lane graph into a `PipelineSpecV1::LaneGraph`.
///
/// The lowering proceeds in phases:
///
/// 1. Validate the plan name, stages, boundaries and every connection
///    endpoint, grouping edges by the inlet they enter.
/// 2. Resolve the lane kind each stage and each egress boundary consumes; all
///    edges entering the same inlet must agree.
/// 3. Create one output lane per ingress boundary (`lane_boundary_*_out`) and
///    per stage (`lane_node_*_out`).
/// 4. For outlets with more than one outgoing edge, insert a bridge into a
///    fanout lane plus a router with one `Events` lane per consumer.
/// 5. For outlets with exactly one outgoing edge, deliver the outlet lane
///    directly, or through an inserted bridge when the lane kinds differ.
/// 6. For inlets fed by several lanes, insert a round-robin merge.
/// 7. Emit one stage role per plan stage and one endpoint per boundary.
///
/// Every inserted bridge, router and merge is reported through `diagnostics`.
/// `opts.on_full` is copied into the resulting spec.
///
/// # Errors
///
/// Returns `LowerError::Invalid` for malformed input (blank names, unknown
/// stages or boundaries, inlets without incoming edges, inconsistent lane
/// kinds, inconsistent fanout declarations) and `LowerError::Unsupported` for
/// connection endpoint variants this lowering does not handle. Errors from the
/// `crate::common` helpers are propagated unchanged.
#[cfg(feature = "alloc")]
fn lower_lane_graph_inner(
    plan_name: &str,
    stages: &[PlanStage],
    boundaries: &[PlanBoundary],
    source: &LaneGraphAuthoringV2,
    opts: &LowerOptionsV1,
    diagnostics: &mut Vec<LowerDiagnosticV2>,
) -> Result<PipelineSpecV1> {
    if plan_name.trim().is_empty() {
        return Err(LowerError::Invalid("plan.name must be non-empty"));
    }
    if stages.is_empty() {
        return Err(LowerError::Invalid("plan.stages must be non-empty"));
    }

    let stage_names = collect_stage_names(stages)?;
    ensure_unique_boundaries(boundaries)?;
    let boundary_kind = intent_to_kind(source.boundary_lane)?;

    // Phase 1: validate every connection and group edge indices by inlet.
    let mut incoming_by_inlet: Vec<(String, Vec<usize>)> = Vec::new();
    for (index, edge) in source.connections.iter().enumerate() {
        match &edge.from {
            PlanConnectionSource::StageOut { stage } => {
                if !stage_names.contains(&stage.as_str()) {
                    return Err(LowerError::Invalid("connection source stage does not exist"));
                }
            }
            PlanConnectionSource::BoundaryIngress { boundary } => {
                if !has_ingress_boundary(boundaries, boundary) {
                    return Err(LowerError::Invalid(
                        "connection source ingress boundary does not exist",
                    ));
                }
            }
            _ => {
                return Err(LowerError::Unsupported(
                    "connection source variant is not supported in v1 lowering",
                ));
            }
        }

        match &edge.to {
            PlanConnectionDestination::StageIn { stage } => {
                if !stage_names.contains(&stage.as_str()) {
                    return Err(LowerError::Invalid(
                        "connection destination stage does not exist",
                    ));
                }
            }
            PlanConnectionDestination::BoundaryEgress { boundary } => {
                if !has_egress_boundary(boundaries, boundary) {
                    return Err(LowerError::Invalid(
                        "connection destination egress boundary does not exist",
                    ));
                }
            }
            _ => {
                return Err(LowerError::Unsupported(
                    "connection destination variant is not supported in v1 lowering",
                ));
            }
        }

        if matches!(edge.intent, LaneIntent::FanoutBroadcast { consumers: 0 }) {
            return Err(LowerError::Invalid("fanout consumers must be > 0"));
        }

        group_push(&mut incoming_by_inlet, inlet_key(&edge.to), index);
    }

    // Phase 2: resolve the lane kind consumed by each stage and egress boundary.
    let mut stage_in_kind: Vec<(String, LaneKindV1)> = Vec::new();
    for stage_name in &stage_names {
        let inlet = format!("stage:{stage_name}");
        let kind = required_inlet_kind(
            source,
            &incoming_by_inlet,
            &inlet,
            boundary_kind,
            "stage nodes must have at least one incoming edge",
            "all incoming edges into a stage must resolve to the same lane kind",
        )?;
        stage_in_kind.push(((*stage_name).to_string(), kind));
    }

    let mut egress_in_kind: Vec<(String, LaneKindV1)> = Vec::new();
    for boundary in boundaries
        .iter()
        .filter(|boundary| boundary.kind == PlanBoundaryKind::Egress)
    {
        let inlet = format!("egress:{}", boundary.name);
        let kind = required_inlet_kind(
            source,
            &incoming_by_inlet,
            &inlet,
            boundary_kind,
            "egress boundaries must have at least one incoming edge",
            "all incoming edges into an egress boundary must resolve to the same lane kind",
        )?;
        egress_in_kind.push((boundary.name.clone(), kind));
    }

    let mut lanes: Vec<LaneCfgV1> = Vec::new();
    let mut roles: Vec<RoleCfgV1> = Vec::new();

    // Phase 3: one output lane per ingress boundary and per stage.
    let mut boundary_lane_ingress: Vec<(String, String)> = Vec::new();
    for boundary in boundaries
        .iter()
        .filter(|boundary| boundary.kind == PlanBoundaryKind::Ingress)
    {
        let lane_name = format!("lane_boundary_{}_out", sanitize_id(&boundary.name));
        lanes.push(LaneCfgV1 {
            name: lane_name.clone(),
            kind: boundary_kind,
        });
        boundary_lane_ingress.push((boundary.name.clone(), lane_name));
    }

    let mut stage_lane_out: Vec<(String, String)> = Vec::new();
    for (stage, kind) in &stage_in_kind {
        let lane_name = format!("lane_node_{}_out", sanitize_id(stage));
        // A stage's output lane is declared with the same kind as its input.
        lanes.push(LaneCfgV1 {
            name: lane_name.clone(),
            kind: *kind,
        });
        stage_lane_out.push((stage.clone(), lane_name));
    }

    let mut outgoing_by_outlet: Vec<(String, Vec<usize>)> = Vec::new();
    for (index, edge) in source.connections.iter().enumerate() {
        group_push(&mut outgoing_by_outlet, outlet_key(&edge.from), index);
    }

    // Base lane (and its kind) that each outlet writes to before any fanout or bridge.
    let mut outlet_base_lane: Vec<(String, (String, LaneKindV1))> = Vec::new();
    for (key, _) in &outgoing_by_outlet {
        if let Some(stage) = key.strip_prefix("stage:") {
            let lane = assoc_get(&stage_lane_out, stage)
                .cloned()
                .ok_or(LowerError::Invalid("stage output lane missing"))?;
            let kind = assoc_get(&stage_in_kind, stage)
                .copied()
                .ok_or(LowerError::Invalid("stage input kind missing"))?;
            outlet_base_lane.push((key.clone(), (lane, kind)));
        } else if let Some(boundary) = key.strip_prefix("ingress:") {
            let lane = assoc_get(&boundary_lane_ingress, boundary)
                .cloned()
                .ok_or(LowerError::Invalid("boundary output lane missing"))?;
            outlet_base_lane.push((key.clone(), (lane, boundary_kind)));
        }
    }

    // Lane actually delivered to the destination of each edge; `None` until assigned.
    let mut delivered_lane_by_edge: Vec<Option<(String, LaneKindV1)>> =
        vec![None; source.connections.len()];

    // Phase 4: outlets with several outgoing edges become bridge + fanout lane + router.
    // This pass runs to completion before the single-edge pass so the emission
    // order of lanes, roles and diagnostics stays stable.
    for (out_key, edge_indices) in &outgoing_by_outlet {
        if edge_indices.len() <= 1 {
            continue;
        }

        let mut consumers: Option<u32> = None;
        for &edge_index in edge_indices {
            match source.connections[edge_index].intent {
                LaneIntent::FanoutBroadcast { consumers: current } => match consumers {
                    None => consumers = Some(current),
                    Some(previous) if previous == current => {}
                    Some(_) => {
                        return Err(LowerError::Invalid(
                            "fanout edges from a single outlet must use the same consumers value",
                        ));
                    }
                },
                _ => {
                    return Err(LowerError::Invalid(
                        "out-degree > 1 requires fanout intent on all outgoing edges",
                    ));
                }
            }
        }

        let consumers = consumers.unwrap_or(0);
        // Widening u32 -> usize; lossless on 32- and 64-bit targets.
        if consumers as usize != edge_indices.len() {
            return Err(LowerError::Invalid(
                "fanout consumers must match the number of outgoing edges",
            ));
        }

        let (src_lane, src_kind) = assoc_get(&outlet_base_lane, out_key)
            .cloned()
            .ok_or(LowerError::Invalid("fanout outlet base lane missing"))?;
        if src_kind != LaneKindV1::Events {
            return Err(LowerError::Invalid(
                "fanout outlets must produce Events lanes in v1",
            ));
        }

        let outlet_id = sanitize_id(out_key);
        let fanout_kind = LaneKindV1::FanoutBroadcast { consumers };

        let fanout_lane = format!("lane_fanout_{outlet_id}");
        lanes.push(LaneCfgV1 {
            name: fanout_lane.clone(),
            kind: fanout_kind,
        });

        let bridge_role = format!("role_bridge_fanout_{outlet_id}");
        diagnostics.push(LowerDiagnosticV2::InsertedBridge {
            role: bridge_role.clone(),
            rx: src_lane.clone(),
            tx: fanout_lane.clone(),
            from: src_kind,
            to: fanout_kind,
        });
        roles.push(RoleCfgV1::Bridge(BridgeRoleCfgV1 {
            name: bridge_role,
            rx: src_lane,
            tx: fanout_lane.clone(),
        }));

        let tx_lanes: Vec<String> = (0..consumers)
            .map(|consumer_index| format!("lane_fanout_{outlet_id}_c{consumer_index}"))
            .collect();
        lanes.extend(tx_lanes.iter().map(|lane_name| LaneCfgV1 {
            name: lane_name.clone(),
            kind: LaneKindV1::Events,
        }));

        // Consumer lane `i` is delivered to the i-th outgoing edge of the outlet.
        for (&edge_index, lane_name) in edge_indices.iter().zip(&tx_lanes) {
            delivered_lane_by_edge[edge_index] = Some((lane_name.clone(), LaneKindV1::Events));
        }

        let router_role = format!("role_router_{outlet_id}");
        diagnostics.push(LowerDiagnosticV2::InsertedRouter {
            role: router_role.clone(),
            rx: fanout_lane.clone(),
            tx: tx_lanes.clone(),
            consumers,
        });
        roles.push(RoleCfgV1::Router(RouterRoleCfgV1 {
            name: router_role,
            rx: fanout_lane,
            tx: tx_lanes,
        }));
    }

    // Phase 5: outlets with exactly one outgoing edge deliver directly or via a bridge.
    for (out_key, edge_indices) in &outgoing_by_outlet {
        let &[edge_index] = edge_indices.as_slice() else {
            continue;
        };

        let edge = &source.connections[edge_index];
        if matches!(edge.intent, LaneIntent::FanoutBroadcast { .. }) {
            return Err(LowerError::Invalid(
                "fanout intent requires out-degree greater than one",
            ));
        }

        let dest_kind = edge_required_kind(edge, boundary_kind)?;
        let (src_lane, src_kind) = assoc_get(&outlet_base_lane, out_key)
            .cloned()
            .ok_or(LowerError::Invalid("outlet base lane missing"))?;

        let delivered = if src_kind == dest_kind {
            (src_lane, dest_kind)
        } else {
            // Kinds differ: convert through a dedicated per-edge lane and bridge role.
            let edge_lane = format!("lane_edge_{edge_index}");
            lanes.push(LaneCfgV1 {
                name: edge_lane.clone(),
                kind: dest_kind,
            });
            let bridge_role = format!("role_bridge_{edge_index}");
            diagnostics.push(LowerDiagnosticV2::InsertedBridge {
                role: bridge_role.clone(),
                rx: src_lane.clone(),
                tx: edge_lane.clone(),
                from: src_kind,
                to: dest_kind,
            });
            roles.push(RoleCfgV1::Bridge(BridgeRoleCfgV1 {
                name: bridge_role,
                rx: src_lane,
                tx: edge_lane.clone(),
            }));
            (edge_lane, dest_kind)
        };
        delivered_lane_by_edge[edge_index] = Some(delivered);
    }

    // Defensive: both passes together must have assigned every edge.
    let delivered_lane_by_edge: Vec<(String, LaneKindV1)> = delivered_lane_by_edge
        .into_iter()
        .map(|slot| {
            slot.ok_or(LowerError::Invalid(
                "internal error: edge did not get an assigned delivered lane",
            ))
        })
        .collect::<Result<_>>()?;

    let mut delivered_by_inlet: Vec<(String, Vec<(String, LaneKindV1)>)> = Vec::new();
    for (edge, delivered) in source.connections.iter().zip(delivered_lane_by_edge) {
        group_push(&mut delivered_by_inlet, inlet_key(&edge.to), delivered);
    }

    // Phase 6: choose (or merge into) the lane each stage and egress boundary reads.
    let mut stage_lane_in: Vec<(String, String)> = Vec::new();
    for (stage, kind) in &stage_in_kind {
        let inlet = format!("stage:{stage}");
        let incoming = assoc_get(&delivered_by_inlet, &inlet)
            .map(|entries| entries.as_slice())
            .unwrap_or(&[]);
        let rx_lane = merged_inlet_lane(
            "node",
            stage,
            *kind,
            incoming,
            &mut lanes,
            &mut roles,
            diagnostics,
        );
        stage_lane_in.push((stage.clone(), rx_lane));
    }

    let mut boundary_lane_egress: Vec<(String, String)> = Vec::new();
    for (boundary, kind) in &egress_in_kind {
        let inlet = format!("egress:{boundary}");
        let incoming = assoc_get(&delivered_by_inlet, &inlet)
            .map(|entries| entries.as_slice())
            .unwrap_or(&[]);
        let lane = merged_inlet_lane(
            "boundary",
            boundary,
            *kind,
            incoming,
            &mut lanes,
            &mut roles,
            diagnostics,
        );
        boundary_lane_egress.push((boundary.clone(), lane));
    }

    // Phase 7: stage roles (in plan order) and boundary endpoints.
    for stage in stages {
        let rx = assoc_get(&stage_lane_in, &stage.name)
            .cloned()
            .ok_or(LowerError::Invalid("stage rx lane missing"))?;
        let tx = assoc_get(&stage_lane_out, &stage.name)
            .cloned()
            .ok_or(LowerError::Invalid("stage tx lane missing"))?;
        roles.push(RoleCfgV1::Stage(StageRoleCfgV1 {
            name: format!("role_stage_{}", sanitize_id(&stage.name)),
            stage: stage.stage.clone(),
            rx,
            tx,
        }));
    }

    let mut endpoints = Vec::with_capacity(boundaries.len());
    for boundary in boundaries {
        let (kind, lane) = match boundary.kind {
            PlanBoundaryKind::Ingress => (
                EndpointKindV1::Ingress,
                assoc_get(&boundary_lane_ingress, &boundary.name)
                    .cloned()
                    .ok_or(LowerError::Invalid("ingress boundary lane missing"))?,
            ),
            PlanBoundaryKind::Egress => (
                EndpointKindV1::Egress,
                assoc_get(&boundary_lane_egress, &boundary.name)
                    .cloned()
                    .ok_or(LowerError::Invalid("egress boundary lane missing"))?,
            ),
        };
        endpoints.push(EndpointCfgV1 {
            name: boundary.name.clone(),
            kind,
            lane,
        });
    }

    Ok(PipelineSpecV1::LaneGraph(LaneGraphV1 {
        endpoints,
        lanes,
        roles,
        on_full: opts.on_full,
    }))
}