1#[cfg(feature = "alloc")]
2use alloc::format;
3#[cfg(feature = "alloc")]
4use alloc::string::{String, ToString};
5#[cfg(feature = "alloc")]
6use alloc::vec;
7#[cfg(feature = "alloc")]
8use alloc::vec::Vec;
9
10#[cfg(feature = "alloc")]
11use byteor_pipeline_plan::{
12 ExecutionTargetV1, LaneGraphAuthoringV2, LaneIntent, PlanBoundary, PlanBoundaryKind,
13 PlanConnectionDestination, PlanConnectionSource, PlanStage,
14 PipelinePlanSourceV2, PipelinePlanV2,
15};
16use byteor_pipeline_spec::{
17 BridgeRoleCfgV1, EndpointCfgV1, EndpointKindV1, LaneCfgV1, LaneGraphV1, LaneKindV1,
18 MergePolicyV1, MergeRoleCfgV1, PipelineSpecV1, RoleCfgV1, RouterRoleCfgV1,
19 StageRoleCfgV1,
20};
21
22use crate::common::{
23 collect_stage_names, edge_required_kind, has_egress_boundary, has_ingress_boundary,
24 inlet_key, intent_to_kind, outlet_key, reclassify_lower_error_v2, reject_v2, sanitize_id,
25 validate_for_lowering_v2,
26};
27use crate::{
28 LowerDiagnosticCategoryV2, LowerDiagnosticV2, LowerError, LowerOptionsV1, LowerReportV2,
29 Result,
30};
31
/// Lowers a v2 pipeline plan whose explicit target is `LaneGraph` into a v1
/// spec, returning the spec together with the collected diagnostics.
///
/// The `on_full` value carried by the lane-graph source payload replaces the
/// one in `opts` before lowering.
///
/// # Errors
///
/// * `TargetMismatch` rejection when `plan.target` is not `LaneGraph`.
/// * `SourcePayloadMismatch` rejection when `plan.source` is not a
///   `LaneGraph` payload.
/// * Whatever `validate_for_lowering_v2` reports for the plan.
/// * Any error from the inner lowering, passed through
///   `reclassify_lower_error_v2`.
/// * `SpecValidation` rejection when the produced spec fails
///   `byteor_pipeline_spec::validate_v1`.
#[cfg(feature = "alloc")]
pub fn lower_lane_graph_v2(
    plan: &PipelinePlanV2,
    opts: &LowerOptionsV1,
) -> Result<LowerReportV2> {
    const TARGET: ExecutionTargetV1 = ExecutionTargetV1::LaneGraph;

    if plan.target != TARGET {
        return Err(reject_v2(
            TARGET,
            LowerDiagnosticCategoryV2::TargetMismatch,
            "explicit plan target does not match requested lowering target",
        ));
    }

    let source = match &plan.source {
        PipelinePlanSourceV2::LaneGraph(payload) => payload,
        _ => {
            return Err(reject_v2(
                TARGET,
                LowerDiagnosticCategoryV2::SourcePayloadMismatch,
                "lane_graph lowering requires a LaneGraph source payload",
            ));
        }
    };

    let plan_diagnostics = validate_for_lowering_v2(plan, TARGET)?;

    // The authoring payload decides the on-full behaviour, not the caller's options.
    let effective_opts = {
        let mut overridden = opts.clone();
        overridden.on_full = source.on_full;
        overridden
    };

    let mut lower_diagnostics = Vec::new();
    let spec = lower_lane_graph_inner(
        &plan.name,
        &plan.stages,
        &plan.boundaries,
        source,
        &effective_opts,
        &mut lower_diagnostics,
    )
    .map_err(|err| reclassify_lower_error_v2(TARGET, err))?;

    // The validator's own error is intentionally dropped in favour of a fixed rejection.
    if byteor_pipeline_spec::validate_v1(&spec).is_err() {
        return Err(reject_v2(
            TARGET,
            LowerDiagnosticCategoryV2::SpecValidation,
            "lane_graph lowering produced an invalid v1 spec",
        ));
    }

    Ok(LowerReportV2 {
        spec,
        plan_diagnostics,
        lower_diagnostics,
    })
}
82
/// Lowers a lane-graph authoring payload into a `PipelineSpecV1::LaneGraph`.
///
/// The lowering runs in these phases, and the order in which lanes, roles and
/// diagnostics are emitted follows that order:
///
/// 1. Validate the plan name, stages, boundaries and every connection, and
///    group edge indices by destination inlet.
/// 2. Resolve one lane kind per stage inlet and per egress boundary; all
///    incoming edges of an inlet must agree.
/// 3. Create one output lane per ingress boundary and per stage.
/// 4. For every outlet with more than one outgoing edge, insert a bridge into
///    a fanout lane plus a router with one `Events` lane per consumer.
/// 5. For every outlet with exactly one outgoing edge, deliver the outlet lane
///    directly, or insert a bridge when source and destination kinds differ.
/// 6. For every inlet with more than one delivered lane, insert a round-robin
///    merge; otherwise the single delivered lane is used as-is.
/// 7. Emit one stage role per stage and one endpoint per boundary.
///
/// Every inserted bridge, router and merge is also recorded in `diagnostics`.
///
/// # Errors
///
/// Returns `LowerError::Invalid` for structural problems (empty names,
/// duplicate boundaries, dangling connection endpoints, inlets without
/// incoming edges, disagreeing lane kinds, inconsistent fanout declarations)
/// and `LowerError::Unsupported` for connection endpoint variants this
/// lowering does not handle. Errors from `collect_stage_names`,
/// `intent_to_kind` and `edge_required_kind` are propagated unchanged.
#[cfg(feature = "alloc")]
fn lower_lane_graph_inner(
    plan_name: &str,
    stages: &[PlanStage],
    boundaries: &[PlanBoundary],
    source: &LaneGraphAuthoringV2,
    opts: &LowerOptionsV1,
    diagnostics: &mut Vec<LowerDiagnosticV2>,
) -> Result<PipelineSpecV1> {
    if plan_name.trim().is_empty() {
        return Err(LowerError::Invalid("plan.name must be non-empty"));
    }
    if stages.is_empty() {
        return Err(LowerError::Invalid("plan.stages must be non-empty"));
    }

    let stage_names = collect_stage_names(stages)?;
    ensure_unique_boundaries(boundaries)?;
    let boundary_kind = intent_to_kind(source.boundary_lane)?;

    // Phase 1: validate each connection and group edge indices by destination inlet.
    let mut incoming_by_inlet: Vec<(String, Vec<usize>)> = Vec::new();
    for (index, edge) in source.connections.iter().enumerate() {
        match &edge.from {
            PlanConnectionSource::StageOut { stage } => {
                if !stage_names.contains(&stage.as_str()) {
                    return Err(LowerError::Invalid("connection source stage does not exist"));
                }
            }
            PlanConnectionSource::BoundaryIngress { boundary } => {
                if !has_ingress_boundary(boundaries, boundary) {
                    return Err(LowerError::Invalid(
                        "connection source ingress boundary does not exist",
                    ));
                }
            }
            _ => {
                return Err(LowerError::Unsupported(
                    "connection source variant is not supported in v1 lowering",
                ));
            }
        }

        match &edge.to {
            PlanConnectionDestination::StageIn { stage } => {
                if !stage_names.contains(&stage.as_str()) {
                    return Err(LowerError::Invalid(
                        "connection destination stage does not exist",
                    ));
                }
            }
            PlanConnectionDestination::BoundaryEgress { boundary } => {
                if !has_egress_boundary(boundaries, boundary) {
                    return Err(LowerError::Invalid(
                        "connection destination egress boundary does not exist",
                    ));
                }
            }
            _ => {
                return Err(LowerError::Unsupported(
                    "connection destination variant is not supported in v1 lowering",
                ));
            }
        }

        if matches!(edge.intent, LaneIntent::FanoutBroadcast { consumers: 0 }) {
            return Err(LowerError::Invalid("fanout consumers must be > 0"));
        }

        push_grouped(&mut incoming_by_inlet, inlet_key(&edge.to), index);
    }

    // Phase 2: every inlet must have incoming edges that agree on one lane kind.
    let mut stage_in_kind: Vec<(String, LaneKindV1)> = Vec::new();
    for stage_name in &stage_names {
        let key = format!("stage:{stage_name}");
        let kind = resolve_inlet_kind(
            source,
            grouped_entries(&incoming_by_inlet, &key),
            boundary_kind,
            "stage nodes must have at least one incoming edge",
            "all incoming edges into a stage must resolve to the same lane kind",
        )?;
        stage_in_kind.push(((*stage_name).to_string(), kind));
    }

    let mut egress_in_kind: Vec<(String, LaneKindV1)> = Vec::new();
    for boundary in boundaries
        .iter()
        .filter(|boundary| boundary.kind == PlanBoundaryKind::Egress)
    {
        let key = format!("egress:{}", boundary.name);
        let kind = resolve_inlet_kind(
            source,
            grouped_entries(&incoming_by_inlet, &key),
            boundary_kind,
            "egress boundaries must have at least one incoming edge",
            "all incoming edges into an egress boundary must resolve to the same lane kind",
        )?;
        egress_in_kind.push((boundary.name.clone(), kind));
    }

    let mut lanes: Vec<LaneCfgV1> = Vec::new();
    let mut roles: Vec<RoleCfgV1> = Vec::new();

    // Phase 3: one output lane per ingress boundary, then one per stage.
    let mut boundary_lane_ingress: Vec<(String, String)> = Vec::new();
    for boundary in boundaries
        .iter()
        .filter(|boundary| boundary.kind == PlanBoundaryKind::Ingress)
    {
        let lane_name = format!("lane_boundary_{}_out", sanitize_id(&boundary.name));
        lanes.push(LaneCfgV1 {
            name: lane_name.clone(),
            kind: boundary_kind,
        });
        boundary_lane_ingress.push((boundary.name.clone(), lane_name));
    }

    // A stage's output lane uses the same kind that was resolved for its input.
    let mut stage_lane_out: Vec<(String, String)> = Vec::with_capacity(stage_in_kind.len());
    for (stage, kind) in &stage_in_kind {
        let lane_name = format!("lane_node_{}_out", sanitize_id(stage));
        lanes.push(LaneCfgV1 {
            name: lane_name.clone(),
            kind: *kind,
        });
        stage_lane_out.push((stage.clone(), lane_name));
    }

    let mut outgoing_by_outlet: Vec<(String, Vec<usize>)> = Vec::new();
    for (index, edge) in source.connections.iter().enumerate() {
        push_grouped(&mut outgoing_by_outlet, outlet_key(&edge.from), index);
    }

    // Outlet keys with neither prefix get no base lane and fail later on lookup.
    let mut outlet_base_lane: Vec<(String, (String, LaneKindV1))> = Vec::new();
    for (key, _) in &outgoing_by_outlet {
        if let Some(stage) = key.strip_prefix("stage:") {
            let lane = lookup_named(&stage_lane_out, stage)
                .cloned()
                .ok_or(LowerError::Invalid("stage output lane missing"))?;
            let kind = lookup_named(&stage_in_kind, stage)
                .copied()
                .ok_or(LowerError::Invalid("stage input kind missing"))?;
            outlet_base_lane.push((key.clone(), (lane, kind)));
        } else if let Some(boundary) = key.strip_prefix("ingress:") {
            let lane = lookup_named(&boundary_lane_ingress, boundary)
                .cloned()
                .ok_or(LowerError::Invalid("boundary output lane missing"))?;
            outlet_base_lane.push((key.clone(), (lane, boundary_kind)));
        }
    }

    // One slot per connection; `None` means "no delivered lane assigned yet".
    let mut delivered_slots: Vec<Option<(String, LaneKindV1)>> =
        vec![None; source.connections.len()];

    // Phase 4: outlets with out-degree > 1 become bridge -> fanout lane -> router.
    for (out_key, edge_indices) in &outgoing_by_outlet {
        if edge_indices.len() <= 1 {
            continue;
        }

        let consumers = fanout_consumer_count(source, edge_indices)?;

        let (src_lane, src_kind) = lookup_named(&outlet_base_lane, out_key)
            .cloned()
            .ok_or(LowerError::Invalid("fanout outlet base lane missing"))?;
        if src_kind != LaneKindV1::Events {
            return Err(LowerError::Invalid(
                "fanout outlets must produce Events lanes in v1",
            ));
        }

        let outlet_id = sanitize_id(out_key);
        let fanout_kind = LaneKindV1::FanoutBroadcast { consumers };

        let fanout_lane = format!("lane_fanout_{outlet_id}");
        lanes.push(LaneCfgV1 {
            name: fanout_lane.clone(),
            kind: fanout_kind,
        });

        let bridge_role = format!("role_bridge_fanout_{outlet_id}");
        diagnostics.push(LowerDiagnosticV2::InsertedBridge {
            role: bridge_role.clone(),
            rx: src_lane.clone(),
            tx: fanout_lane.clone(),
            from: src_kind,
            to: fanout_kind,
        });
        roles.push(RoleCfgV1::Bridge(BridgeRoleCfgV1 {
            name: bridge_role,
            rx: src_lane,
            tx: fanout_lane.clone(),
        }));

        let tx_lanes: Vec<String> = (0..consumers)
            .map(|consumer_index| format!("lane_fanout_{outlet_id}_c{consumer_index}"))
            .collect();
        lanes.extend(tx_lanes.iter().map(|lane_name| LaneCfgV1 {
            name: lane_name.clone(),
            kind: LaneKindV1::Events,
        }));

        let router_role = format!("role_router_{outlet_id}");
        diagnostics.push(LowerDiagnosticV2::InsertedRouter {
            role: router_role.clone(),
            rx: fanout_lane.clone(),
            tx: tx_lanes.clone(),
            consumers,
        });
        roles.push(RoleCfgV1::Router(RouterRoleCfgV1 {
            name: router_role,
            rx: fanout_lane,
            tx: tx_lanes.clone(),
        }));

        // `fanout_consumer_count` guarantees one router output per outgoing edge.
        for (&edge_index, lane_name) in edge_indices.iter().zip(tx_lanes) {
            delivered_slots[edge_index] = Some((lane_name, LaneKindV1::Events));
        }
    }

    // Phase 5: outlets with exactly one edge deliver directly or through a bridge.
    for (out_key, edge_indices) in &outgoing_by_outlet {
        let &[edge_index] = edge_indices.as_slice() else {
            continue;
        };

        let edge = &source.connections[edge_index];
        if matches!(edge.intent, LaneIntent::FanoutBroadcast { .. }) {
            return Err(LowerError::Invalid(
                "fanout intent requires out-degree greater than one",
            ));
        }

        let dest_kind = edge_required_kind(edge, boundary_kind)?;
        let (src_lane, src_kind) = lookup_named(&outlet_base_lane, out_key)
            .cloned()
            .ok_or(LowerError::Invalid("outlet base lane missing"))?;

        let delivered_lane = if src_kind == dest_kind {
            src_lane
        } else {
            let edge_lane = format!("lane_edge_{edge_index}");
            lanes.push(LaneCfgV1 {
                name: edge_lane.clone(),
                kind: dest_kind,
            });
            let bridge_role = format!("role_bridge_{edge_index}");
            diagnostics.push(LowerDiagnosticV2::InsertedBridge {
                role: bridge_role.clone(),
                rx: src_lane.clone(),
                tx: edge_lane.clone(),
                from: src_kind,
                to: dest_kind,
            });
            roles.push(RoleCfgV1::Bridge(BridgeRoleCfgV1 {
                name: bridge_role,
                rx: src_lane,
                tx: edge_lane.clone(),
            }));
            edge_lane
        };
        delivered_slots[edge_index] = Some((delivered_lane, dest_kind));
    }

    // Group delivered lanes by destination inlet; an unassigned slot is an internal error.
    let mut delivered_by_inlet: Vec<(String, Vec<(String, LaneKindV1)>)> = Vec::new();
    for (edge, slot) in source.connections.iter().zip(delivered_slots) {
        let delivered = slot.ok_or(LowerError::Invalid(
            "internal error: edge did not get an assigned delivered lane",
        ))?;
        push_grouped(&mut delivered_by_inlet, inlet_key(&edge.to), delivered);
    }

    // Phase 6: collapse each inlet to a single lane, merging when needed.
    let mut stage_lane_in: Vec<(String, String)> = Vec::with_capacity(stage_in_kind.len());
    for (stage, kind) in &stage_in_kind {
        let key = format!("stage:{stage}");
        let incoming = grouped_entries(&delivered_by_inlet, &key);
        let rx_lane = match incoming {
            [(only_lane, _)] => only_lane.clone(),
            _ => {
                let stage_id = sanitize_id(stage);
                insert_merge(
                    format!("lane_merge_node_{stage_id}"),
                    format!("role_merge_node_{stage_id}"),
                    *kind,
                    incoming,
                    &mut lanes,
                    &mut roles,
                    diagnostics,
                )
            }
        };
        stage_lane_in.push((stage.clone(), rx_lane));
    }

    let mut boundary_lane_egress: Vec<(String, String)> =
        Vec::with_capacity(egress_in_kind.len());
    for (boundary, kind) in &egress_in_kind {
        let key = format!("egress:{boundary}");
        let incoming = grouped_entries(&delivered_by_inlet, &key);
        let lane = match incoming {
            [(only_lane, _)] => only_lane.clone(),
            _ => {
                let boundary_id = sanitize_id(boundary);
                insert_merge(
                    format!("lane_merge_boundary_{boundary_id}"),
                    format!("role_merge_boundary_{boundary_id}"),
                    *kind,
                    incoming,
                    &mut lanes,
                    &mut roles,
                    diagnostics,
                )
            }
        };
        boundary_lane_egress.push((boundary.clone(), lane));
    }

    // Phase 7: stage roles, in plan order, followed by boundary endpoints.
    for stage in stages {
        let rx = lookup_named(&stage_lane_in, &stage.name)
            .cloned()
            .ok_or(LowerError::Invalid("stage rx lane missing"))?;
        let tx = lookup_named(&stage_lane_out, &stage.name)
            .cloned()
            .ok_or(LowerError::Invalid("stage tx lane missing"))?;
        roles.push(RoleCfgV1::Stage(StageRoleCfgV1 {
            name: format!("role_stage_{}", sanitize_id(&stage.name)),
            stage: stage.stage.clone(),
            rx,
            tx,
        }));
    }

    let mut endpoints = Vec::with_capacity(boundaries.len());
    for boundary in boundaries {
        let (kind, lane) = match boundary.kind {
            PlanBoundaryKind::Ingress => (
                EndpointKindV1::Ingress,
                lookup_named(&boundary_lane_ingress, &boundary.name)
                    .cloned()
                    .ok_or(LowerError::Invalid("ingress boundary lane missing"))?,
            ),
            PlanBoundaryKind::Egress => (
                EndpointKindV1::Egress,
                lookup_named(&boundary_lane_egress, &boundary.name)
                    .cloned()
                    .ok_or(LowerError::Invalid("egress boundary lane missing"))?,
            ),
        };
        endpoints.push(EndpointCfgV1 {
            name: boundary.name.clone(),
            kind,
            lane,
        });
    }

    Ok(PipelineSpecV1::LaneGraph(LaneGraphV1 {
        endpoints,
        lanes,
        roles,
        on_full: opts.on_full,
    }))
}

/// Rejects boundaries with a blank name or a `(kind, name)` pair that already
/// appeared earlier in the slice.
#[cfg(feature = "alloc")]
fn ensure_unique_boundaries(boundaries: &[PlanBoundary]) -> Result<()> {
    for (index, boundary) in boundaries.iter().enumerate() {
        if boundary.name.trim().is_empty() {
            return Err(LowerError::Invalid("boundary name must be non-empty"));
        }
        let duplicated = boundaries[..index]
            .iter()
            .any(|earlier| earlier.kind == boundary.kind && earlier.name == boundary.name);
        if duplicated {
            return Err(LowerError::Invalid("boundaries must be unique per kind"));
        }
    }
    Ok(())
}

/// Appends `value` to the group stored under `key`, creating the group at the
/// end of `groups` on first use so groups stay in first-seen order.
#[cfg(feature = "alloc")]
fn push_grouped<T>(groups: &mut Vec<(String, Vec<T>)>, key: String, value: T) {
    match groups.iter_mut().find(|(existing, _)| *existing == key) {
        Some((_, bucket)) => bucket.push(value),
        None => groups.push((key, vec![value])),
    }
}

/// Returns the values grouped under `key`, or an empty slice when the key has
/// no group.
#[cfg(feature = "alloc")]
fn grouped_entries<'a, T>(groups: &'a [(String, Vec<T>)], key: &str) -> &'a [T] {
    groups
        .iter()
        .find(|(existing, _)| existing.as_str() == key)
        .map(|(_, bucket)| bucket.as_slice())
        .unwrap_or(&[])
}

/// Returns the value of the first entry in `table` whose name equals `name`.
#[cfg(feature = "alloc")]
fn lookup_named<'a, V>(table: &'a [(String, V)], name: &str) -> Option<&'a V> {
    table
        .iter()
        .find(|(candidate, _)| candidate.as_str() == name)
        .map(|(_, value)| value)
}

/// Resolves the single lane kind required by the edges feeding one inlet.
///
/// Fails with `empty_msg` when `edge_indices` is empty and with
/// `mismatch_msg` as soon as an edge resolves to a kind different from the
/// edges before it.
#[cfg(feature = "alloc")]
fn resolve_inlet_kind(
    source: &LaneGraphAuthoringV2,
    edge_indices: &[usize],
    boundary_kind: LaneKindV1,
    empty_msg: &'static str,
    mismatch_msg: &'static str,
) -> Result<LaneKindV1> {
    let mut agreed: Option<LaneKindV1> = None;
    for &edge_index in edge_indices {
        let resolved = edge_required_kind(&source.connections[edge_index], boundary_kind)?;
        match agreed {
            None => agreed = Some(resolved),
            Some(previous) if previous == resolved => {}
            Some(_) => return Err(LowerError::Invalid(mismatch_msg)),
        }
    }
    // `None` is only possible when no edge was visited at all.
    agreed.ok_or(LowerError::Invalid(empty_msg))
}

/// Validates the fanout declaration shared by all edges leaving one outlet and
/// returns the declared consumer count.
///
/// Every edge must carry a fanout intent, all edges must declare the same
/// `consumers` value, and that value must equal the number of edges.
#[cfg(feature = "alloc")]
fn fanout_consumer_count(source: &LaneGraphAuthoringV2, edge_indices: &[usize]) -> Result<u32> {
    let mut agreed: Option<u32> = None;
    for &edge_index in edge_indices {
        match source.connections[edge_index].intent {
            LaneIntent::FanoutBroadcast { consumers: current } => match agreed {
                None => agreed = Some(current),
                Some(previous) if previous == current => {}
                Some(_) => {
                    return Err(LowerError::Invalid(
                        "fanout edges from a single outlet must use the same consumers value",
                    ));
                }
            },
            _ => {
                return Err(LowerError::Invalid(
                    "out-degree > 1 requires fanout intent on all outgoing edges",
                ));
            }
        }
    }

    let consumers = agreed.unwrap_or(0);
    if consumers as usize != edge_indices.len() {
        return Err(LowerError::Invalid(
            "fanout consumers must match the number of outgoing edges",
        ));
    }
    Ok(consumers)
}

/// Emits a merge lane named `lane_name` of kind `kind`, a round-robin merge
/// role named `role_name` reading every lane in `incoming`, and the matching
/// `InsertedMerge` diagnostic. Returns the merge lane's name.
#[cfg(feature = "alloc")]
fn insert_merge(
    lane_name: String,
    role_name: String,
    kind: LaneKindV1,
    incoming: &[(String, LaneKindV1)],
    lanes: &mut Vec<LaneCfgV1>,
    roles: &mut Vec<RoleCfgV1>,
    diagnostics: &mut Vec<LowerDiagnosticV2>,
) -> String {
    lanes.push(LaneCfgV1 {
        name: lane_name.clone(),
        kind,
    });
    let rx_lanes: Vec<String> = incoming.iter().map(|(lane, _)| lane.clone()).collect();
    diagnostics.push(LowerDiagnosticV2::InsertedMerge {
        role: role_name.clone(),
        rx: rx_lanes.clone(),
        tx: lane_name.clone(),
        policy: MergePolicyV1::RoundRobin,
    });
    roles.push(RoleCfgV1::Merge(MergeRoleCfgV1 {
        name: role_name,
        rx: rx_lanes,
        tx: lane_name.clone(),
        policy: MergePolicyV1::RoundRobin,
    }));
    lane_name
}