1use byteor_pipeline_spec::{validate_v1, PipelineSpecV1};
4
5use crate::ExecError;
6
7pub fn validate_sp(spec: &PipelineSpecV1) -> Result<(), ExecError> {
9 validate_v1(spec)
10 .map_err(|e| ExecError::with_context("spec validation failed", e.message().to_string()))
11}
12
13#[cfg(feature = "shm")]
14mod shm {
15 use std::collections::BTreeMap;
16 use std::path::PathBuf;
17 use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
18 use std::sync::Arc;
19 use std::thread::JoinHandle;
20 use std::time::{Duration, Instant};
21
22 use byteor_pipeline_backings_shm::{
23 attach_events_mpsc_rx, attach_events_mpsc_tx, attach_events_rx, attach_events_tx,
24 attach_fanout_router, attach_fanout_rx, attach_fanout_tx, AttachOptions, BackingError,
25 CursorBarrier, ShmEventsMpscRx, ShmEventsMpscTx, ShmEventsRx, ShmEventsTx, ShmFanoutRouter,
26 ShmFanoutRx, ShmFanoutTx, ShmJournalPublisher, ShmJournalSubscriber, ShmSequencedSlots,
27 ShmSequencedSlotsProducer, ShmSequencedSlotsSubscriber,
28 };
29 use byteor_pipeline_kernel::step::{publish_with_policy, TransformOutcome};
30 use byteor_pipeline_kernel::{
31 LaneRx, LaneRxBorrow, LaneTx, LaneTxError, LaneTxWith, LaneTxWithError, LaneTxWithResult,
32 };
33 use byteor_pipeline_spec::{
34 EndpointKindV1, LaneGraphV1, LaneKindV1, MergePolicyV1, OnFullV1, RoleCfgV1,
35 };
36 use indexbus_core::{RouterMode, SpinWait, WaitStrategy};
37
38 use crate::stage::resolve_or_err;
39 use crate::thread_init::{RoleKind, ThreadInitContext, ThreadInitHook};
40 use crate::{ExecError, ResolvedStage, StageResolver};
41
/// Starts the lane-graph runtime described by `spec`.
///
/// This is a direct call to [`spawn_lane_graph_sp_shm_runtime`]; all arguments and the
/// result are passed through unchanged.
pub fn run_sp(
    spec: &byteor_pipeline_spec::PipelineSpecV1,
    resolver: &dyn StageResolver,
    options: SpShmOptions,
) -> Result<LaneGraphSpRuntime, ExecError> {
    spawn_lane_graph_sp_shm_runtime(spec, resolver, options)
}
50
/// Which attach flavour to use for an events lane.
///
/// `events_attach_kind_by_lane` picks `Mpsc` for lanes where it counts more than one
/// producer and `Spsc` otherwise.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum EventsAttachKind {
    /// Attach with `attach_events_rx` / `attach_events_tx`.
    Spsc,
    /// Attach with `attach_events_mpsc_rx` / `attach_events_mpsc_tx`.
    Mpsc,
}
56
/// Wraps a [`BackingError`] into an [`ExecError`], using `msg` as the message, the lane
/// name as context and `e` as the source.
fn backing_err(msg: &'static str, lane: &str, e: BackingError) -> ExecError {
    ExecError::with_context_source(msg, lane.to_string(), e)
}
60
/// Rewrites a lane name so it only contains ASCII letters, ASCII digits, `-` and `_`.
///
/// Every other character (including non-ASCII ones) is replaced by a single `_`, so the
/// result has exactly as many characters as `lane`.
fn sanitize_lane_name(lane: &str) -> String {
    let mut sanitized = String::with_capacity(lane.len());
    for ch in lane.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
72
73 #[derive(Clone, Debug, Default)]
78 pub struct SpShmOptions {
79 pub blocking: bool,
81 pub dir: Option<PathBuf>,
83 }
84
85 impl SpShmOptions {
86 fn lane_path(&self, lane: &str) -> Option<PathBuf> {
87 let dir = self.dir.as_ref()?;
88 let lane = sanitize_lane_name(lane);
89 Some(dir.join(format!("{lane}.mmap")))
90 }
91
92 pub fn lane_attach_options(&self, lane: &str) -> AttachOptions {
94 AttachOptions {
95 blocking: self.blocking,
96 prefault: false,
97 path: self.lane_path(lane),
98 queue: 0,
99 }
100 }
101
102 fn lane_attach_options_with_queue(&self, lane: &str, queue: usize) -> AttachOptions {
103 AttachOptions {
104 blocking: self.blocking,
105 prefault: false,
106 path: self.lane_path(lane),
107 queue,
108 }
109 }
110
111 fn ensure_dir(&self) -> Result<(), ExecError> {
112 let Some(dir) = &self.dir else {
113 return Ok(());
114 };
115 std::fs::create_dir_all(dir).map_err(|e| {
116 ExecError::with_context_source(
117 "failed to create shm dir",
118 dir.display().to_string(),
119 e,
120 )
121 })
122 }
123 }
124
125 enum AnyEventsRx {
126 Spsc(ShmEventsRx),
127 Mpsc(ShmEventsMpscRx),
128 }
129
130 impl LaneRx for AnyEventsRx {
131 fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
132 match self {
133 AnyEventsRx::Spsc(rx) => rx.recv(buf),
134 AnyEventsRx::Mpsc(rx) => rx.recv(buf),
135 }
136 }
137 }
138
139 impl LaneRxBorrow for AnyEventsRx {
140 fn recv_with<F, R>(
141 &mut self,
142 f: F,
143 ) -> core::result::Result<Option<R>, byteor_pipeline_kernel::LaneRxError>
144 where
145 F: FnOnce(&[u8]) -> R,
146 {
147 match self {
148 AnyEventsRx::Spsc(rx) => rx.recv_with(f),
149 AnyEventsRx::Mpsc(rx) => rx.recv_with(f),
150 }
151 }
152 }
153
/// Publish side of an events lane, wrapping whichever attach flavour was selected.
enum AnyEventsTx {
    /// Publisher obtained from `attach_events_tx`.
    Spsc(ShmEventsTx),
    /// Publisher obtained from `attach_events_mpsc_tx`.
    Mpsc(ShmEventsMpscTx),
}
158
/// Runtime-wide activity counters shared by all role threads.
struct RuntimeActivity {
    /// Number of live `RuntimeWorkGuard`s: incremented by `start_work`, decremented when a
    /// guard is dropped.
    active_roles: AtomicUsize,
    /// Incremented once per guard, the first time `note_progress` is called on it.
    progress_epoch: AtomicU64,
    /// Per-role trackers; `LaneGraphSpRuntime::role_activity_snapshot` reads these.
    roles: Vec<Arc<RuntimeRoleActivity>>,
}
164
165 #[derive(Debug)]
166 struct RuntimeRoleActivity {
167 name: String,
168 role_kind: RoleKind,
169 active_work: AtomicUsize,
170 progress_epoch: AtomicU64,
171 }
172
173 impl RuntimeRoleActivity {
174 fn new(name: String, role_kind: RoleKind) -> Self {
175 Self {
176 name,
177 role_kind,
178 active_work: AtomicUsize::new(0),
179 progress_epoch: AtomicU64::new(0),
180 }
181 }
182
183 fn snapshot(&self) -> LaneGraphRoleActivitySnapshot {
184 LaneGraphRoleActivitySnapshot {
185 name: self.name.clone(),
186 role_kind: self.role_kind,
187 active_work: self.active_work.load(Ordering::Relaxed),
188 processed_count: self.progress_epoch.load(Ordering::Relaxed),
189 progress_epoch: self.progress_epoch.load(Ordering::Relaxed),
190 }
191 }
192 }
193
194 #[derive(Clone, Debug, PartialEq, Eq)]
196 pub struct LaneGraphRoleActivitySnapshot {
197 pub name: String,
199 pub role_kind: RoleKind,
201 pub active_work: usize,
203 pub processed_count: u64,
205 pub progress_epoch: u64,
207 }
208
209 impl LaneGraphRoleActivitySnapshot {
210 pub fn is_active(&self) -> bool {
212 self.active_work > 0
213 }
214
215 pub fn status(&self) -> &'static str {
217 if self.is_active() {
218 "active"
219 } else {
220 "idle"
221 }
222 }
223 }
224
225 impl RuntimeActivity {
226 fn new(roles: Vec<Arc<RuntimeRoleActivity>>) -> Self {
227 Self {
228 active_roles: AtomicUsize::new(0),
229 progress_epoch: AtomicU64::new(0),
230 roles,
231 }
232 }
233
234 fn start_work<'a>(&'a self, role: &'a RuntimeRoleActivity) -> RuntimeWorkGuard<'a> {
235 self.active_roles.fetch_add(1, Ordering::Relaxed);
236 role.active_work.fetch_add(1, Ordering::Relaxed);
237 RuntimeWorkGuard {
238 activity: self,
239 role,
240 progressed: false,
241 }
242 }
243 }
244
245 struct RuntimeWorkGuard<'a> {
246 activity: &'a RuntimeActivity,
247 role: &'a RuntimeRoleActivity,
248 progressed: bool,
249 }
250
251 impl RuntimeWorkGuard<'_> {
252 fn note_progress(&mut self) {
253 if !self.progressed {
254 self.progressed = true;
255 self.activity.progress_epoch.fetch_add(1, Ordering::Relaxed);
256 self.role.progress_epoch.fetch_add(1, Ordering::Relaxed);
257 }
258 }
259 }
260
261 impl Drop for RuntimeWorkGuard<'_> {
262 fn drop(&mut self) {
263 self.activity.active_roles.fetch_sub(1, Ordering::Relaxed);
264 self.role.active_work.fetch_sub(1, Ordering::Relaxed);
265 }
266 }
267
268 impl LaneTx for AnyEventsTx {
269 fn publish(&mut self, msg: &[u8]) -> core::result::Result<(), LaneTxError> {
270 match self {
271 AnyEventsTx::Spsc(tx) => tx.publish(msg),
272 AnyEventsTx::Mpsc(tx) => tx.publish(msg),
273 }
274 }
275 }
276
277 impl LaneTxWith for AnyEventsTx {
278 fn publish_with<F>(&mut self, f: F) -> core::result::Result<(), LaneTxError>
279 where
280 F: FnOnce(&mut [u8]) -> usize,
281 {
282 match self {
283 AnyEventsTx::Spsc(tx) => tx.publish_with(f),
284 AnyEventsTx::Mpsc(tx) => tx.publish_with(f),
285 }
286 }
287 }
288
289 impl byteor_pipeline_kernel::LaneTxWithResult for AnyEventsTx {
290 fn publish_with_result<F, E>(
291 &mut self,
292 f: F,
293 ) -> core::result::Result<(), byteor_pipeline_kernel::LaneTxWithError<E>>
294 where
295 F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
296 {
297 match self {
298 AnyEventsTx::Spsc(tx) => tx.publish_with_result(f),
299 AnyEventsTx::Mpsc(tx) => tx.publish_with_result(f),
300 }
301 }
302 }
303
304 fn events_attach_kind_by_lane(lg: &LaneGraphV1) -> BTreeMap<String, EventsAttachKind> {
305 #[derive(Clone, Copy, Default)]
306 struct LaneStats {
307 producers: u32,
308 }
309
310 let mut stats = BTreeMap::<&str, LaneStats>::new();
311 for lane in &lg.lanes {
312 stats.insert(lane.name.as_str(), LaneStats::default());
313 }
314
315 for ep in &lg.endpoints {
316 let Some(s) = stats.get_mut(ep.lane.as_str()) else {
317 continue;
318 };
319 if matches!(ep.kind, EndpointKindV1::Ingress) {
320 s.producers = s.producers.saturating_add(1);
321 }
322 }
323
324 for role in &lg.roles {
325 match role {
326 RoleCfgV1::Stage(cfg) => {
327 if let Some(s) = stats.get_mut(cfg.tx.as_str()) {
328 s.producers = s.producers.saturating_add(1);
329 }
330 }
331 RoleCfgV1::Bridge(cfg) => {
332 if let Some(s) = stats.get_mut(cfg.tx.as_str()) {
333 s.producers = s.producers.saturating_add(1);
334 }
335 }
336 RoleCfgV1::Router(cfg) => {
337 for tx in &cfg.tx {
338 if let Some(s) = stats.get_mut(tx.as_str()) {
339 s.producers = s.producers.saturating_add(1);
340 }
341 }
342 }
343 RoleCfgV1::Merge(cfg) => {
344 if let Some(s) = stats.get_mut(cfg.tx.as_str()) {
345 s.producers = s.producers.saturating_add(1);
346 }
347 }
348 }
349 }
350
351 let mut out = BTreeMap::<String, EventsAttachKind>::new();
352 for lane in &lg.lanes {
353 if !matches!(lane.kind, LaneKindV1::Events) {
354 continue;
355 }
356 let producers = stats
357 .get(lane.name.as_str())
358 .copied()
359 .unwrap_or_default()
360 .producers;
361
362 let kind = if producers > 1 {
365 EventsAttachKind::Mpsc
366 } else {
367 EventsAttachKind::Spsc
368 };
369 out.insert(lane.name.clone(), kind);
370 }
371 out
372 }
373
374 fn attach_events_rx_any(
375 opts: &SpShmOptions,
376 lane: &str,
377 kind: EventsAttachKind,
378 ) -> Result<AnyEventsRx, ExecError> {
379 let o = opts.lane_attach_options(lane);
380 match kind {
381 EventsAttachKind::Spsc => attach_events_rx(&o, lane)
382 .map(AnyEventsRx::Spsc)
383 .map_err(|e| backing_err("events rx attach failed", lane, e)),
384 EventsAttachKind::Mpsc => attach_events_mpsc_rx(&o, lane)
385 .map(AnyEventsRx::Mpsc)
386 .map_err(|e| backing_err("events mpsc rx attach failed", lane, e)),
387 }
388 }
389
390 fn attach_events_tx_any(
391 opts: &SpShmOptions,
392 lane: &str,
393 kind: EventsAttachKind,
394 ) -> Result<AnyEventsTx, ExecError> {
395 let o = opts.lane_attach_options(lane);
396 match kind {
397 EventsAttachKind::Spsc => attach_events_tx(&o, lane)
398 .map(AnyEventsTx::Spsc)
399 .map_err(|e| backing_err("events tx attach failed", lane, e)),
400 EventsAttachKind::Mpsc => attach_events_mpsc_tx(&o, lane)
401 .map(AnyEventsTx::Mpsc)
402 .map_err(|e| backing_err("events mpsc tx attach failed", lane, e)),
403 }
404 }
405
406 fn attach_fanout_tx_one(opts: &SpShmOptions, lane: &str) -> Result<ShmFanoutTx, ExecError> {
407 let o = opts.lane_attach_options(lane);
408 attach_fanout_tx(&o, lane).map_err(|e| backing_err("fanout tx attach failed", lane, e))
409 }
410
411 fn attach_fanout_router_one(
412 opts: &SpShmOptions,
413 lane: &str,
414 ) -> Result<ShmFanoutRouter, ExecError> {
415 let o = opts.lane_attach_options(lane);
416 attach_fanout_router(&o, lane)
417 .map_err(|e| backing_err("fanout router attach failed", lane, e))
418 }
419
420 fn attach_fanout_rx_one(
421 opts: &SpShmOptions,
422 lane: &str,
423 consumer_idx: usize,
424 ) -> Result<ShmFanoutRx, ExecError> {
425 let o = opts.lane_attach_options_with_queue(lane, consumer_idx);
426 attach_fanout_rx(&o, lane).map_err(|e| backing_err("fanout rx attach failed", lane, e))
427 }
428
429 fn attach_journal_rx_one(
430 opts: &SpShmOptions,
431 lane: &str,
432 ) -> Result<ShmJournalSubscriber, ExecError> {
433 let o = opts.lane_attach_options(lane);
434 byteor_pipeline_backings_shm::attach_journal_subscriber_tail(&o, lane)
435 .map_err(|e| backing_err("journal rx attach failed", lane, e))
436 }
437
438 fn attach_journal_tx_one(
439 opts: &SpShmOptions,
440 lane: &str,
441 ) -> Result<ShmJournalPublisher, ExecError> {
442 let o = opts.lane_attach_options(lane);
443 byteor_pipeline_backings_shm::attach_journal_publisher(
444 &o,
445 lane,
446 indexbus_log::JournalPublisherConfig::default(),
447 )
448 .map_err(|e| backing_err("journal tx attach failed", lane, e))
449 }
450
/// Receive side of a sequenced-slots lane: a subscriber together with the barrier that is
/// passed to its receive call.
struct SequencedSlotsRxOne {
    /// Barrier built with `CursorBarrier::new` from the attached lane.
    barrier: CursorBarrier,
    /// Subscriber attached to the lane.
    sub: ShmSequencedSlotsSubscriber,
}
455
456 fn attach_sequenced_slots_rx_one(
457 opts: &SpShmOptions,
458 lane: &str,
459 ) -> Result<SequencedSlotsRxOne, ExecError> {
460 let o = opts.lane_attach_options(lane);
461 let shm = ShmSequencedSlots::attach(&o, lane)
462 .map_err(|e| backing_err("sequenced-slots rx attach failed", lane, e))?;
463 let barrier = CursorBarrier::new(&shm);
464 let sub = ShmSequencedSlotsSubscriber::attach(&o, lane, 0)
465 .map_err(|e| backing_err("sequenced-slots rx attach failed", lane, e))?;
466 Ok(SequencedSlotsRxOne { barrier, sub })
467 }
468
469 fn attach_sequenced_slots_tx_one(
470 opts: &SpShmOptions,
471 lane: &str,
472 ) -> Result<ShmSequencedSlotsProducer, ExecError> {
473 let o = opts.lane_attach_options(lane);
474 ShmSequencedSlotsProducer::attach(&o, lane, 1, 1)
475 .map_err(|e| backing_err("sequenced-slots tx attach failed", lane, e))
476 }
477
478 fn publish_with_policy_with<Tx: LaneTxWith>(
479 tx: &mut Tx,
480 on_full: OnFullV1,
481 wait: &mut impl WaitStrategy,
482 mut fill: impl FnMut(&mut [u8]) -> usize,
483 ) -> core::result::Result<(), LaneTxError> {
484 loop {
485 match tx.publish_with(|slot| fill(slot).min(slot.len())) {
486 Ok(()) => {
487 wait.reset();
488 return Ok(());
489 }
490 Err(LaneTxError::Full) => match on_full {
491 OnFullV1::Drop => return Ok(()),
492 OnFullV1::Block => {
493 wait.wait();
494 continue;
495 }
496 },
497 Err(error) => return Err(error),
498 }
499 }
500 }
501
502 fn publish_with_policy_with_result<Tx: LaneTxWithResult, E>(
503 tx: &mut Tx,
504 on_full: OnFullV1,
505 wait: &mut impl WaitStrategy,
506 mut fill: impl FnMut(&mut [u8]) -> core::result::Result<usize, E>,
507 ) -> core::result::Result<(), LaneTxWithError<E>> {
508 loop {
509 match tx.publish_with_result(|slot| fill(slot).map(|n| n.min(slot.len()))) {
510 Ok(()) => {
511 wait.reset();
512 return Ok(());
513 }
514 Err(LaneTxWithError::Full) => match on_full {
515 OnFullV1::Drop => return Ok(()),
516 OnFullV1::Block => {
517 wait.wait();
518 continue;
519 }
520 },
521 Err(error @ LaneTxWithError::Encode(_)) => return Err(error),
522 Err(error @ LaneTxWithError::Failed) => return Err(error),
523 }
524 }
525 }
526
/// Runs a stage role between two events lanes until `stop` is set.
///
/// Each iteration asks `rx` for at most one message through `recv_with` and handles it
/// inside the receive callback: the stage is applied to the borrowed input and the result
/// is published to `tx` under the `on_full` policy. The loop spins on `idle_wait` when no
/// message was available.
///
/// # Errors
/// - `"stage recv failed"` when `recv_with` itself fails.
/// - `"stage error"` (with the stage's error text as context) when the stage callback fails.
/// - `"stage publish failed"` when publishing fails.
/// - `"unsupported transform outcome (non-exhaustive)"` for an unrecognised
///   `TransformOutcome`.
fn run_stage_role_loop(
    stop: &AtomicBool,
    activity: &RuntimeActivity,
    role_activity: &RuntimeRoleActivity,
    rx: &mut AnyEventsRx,
    tx: &mut AnyEventsTx,
    mut stage: ResolvedStage,
    on_full: OnFullV1,
) -> Result<(), ExecError> {
    // Separate wait states: one for "nothing to receive", one for "tx lane is full".
    let mut idle_wait = SpinWait::default();
    let mut backpressure_wait = SpinWait::default();
    // Output buffer for stages that cannot write straight into the tx slot.
    let mut scratch = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];

    while !stop.load(Ordering::Relaxed) {
        // `did` is true when a message was received and handled in this iteration.
        let did = match &mut stage {
            ResolvedStage::MapOk(map_ok) => {
                let res = rx
                    .recv_with(|input| {
                        // The guard only exists while a message is being handled.
                        let mut work = activity.start_work(role_activity);
                        // The stage writes its output directly into the tx slot.
                        publish_with_policy_with(tx, on_full, &mut backpressure_wait, |slot| {
                            map_ok(input, slot)
                        })
                        .map_err(|_| ExecError::new("stage publish failed"))?;
                        work.note_progress();
                        Ok::<(), ExecError>(())
                    })
                    .map_err(|_| ExecError::new("stage recv failed"))?;
                match res {
                    None => false,
                    Some(r) => {
                        r?;
                        true
                    }
                }
            }
            ResolvedStage::Map(map) => {
                let res = rx
                    .recv_with(|input| {
                        let mut work = activity.start_work(role_activity);
                        publish_with_policy_with_result(
                            tx,
                            on_full,
                            &mut backpressure_wait,
                            |slot| map(input, slot),
                        )
                        // An `Encode` error carries the stage's own failure; anything else
                        // is reported as a publish failure.
                        .map_err(|error| match error {
                            byteor_pipeline_kernel::LaneTxWithError::Encode(error) => {
                                ExecError::with_context("stage error", error.to_string())
                            }
                            _ => ExecError::new("stage publish failed"),
                        })?;
                        work.note_progress();
                        Ok::<(), ExecError>(())
                    })
                    .map_err(|_| ExecError::new("stage recv failed"))?;
                match res {
                    None => false,
                    Some(r) => {
                        r?;
                        true
                    }
                }
            }
            ResolvedStage::Filter(filter) => {
                let res = rx
                    .recv_with(|input| {
                        let mut work = activity.start_work(role_activity);
                        let keep = filter(input).map_err(|e| {
                            ExecError::with_context("stage error", e.to_string())
                        })?;
                        // Messages the filter rejects are consumed without being forwarded.
                        if keep {
                            publish_with_policy(tx, input, on_full, &mut backpressure_wait)
                                .map_err(|_| ExecError::new("stage publish failed"))?;
                        }
                        // Progress is noted whether or not the message was forwarded.
                        work.note_progress();
                        Ok::<(), ExecError>(())
                    })
                    .map_err(|_| ExecError::new("stage recv failed"))?;
                match res {
                    None => false,
                    Some(r) => {
                        r?;
                        true
                    }
                }
            }
            ResolvedStage::Inspect(inspect) => {
                let res = rx
                    .recv_with(|input| {
                        let mut work = activity.start_work(role_activity);
                        inspect(input).map_err(|e| {
                            ExecError::with_context("stage error", e.to_string())
                        })?;
                        // The input is forwarded unchanged after inspection.
                        publish_with_policy(tx, input, on_full, &mut backpressure_wait)
                            .map_err(|_| ExecError::new("stage publish failed"))?;
                        work.note_progress();
                        Ok::<(), ExecError>(())
                    })
                    .map_err(|_| ExecError::new("stage recv failed"))?;
                match res {
                    None => false,
                    Some(r) => {
                        r?;
                        true
                    }
                }
            }
            ResolvedStage::StatefulTransform(stateful) => {
                let res = rx
                    .recv_with(|input| {
                        let mut work = activity.start_work(role_activity);
                        // The transform writes into `scratch`; the outcome says what to send.
                        let outcome = stateful.call(input, &mut scratch).map_err(|e| {
                            ExecError::with_context("stage error", e.to_string())
                        })?;

                        match outcome {
                            TransformOutcome::Drop => {}
                            TransformOutcome::ForwardInput => {
                                publish_with_policy(tx, input, on_full, &mut backpressure_wait)
                                    .map_err(|_| ExecError::new("stage publish failed"))?;
                            }
                            TransformOutcome::ForwardOutput(out_n) => {
                                // Clamp the reported length to the scratch buffer size.
                                let out_n = out_n.min(scratch.len());
                                publish_with_policy(
                                    tx,
                                    &scratch[..out_n],
                                    on_full,
                                    &mut backpressure_wait,
                                )
                                .map_err(|_| ExecError::new("stage publish failed"))?;
                            }
                            _ => {
                                return Err(ExecError::new(
                                    "unsupported transform outcome (non-exhaustive)",
                                ));
                            }
                        }

                        work.note_progress();
                        Ok::<(), ExecError>(())
                    })
                    .map_err(|_| ExecError::new("stage recv failed"))?;
                match res {
                    None => false,
                    Some(r) => {
                        r?;
                        true
                    }
                }
            }
            ResolvedStage::Stateful(stateful) => {
                let res = rx
                    .recv_with(|input| {
                        let mut work = activity.start_work(role_activity);
                        publish_with_policy_with_result(
                            tx,
                            on_full,
                            &mut backpressure_wait,
                            |slot| stateful.call(input, slot),
                        )
                        .map_err(|error| match error {
                            byteor_pipeline_kernel::LaneTxWithError::Encode(error) => {
                                ExecError::with_context("stage error", error.to_string())
                            }
                            _ => ExecError::new("stage publish failed"),
                        })?;
                        work.note_progress();
                        Ok::<(), ExecError>(())
                    })
                    .map_err(|_| ExecError::new("stage recv failed"))?;
                match res {
                    None => false,
                    Some(r) => {
                        r?;
                        true
                    }
                }
            }
        };

        if did {
            idle_wait.reset();
        } else {
            idle_wait.wait();
        }
    }

    Ok(())
}
716
717 fn run_stage_role_loop_copy<Rx, Tx>(
718 stop: &AtomicBool,
719 activity: &RuntimeActivity,
720 role_activity: &RuntimeRoleActivity,
721 rx: &mut Rx,
722 tx: &mut Tx,
723 mut stage: ResolvedStage,
724 on_full: OnFullV1,
725 ) -> Result<(), ExecError>
726 where
727 Rx: LaneRx,
728 Tx: LaneTx,
729 {
730 let mut idle_wait = SpinWait::default();
731 let mut backpressure_wait = SpinWait::default();
732 let mut input = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
733 let mut scratch = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
734
735 while !stop.load(Ordering::Relaxed) {
736 let Some(n) = rx.recv(&mut input) else {
737 idle_wait.wait();
738 continue;
739 };
740
741 let mut work = activity.start_work(role_activity);
742 let input = &input[..n];
743 match &mut stage {
744 ResolvedStage::MapOk(map_ok) => {
745 let out_n = map_ok(input, &mut scratch).min(scratch.len());
746 publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
747 .map_err(|_| ExecError::new("stage publish failed"))?;
748 }
749 ResolvedStage::Map(map) => {
750 let out_n = map(input, &mut scratch)
751 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
752 .min(scratch.len());
753 publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
754 .map_err(|_| ExecError::new("stage publish failed"))?;
755 }
756 ResolvedStage::Filter(filter) => {
757 if filter(input)
758 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
759 {
760 publish_with_policy(tx, input, on_full, &mut backpressure_wait)
761 .map_err(|_| ExecError::new("stage publish failed"))?;
762 }
763 }
764 ResolvedStage::Inspect(inspect) => {
765 inspect(input)
766 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?;
767 publish_with_policy(tx, input, on_full, &mut backpressure_wait)
768 .map_err(|_| ExecError::new("stage publish failed"))?;
769 }
770 ResolvedStage::StatefulTransform(stateful) => {
771 let outcome = stateful
772 .call(input, &mut scratch)
773 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?;
774 match outcome {
775 TransformOutcome::Drop => {}
776 TransformOutcome::ForwardInput => {
777 publish_with_policy(tx, input, on_full, &mut backpressure_wait)
778 .map_err(|_| ExecError::new("stage publish failed"))?;
779 }
780 TransformOutcome::ForwardOutput(out_n) => {
781 let out_n = out_n.min(scratch.len());
782 publish_with_policy(
783 tx,
784 &scratch[..out_n],
785 on_full,
786 &mut backpressure_wait,
787 )
788 .map_err(|_| ExecError::new("stage publish failed"))?;
789 }
790 _ => {
791 return Err(ExecError::new(
792 "unsupported transform outcome (non-exhaustive)",
793 ));
794 }
795 }
796 }
797 ResolvedStage::Stateful(stateful) => {
798 let out_n = stateful
799 .call(input, &mut scratch)
800 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
801 .min(scratch.len());
802 publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
803 .map_err(|_| ExecError::new("stage publish failed"))?;
804 }
805 }
806
807 work.note_progress();
808 idle_wait.reset();
809 }
810
811 Ok(())
812 }
813
814 fn run_bridge_role_loop<Rx, Tx>(
815 stop: &AtomicBool,
816 activity: &RuntimeActivity,
817 role_activity: &RuntimeRoleActivity,
818 rx: &mut Rx,
819 tx: &mut Tx,
820 on_full: OnFullV1,
821 ) -> Result<(), ExecError>
822 where
823 Rx: LaneRxBorrow,
824 Tx: LaneTxWith,
825 {
826 let mut idle_wait = SpinWait::default();
827 let mut backpressure_wait = SpinWait::default();
828
829 while !stop.load(Ordering::Relaxed) {
830 let res = rx
831 .recv_with(|input| {
832 let mut work = activity.start_work(role_activity);
833 publish_with_policy_with(tx, on_full, &mut backpressure_wait, |slot| {
834 let n = input.len().min(slot.len());
835 slot[..n].copy_from_slice(&input[..n]);
836 n
837 })
838 .map_err(|_| ExecError::new("bridge publish failed"))?;
839 work.note_progress();
840 Ok::<(), ExecError>(())
841 })
842 .map_err(|_| ExecError::new("bridge recv failed"))?;
843 let did = match res {
844 None => false,
845 Some(r) => {
846 r?;
847 true
848 }
849 };
850 if did {
851 idle_wait.reset();
852 } else {
853 idle_wait.wait();
854 }
855 }
856 Ok(())
857 }
858
859 fn run_bridge_role_loop_copy<Rx, Tx>(
860 stop: &AtomicBool,
861 activity: &RuntimeActivity,
862 role_activity: &RuntimeRoleActivity,
863 rx: &mut Rx,
864 tx: &mut Tx,
865 on_full: OnFullV1,
866 ) -> Result<(), ExecError>
867 where
868 Rx: LaneRx,
869 Tx: LaneTx,
870 {
871 let mut idle_wait = SpinWait::default();
872 let mut backpressure_wait = SpinWait::default();
873 let mut buf = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
874
875 while !stop.load(Ordering::Relaxed) {
876 let Some(n) = rx.recv(&mut buf) else {
877 idle_wait.wait();
878 continue;
879 };
880
881 let mut work = activity.start_work(role_activity);
882 publish_with_policy(tx, &buf[..n], on_full, &mut backpressure_wait)
883 .map_err(|_| ExecError::new("bridge publish failed"))?;
884 work.note_progress();
885 idle_wait.reset();
886 }
887
888 Ok(())
889 }
890
891 fn run_stage_role_loop_sequenced_slots(
892 stop: &AtomicBool,
893 activity: &RuntimeActivity,
894 role_activity: &RuntimeRoleActivity,
895 rx: &mut SequencedSlotsRxOne,
896 tx: &mut ShmSequencedSlotsProducer,
897 mut stage: ResolvedStage,
898 on_full: OnFullV1,
899 ) -> Result<(), ExecError> {
900 let mut idle_wait = SpinWait::default();
901 let mut backpressure_wait = SpinWait::default();
902 let mut input = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
903 let mut scratch = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
904
905 while !stop.load(Ordering::Relaxed) {
906 let n = match rx.sub.recv_next_into_from_stoppable(
907 &rx.barrier,
908 &mut idle_wait,
909 &mut input,
910 stop,
911 ) {
912 Ok((_seq, n)) => n,
913 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => break,
914 Err(e) => {
915 return Err(ExecError::with_source(
916 "sequenced-slots stage recv failed",
917 e,
918 ))
919 }
920 };
921
922 let mut work = activity.start_work(role_activity);
923 let input = &input[..n];
924 match &mut stage {
925 ResolvedStage::MapOk(map_ok) => {
926 let out_n = map_ok(input, &mut scratch).min(scratch.len());
927 publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
928 .map_err(|_| ExecError::new("stage publish failed"))?;
929 }
930 ResolvedStage::Map(map) => {
931 let out_n = map(input, &mut scratch)
932 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
933 .min(scratch.len());
934 publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
935 .map_err(|_| ExecError::new("stage publish failed"))?;
936 }
937 ResolvedStage::Filter(filter) => {
938 if filter(input)
939 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
940 {
941 publish_with_policy(tx, input, on_full, &mut backpressure_wait)
942 .map_err(|_| ExecError::new("stage publish failed"))?;
943 }
944 }
945 ResolvedStage::Inspect(inspect) => {
946 inspect(input)
947 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?;
948 publish_with_policy(tx, input, on_full, &mut backpressure_wait)
949 .map_err(|_| ExecError::new("stage publish failed"))?;
950 }
951 ResolvedStage::StatefulTransform(stateful) => {
952 let outcome = stateful
953 .call(input, &mut scratch)
954 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?;
955 match outcome {
956 TransformOutcome::Drop => {}
957 TransformOutcome::ForwardInput => {
958 publish_with_policy(tx, input, on_full, &mut backpressure_wait)
959 .map_err(|_| ExecError::new("stage publish failed"))?;
960 }
961 TransformOutcome::ForwardOutput(out_n) => {
962 let out_n = out_n.min(scratch.len());
963 publish_with_policy(
964 tx,
965 &scratch[..out_n],
966 on_full,
967 &mut backpressure_wait,
968 )
969 .map_err(|_| ExecError::new("stage publish failed"))?;
970 }
971 _ => {
972 return Err(ExecError::new(
973 "unsupported transform outcome (non-exhaustive)",
974 ));
975 }
976 }
977 }
978 ResolvedStage::Stateful(stateful) => {
979 let out_n = stateful
980 .call(input, &mut scratch)
981 .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
982 .min(scratch.len());
983 publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
984 .map_err(|_| ExecError::new("stage publish failed"))?;
985 }
986 }
987
988 work.note_progress();
989 idle_wait.reset();
990 }
991
992 Ok(())
993 }
994
995 fn run_bridge_role_loop_sequenced_slots(
996 stop: &AtomicBool,
997 activity: &RuntimeActivity,
998 role_activity: &RuntimeRoleActivity,
999 rx: &mut SequencedSlotsRxOne,
1000 tx: &mut ShmSequencedSlotsProducer,
1001 on_full: OnFullV1,
1002 ) -> Result<(), ExecError> {
1003 let mut idle_wait = SpinWait::default();
1004 let mut backpressure_wait = SpinWait::default();
1005 let mut buf = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1006
1007 while !stop.load(Ordering::Relaxed) {
1008 let n = match rx.sub.recv_next_into_from_stoppable(
1009 &rx.barrier,
1010 &mut idle_wait,
1011 &mut buf,
1012 stop,
1013 ) {
1014 Ok((_seq, n)) => n,
1015 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => break,
1016 Err(e) => {
1017 return Err(ExecError::with_source(
1018 "sequenced-slots bridge recv failed",
1019 e,
1020 ));
1021 }
1022 };
1023
1024 let mut work = activity.start_work(role_activity);
1025 publish_with_policy(tx, &buf[..n], on_full, &mut backpressure_wait)
1026 .map_err(|_| ExecError::new("bridge publish failed"))?;
1027 work.note_progress();
1028 idle_wait.reset();
1029 }
1030
1031 Ok(())
1032 }
1033
1034 fn run_merge_role_loop<Rx, Tx>(
1035 stop: &AtomicBool,
1036 activity: &RuntimeActivity,
1037 role_activity: &RuntimeRoleActivity,
1038 rxs: &mut [Rx],
1039 tx: &mut Tx,
1040 policy: MergePolicyV1,
1041 on_full: OnFullV1,
1042 ) -> Result<(), ExecError>
1043 where
1044 Rx: LaneRx,
1045 Tx: LaneTx,
1046 {
1047 if rxs.is_empty() {
1048 return Err(ExecError::new("merge requires at least one rx lane"));
1049 }
1050 if !matches!(policy, MergePolicyV1::RoundRobin) {
1051 return Err(ExecError::new("unsupported merge policy"));
1052 }
1053
1054 let mut idle_wait = SpinWait::default();
1055 let mut backpressure_wait = SpinWait::default();
1056 let mut rr: usize = 0;
1057 let mut buf = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1058
1059 while !stop.load(Ordering::Relaxed) {
1060 let n = rxs.len();
1061 let start = rr % n;
1062 let mut did_any = false;
1063
1064 for offset in 0..n {
1065 let idx = (start + offset) % n;
1066 let Some(msg_n) = rxs[idx].recv(&mut buf) else {
1067 continue;
1068 };
1069 let mut work = activity.start_work(role_activity);
1070 publish_with_policy(tx, &buf[..msg_n], on_full, &mut backpressure_wait)
1071 .map_err(|_| ExecError::new("merge publish failed"))?;
1072 work.note_progress();
1073 rr = (idx + 1) % n;
1074 did_any = true;
1075 break;
1076 }
1077
1078 if !did_any {
1079 rr = (start + 1) % n;
1080 idle_wait.wait();
1081 } else {
1082 idle_wait.reset();
1083 }
1084 }
1085
1086 Ok(())
1087 }
1088
1089 fn run_router_role_loop(
1090 stop: &AtomicBool,
1091 activity: &RuntimeActivity,
1092 role_activity: &RuntimeRoleActivity,
1093 router: &ShmFanoutRouter,
1094 fanout_rxs: &mut [ShmFanoutRx],
1095 txs: &mut [AnyEventsTx],
1096 on_full: OnFullV1,
1097 ) -> Result<(), ExecError> {
1098 if fanout_rxs.is_empty() || txs.is_empty() {
1099 return Err(ExecError::new("router requires non-empty lanes"));
1100 }
1101 if fanout_rxs.len() != txs.len() {
1102 return Err(ExecError::new("router lane count mismatch"));
1103 }
1104
1105 let mut idle_wait = SpinWait::default();
1106 let mut backpressure_wait = SpinWait::default();
1107 let mut buf = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1108
1109 while !stop.load(Ordering::Relaxed) {
1110 let routed = router.route_once(RouterMode::Broadcast);
1111 if !routed {
1112 idle_wait.wait();
1113 continue;
1114 }
1115
1116 let mut work = activity.start_work(role_activity);
1117 idle_wait.reset();
1118
1119 for (i, (rx, tx)) in fanout_rxs.iter_mut().zip(txs.iter_mut()).enumerate() {
1120 let Some(n) = rx.recv(&mut buf) else {
1121 return Err(ExecError::with_context(
1122 "fanout consumer recv failed",
1123 format!("consumer {i}"),
1124 ));
1125 };
1126 publish_with_policy(tx, &buf[..n], on_full, &mut backpressure_wait)
1127 .map_err(|_| ExecError::new("router publish failed"))?;
1128 }
1129
1130 work.note_progress();
1131 }
1132
1133 Ok(())
1134 }
1135
1136 pub struct LaneGraphSpRuntime {
1138 stop: Arc<AtomicBool>,
1139 activity: Arc<RuntimeActivity>,
1140 joins: Vec<JoinHandle<Result<(), ExecError>>>,
1141 }
1142
1143 impl LaneGraphSpRuntime {
1144 pub fn role_activity_snapshot(&self) -> Vec<LaneGraphRoleActivitySnapshot> {
1146 self.activity
1147 .roles
1148 .iter()
1149 .map(|role| role.snapshot())
1150 .collect()
1151 }
1152
1153 pub fn wait_until_idle(&self, timeout: Duration) -> Result<(), ExecError> {
1155 const POLL_INTERVAL: Duration = Duration::from_millis(10);
1156 const IDLE_SETTLE: Duration = Duration::from_millis(20);
1157
1158 let start = Instant::now();
1159 let mut stable_since: Option<Instant> = None;
1160 let mut last_epoch = self.activity.progress_epoch.load(Ordering::Relaxed);
1161
1162 loop {
1163 if self.joins.iter().any(|j| j.is_finished()) {
1164 return Err(ExecError::new("role exited before runtime became idle"));
1165 }
1166
1167 let active_roles = self.activity.active_roles.load(Ordering::Relaxed);
1168 let epoch = self.activity.progress_epoch.load(Ordering::Relaxed);
1169
1170 if active_roles == 0 {
1171 if epoch == last_epoch {
1172 let stable = stable_since.get_or_insert_with(Instant::now);
1173 if stable.elapsed() >= IDLE_SETTLE {
1174 return Ok(());
1175 }
1176 } else {
1177 last_epoch = epoch;
1178 stable_since = Some(Instant::now());
1179 }
1180 } else {
1181 last_epoch = epoch;
1182 stable_since = None;
1183 }
1184
1185 if start.elapsed() >= timeout {
1186 return Err(ExecError::with_context(
1187 "lane graph idle wait timed out",
1188 format!(
1189 "timeout_ms={} active_roles={} progress_epoch={epoch}",
1190 timeout.as_millis(),
1191 active_roles,
1192 ),
1193 ));
1194 }
1195
1196 std::thread::sleep(POLL_INTERVAL);
1197 }
1198 }
1199
1200 pub fn request_stop(&self) {
1202 self.stop.store(true, Ordering::Relaxed);
1203 }
1204
1205 pub fn join(&mut self) -> Result<(), ExecError> {
1207 for j in self.joins.drain(..) {
1208 let r = j
1209 .join()
1210 .map_err(|_| ExecError::new("role thread panicked"))?;
1211 r?;
1212 }
1213 Ok(())
1214 }
1215
1216 pub fn stop_and_join(mut self) -> Result<(), ExecError> {
1218 self.request_stop();
1219 self.join()
1220 }
1221 }
1222
1223 pub fn spawn_lane_graph_sp_shm_runtime(
1227 spec: &byteor_pipeline_spec::PipelineSpecV1,
1228 resolver: &dyn StageResolver,
1229 options: SpShmOptions,
1230 ) -> Result<LaneGraphSpRuntime, ExecError> {
1231 spawn_lane_graph_sp_shm_runtime_with_thread_init(spec, resolver, options, None)
1232 }
1233
1234 pub fn spawn_lane_graph_sp_shm_runtime_with_thread_init(
1237 spec: &byteor_pipeline_spec::PipelineSpecV1,
1238 resolver: &dyn StageResolver,
1239 options: SpShmOptions,
1240 thread_init: Option<ThreadInitHook>,
1241 ) -> Result<LaneGraphSpRuntime, ExecError> {
1242 options.ensure_dir()?;
1243
1244 crate::validate_sp(spec)?;
1245
1246 let byteor_pipeline_spec::PipelineSpecV1::LaneGraph(lg) = spec else {
1247 return Err(ExecError::new("spec is not a lane graph"));
1248 };
1249
1250 let lane_kinds: BTreeMap<String, LaneKindV1> =
1251 lg.lanes.iter().map(|l| (l.name.clone(), l.kind)).collect();
1252 let lane_kinds = Arc::new(lane_kinds);
1253
1254 let events_kinds = Arc::new(events_attach_kind_by_lane(lg));
1255
1256 let on_full = lg.on_full;
1257
1258 let stop = Arc::new(AtomicBool::new(false));
1259 let role_activities = lg
1260 .roles
1261 .iter()
1262 .map(|role| {
1263 Arc::new(RuntimeRoleActivity::new(
1264 role.name().to_string(),
1265 role_kind(role),
1266 ))
1267 })
1268 .collect::<Vec<_>>();
1269 let activity = Arc::new(RuntimeActivity::new(role_activities));
1270 let mut joins: Vec<JoinHandle<Result<(), ExecError>>> = Vec::with_capacity(lg.roles.len());
1271
1272 for (role, role_activity) in lg.roles.iter().cloned().zip(activity.roles.iter().cloned()) {
1273 let stop_t = stop.clone();
1274 let activity_t = activity.clone();
1275 let role_activity_t = role_activity.clone();
1276 let options_t = options.clone();
1277 let lane_kinds_t = lane_kinds.clone();
1278 let events_kinds_t = events_kinds.clone();
1279 let thread_init_t = thread_init.clone();
1280
1281 let role_name = role.name().to_string();
1282
1283 let join = match role {
1284 RoleCfgV1::Stage(cfg) => {
1285 let stage = resolve_or_err(resolver, cfg.stage.as_str())?;
1286 std::thread::Builder::new()
1287 .name(format!("byteor-sp:{}", role_name))
1288 .spawn(move || {
1289 if let Some(h) = &thread_init_t {
1290 let ctx =
1291 ThreadInitContext::new(role_name.clone(), RoleKind::Stage);
1292 h(&ctx)?;
1293 }
1294
1295 match (
1296 lane_kinds_t.get(cfg.rx.as_str()).copied(),
1297 lane_kinds_t.get(cfg.tx.as_str()).copied(),
1298 ) {
1299 (Some(LaneKindV1::Events), Some(LaneKindV1::Events)) => {
1300 let rx_kind = events_kinds_t
1301 .get(cfg.rx.as_str())
1302 .copied()
1303 .unwrap_or(EventsAttachKind::Mpsc);
1304 let tx_kind = events_kinds_t
1305 .get(cfg.tx.as_str())
1306 .copied()
1307 .unwrap_or(EventsAttachKind::Mpsc);
1308
1309 let mut rx =
1310 attach_events_rx_any(&options_t, cfg.rx.as_str(), rx_kind)?;
1311 let mut tx =
1312 attach_events_tx_any(&options_t, cfg.tx.as_str(), tx_kind)?;
1313
1314 run_stage_role_loop(
1315 stop_t.as_ref(),
1316 activity_t.as_ref(),
1317 role_activity_t.as_ref(),
1318 &mut rx,
1319 &mut tx,
1320 stage,
1321 on_full,
1322 )
1323 }
1324 (Some(LaneKindV1::Journal), Some(LaneKindV1::Journal)) => {
1325 let mut rx =
1326 attach_journal_rx_one(&options_t, cfg.rx.as_str())?;
1327 let mut tx =
1328 attach_journal_tx_one(&options_t, cfg.tx.as_str())?;
1329
1330 run_stage_role_loop_copy(
1331 stop_t.as_ref(),
1332 activity_t.as_ref(),
1333 role_activity_t.as_ref(),
1334 &mut rx,
1335 &mut tx,
1336 stage,
1337 on_full,
1338 )
1339 }
1340 (
1341 Some(LaneKindV1::SequencedSlots { .. }),
1342 Some(LaneKindV1::SequencedSlots { .. }),
1343 ) => {
1344 let mut rx =
1345 attach_sequenced_slots_rx_one(&options_t, cfg.rx.as_str())?;
1346 let mut tx =
1347 attach_sequenced_slots_tx_one(&options_t, cfg.tx.as_str())?;
1348
1349 run_stage_role_loop_sequenced_slots(
1350 stop_t.as_ref(),
1351 activity_t.as_ref(),
1352 role_activity_t.as_ref(),
1353 &mut rx,
1354 &mut tx,
1355 stage,
1356 on_full,
1357 )
1358 }
1359 _ => Err(ExecError::new("unsupported stage rx lane kind")),
1360 }
1361 })
1362 .map_err(|e| ExecError::with_source("failed to spawn role thread", e))?
1363 }
1364 RoleCfgV1::Bridge(cfg) => std::thread::Builder::new()
1365 .name(format!("byteor-sp:{}", role_name))
1366 .spawn(move || {
1367 if let Some(h) = &thread_init_t {
1368 let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Bridge);
1369 h(&ctx)?;
1370 }
1371
1372 let Some(rx_kind) = lane_kinds_t.get(cfg.rx.as_str()).copied() else {
1373 return Err(ExecError::new("unknown bridge rx lane"));
1374 };
1375 let Some(tx_kind) = lane_kinds_t.get(cfg.tx.as_str()).copied() else {
1376 return Err(ExecError::new("unknown bridge tx lane"));
1377 };
1378
1379 match (rx_kind, tx_kind) {
1380 (LaneKindV1::Events, LaneKindV1::Events) => {
1381 let rx_attach = events_kinds_t
1382 .get(cfg.rx.as_str())
1383 .copied()
1384 .unwrap_or(EventsAttachKind::Mpsc);
1385 let tx_attach = events_kinds_t
1386 .get(cfg.tx.as_str())
1387 .copied()
1388 .unwrap_or(EventsAttachKind::Mpsc);
1389 let mut rx =
1390 attach_events_rx_any(&options_t, cfg.rx.as_str(), rx_attach)?;
1391 let mut tx =
1392 attach_events_tx_any(&options_t, cfg.tx.as_str(), tx_attach)?;
1393 run_bridge_role_loop(
1394 stop_t.as_ref(),
1395 activity_t.as_ref(),
1396 role_activity_t.as_ref(),
1397 &mut rx,
1398 &mut tx,
1399 on_full,
1400 )
1401 }
1402 (LaneKindV1::Events, LaneKindV1::FanoutBroadcast { .. }) => {
1403 let rx_attach = events_kinds_t
1404 .get(cfg.rx.as_str())
1405 .copied()
1406 .unwrap_or(EventsAttachKind::Mpsc);
1407 let mut rx =
1408 attach_events_rx_any(&options_t, cfg.rx.as_str(), rx_attach)?;
1409 let mut tx = attach_fanout_tx_one(&options_t, cfg.tx.as_str())?;
1410 run_bridge_role_loop(
1411 stop_t.as_ref(),
1412 activity_t.as_ref(),
1413 role_activity_t.as_ref(),
1414 &mut rx,
1415 &mut tx,
1416 on_full,
1417 )
1418 }
1419 (LaneKindV1::Journal, LaneKindV1::Journal) => {
1420 let mut rx = attach_journal_rx_one(&options_t, cfg.rx.as_str())?;
1421 let mut tx = attach_journal_tx_one(&options_t, cfg.tx.as_str())?;
1422 run_bridge_role_loop_copy(
1423 stop_t.as_ref(),
1424 activity_t.as_ref(),
1425 role_activity_t.as_ref(),
1426 &mut rx,
1427 &mut tx,
1428 on_full,
1429 )
1430 }
1431 (
1432 LaneKindV1::SequencedSlots { .. },
1433 LaneKindV1::SequencedSlots { .. },
1434 ) => {
1435 let mut rx =
1436 attach_sequenced_slots_rx_one(&options_t, cfg.rx.as_str())?;
1437 let mut tx =
1438 attach_sequenced_slots_tx_one(&options_t, cfg.tx.as_str())?;
1439 run_bridge_role_loop_sequenced_slots(
1440 stop_t.as_ref(),
1441 activity_t.as_ref(),
1442 role_activity_t.as_ref(),
1443 &mut rx,
1444 &mut tx,
1445 on_full,
1446 )
1447 }
1448 _ => Err(ExecError::new("unsupported bridge lane kinds")),
1449 }
1450 })
1451 .map_err(|e| ExecError::with_source("failed to spawn role thread", e))?,
1452 RoleCfgV1::Merge(cfg) => std::thread::Builder::new()
1453 .name(format!("byteor-sp:{}", role_name))
1454 .spawn(move || {
1455 if let Some(h) = &thread_init_t {
1456 let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Merge);
1457 h(&ctx)?;
1458 }
1459
1460 match lane_kinds_t.get(cfg.tx.as_str()).copied() {
1461 Some(LaneKindV1::Events) => {
1462 let tx_attach = events_kinds_t
1463 .get(cfg.tx.as_str())
1464 .copied()
1465 .unwrap_or(EventsAttachKind::Mpsc);
1466 let mut tx =
1467 attach_events_tx_any(&options_t, cfg.tx.as_str(), tx_attach)?;
1468
1469 let mut rxs: Vec<AnyEventsRx> = Vec::with_capacity(cfg.rx.len());
1470 for rx_lane in &cfg.rx {
1471 let Some(LaneKindV1::Events) =
1472 lane_kinds_t.get(rx_lane.as_str()).copied()
1473 else {
1474 return Err(ExecError::new(
1475 "unsupported merge rx lane kind",
1476 ));
1477 };
1478 let rx_attach = events_kinds_t
1479 .get(rx_lane.as_str())
1480 .copied()
1481 .unwrap_or(EventsAttachKind::Mpsc);
1482 rxs.push(attach_events_rx_any(
1483 &options_t,
1484 rx_lane.as_str(),
1485 rx_attach,
1486 )?);
1487 }
1488
1489 run_merge_role_loop(
1490 stop_t.as_ref(),
1491 activity_t.as_ref(),
1492 role_activity_t.as_ref(),
1493 &mut rxs,
1494 &mut tx,
1495 cfg.policy,
1496 on_full,
1497 )
1498 }
1499 Some(LaneKindV1::Journal) => {
1500 let mut tx = attach_journal_tx_one(&options_t, cfg.tx.as_str())?;
1501
1502 let mut rxs: Vec<ShmJournalSubscriber> =
1503 Vec::with_capacity(cfg.rx.len());
1504 for rx_lane in &cfg.rx {
1505 let Some(LaneKindV1::Journal) =
1506 lane_kinds_t.get(rx_lane.as_str()).copied()
1507 else {
1508 return Err(ExecError::new(
1509 "unsupported merge rx lane kind",
1510 ));
1511 };
1512 rxs.push(attach_journal_rx_one(&options_t, rx_lane.as_str())?);
1513 }
1514
1515 run_merge_role_loop(
1516 stop_t.as_ref(),
1517 activity_t.as_ref(),
1518 role_activity_t.as_ref(),
1519 &mut rxs,
1520 &mut tx,
1521 cfg.policy,
1522 on_full,
1523 )
1524 }
1525 _ => Err(ExecError::new("unsupported merge tx lane kind")),
1526 }
1527 })
1528 .map_err(|e| ExecError::with_source("failed to spawn role thread", e))?,
1529 RoleCfgV1::Router(cfg) => std::thread::Builder::new()
1530 .name(format!("byteor-sp:{}", role_name))
1531 .spawn(move || {
1532 if let Some(h) = &thread_init_t {
1533 let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Router);
1534 h(&ctx)?;
1535 }
1536
1537 let Some(LaneKindV1::FanoutBroadcast { consumers }) =
1538 lane_kinds_t.get(cfg.rx.as_str()).copied()
1539 else {
1540 return Err(ExecError::new("unsupported router rx lane kind"));
1541 };
1542 let consumers = consumers as usize;
1543 if consumers > 4 {
1544 return Err(ExecError::new(
1545 "fanout consumers > 4 is not supported by shm backing",
1546 ));
1547 }
1548 if consumers != cfg.tx.len() {
1549 return Err(ExecError::new("router tx lane count mismatch"));
1550 }
1551
1552 let router = attach_fanout_router_one(&options_t, cfg.rx.as_str())?;
1553
1554 let mut fanout_rxs: Vec<ShmFanoutRx> = Vec::with_capacity(consumers);
1555 for idx in 0..consumers {
1556 fanout_rxs.push(attach_fanout_rx_one(
1557 &options_t,
1558 cfg.rx.as_str(),
1559 idx,
1560 )?);
1561 }
1562
1563 let mut txs: Vec<AnyEventsTx> = Vec::with_capacity(consumers);
1564 for tx_lane in &cfg.tx {
1565 let Some(LaneKindV1::Events) =
1566 lane_kinds_t.get(tx_lane.as_str()).copied()
1567 else {
1568 return Err(ExecError::new("router tx lanes must be events"));
1569 };
1570 let tx_attach = events_kinds_t
1571 .get(tx_lane.as_str())
1572 .copied()
1573 .unwrap_or(EventsAttachKind::Mpsc);
1574 txs.push(attach_events_tx_any(
1575 &options_t,
1576 tx_lane.as_str(),
1577 tx_attach,
1578 )?);
1579 }
1580
1581 run_router_role_loop(
1582 stop_t.as_ref(),
1583 activity_t.as_ref(),
1584 role_activity_t.as_ref(),
1585 &router,
1586 &mut fanout_rxs,
1587 &mut txs,
1588 on_full,
1589 )
1590 })
1591 .map_err(|e| ExecError::with_source("failed to spawn role thread", e))?,
1592 };
1593
1594 joins.push(join);
1595 }
1596
1597 Ok(LaneGraphSpRuntime {
1598 stop,
1599 activity,
1600 joins,
1601 })
1602 }
1603
1604 fn role_kind(role: &RoleCfgV1) -> RoleKind {
1605 match role {
1606 RoleCfgV1::Stage(_) => RoleKind::Stage,
1607 RoleCfgV1::Bridge(_) => RoleKind::Bridge,
1608 RoleCfgV1::Router(_) => RoleKind::Router,
1609 RoleCfgV1::Merge(_) => RoleKind::Merge,
1610 }
1611 }
1612
1613 pub fn run_lane_graph_mp_role_shm(
1620 spec: &byteor_pipeline_spec::PipelineSpecV1,
1621 resolver: &dyn StageResolver,
1622 options: SpShmOptions,
1623 role_name: &str,
1624 stop: &AtomicBool,
1625 ) -> Result<(), ExecError> {
1626 options.ensure_dir()?;
1627
1628 crate::validate_sp(spec)?;
1629
1630 let byteor_pipeline_spec::PipelineSpecV1::LaneGraph(lg) = spec else {
1631 return Err(ExecError::new("spec is not a lane graph"));
1632 };
1633
1634 let role = lg
1635 .roles
1636 .iter()
1637 .find(|r| r.name() == role_name)
1638 .ok_or_else(|| ExecError::with_context("unknown role", role_name.to_string()))?;
1639
1640 let lane_kinds: BTreeMap<String, LaneKindV1> =
1641 lg.lanes.iter().map(|l| (l.name.clone(), l.kind)).collect();
1642 let events_kinds = events_attach_kind_by_lane(lg);
1643 let on_full = lg.on_full;
1644 let role_activity = RuntimeRoleActivity::new(role.name().to_string(), role_kind(role));
1645 let activity = RuntimeActivity::new(Vec::new());
1646
1647 match role {
1648 RoleCfgV1::Stage(cfg) => {
1649 let stage = resolve_or_err(resolver, cfg.stage.as_str())?;
1650 match (
1651 lane_kinds.get(cfg.rx.as_str()).copied(),
1652 lane_kinds.get(cfg.tx.as_str()).copied(),
1653 ) {
1654 (Some(LaneKindV1::Events), Some(LaneKindV1::Events)) => {
1655 let rx_kind = events_kinds
1656 .get(cfg.rx.as_str())
1657 .copied()
1658 .unwrap_or(EventsAttachKind::Mpsc);
1659 let tx_kind = events_kinds
1660 .get(cfg.tx.as_str())
1661 .copied()
1662 .unwrap_or(EventsAttachKind::Mpsc);
1663
1664 let mut rx = attach_events_rx_any(&options, cfg.rx.as_str(), rx_kind)?;
1665 let mut tx = attach_events_tx_any(&options, cfg.tx.as_str(), tx_kind)?;
1666 run_stage_role_loop(
1667 stop,
1668 &activity,
1669 &role_activity,
1670 &mut rx,
1671 &mut tx,
1672 stage,
1673 on_full,
1674 )
1675 }
1676 (Some(LaneKindV1::Journal), Some(LaneKindV1::Journal)) => {
1677 let mut rx = attach_journal_rx_one(&options, cfg.rx.as_str())?;
1678 let mut tx = attach_journal_tx_one(&options, cfg.tx.as_str())?;
1679 run_stage_role_loop_copy(
1680 stop,
1681 &activity,
1682 &role_activity,
1683 &mut rx,
1684 &mut tx,
1685 stage,
1686 on_full,
1687 )
1688 }
1689 (
1690 Some(LaneKindV1::SequencedSlots { .. }),
1691 Some(LaneKindV1::SequencedSlots { .. }),
1692 ) => {
1693 let mut rx = attach_sequenced_slots_rx_one(&options, cfg.rx.as_str())?;
1694 let mut tx = attach_sequenced_slots_tx_one(&options, cfg.tx.as_str())?;
1695 run_stage_role_loop_sequenced_slots(
1696 stop,
1697 &activity,
1698 &role_activity,
1699 &mut rx,
1700 &mut tx,
1701 stage,
1702 on_full,
1703 )
1704 }
1705 _ => Err(ExecError::new("unsupported stage rx lane kind")),
1706 }
1707 }
1708 RoleCfgV1::Bridge(cfg) => {
1709 let Some(rx_kind) = lane_kinds.get(cfg.rx.as_str()).copied() else {
1710 return Err(ExecError::new("unknown bridge rx lane"));
1711 };
1712 let Some(tx_kind) = lane_kinds.get(cfg.tx.as_str()).copied() else {
1713 return Err(ExecError::new("unknown bridge tx lane"));
1714 };
1715
1716 match (rx_kind, tx_kind) {
1717 (LaneKindV1::Events, LaneKindV1::Events) => {
1718 let rx_attach = events_kinds
1719 .get(cfg.rx.as_str())
1720 .copied()
1721 .unwrap_or(EventsAttachKind::Mpsc);
1722 let tx_attach = events_kinds
1723 .get(cfg.tx.as_str())
1724 .copied()
1725 .unwrap_or(EventsAttachKind::Mpsc);
1726 let mut rx = attach_events_rx_any(&options, cfg.rx.as_str(), rx_attach)?;
1727 let mut tx = attach_events_tx_any(&options, cfg.tx.as_str(), tx_attach)?;
1728 run_bridge_role_loop(
1729 stop,
1730 &activity,
1731 &role_activity,
1732 &mut rx,
1733 &mut tx,
1734 on_full,
1735 )
1736 }
1737 (LaneKindV1::Events, LaneKindV1::FanoutBroadcast { .. }) => {
1738 let rx_attach = events_kinds
1739 .get(cfg.rx.as_str())
1740 .copied()
1741 .unwrap_or(EventsAttachKind::Mpsc);
1742 let mut rx = attach_events_rx_any(&options, cfg.rx.as_str(), rx_attach)?;
1743 let mut tx = attach_fanout_tx_one(&options, cfg.tx.as_str())?;
1744 run_bridge_role_loop(
1745 stop,
1746 &activity,
1747 &role_activity,
1748 &mut rx,
1749 &mut tx,
1750 on_full,
1751 )
1752 }
1753 (LaneKindV1::Journal, LaneKindV1::Journal) => {
1754 let mut rx = attach_journal_rx_one(&options, cfg.rx.as_str())?;
1755 let mut tx = attach_journal_tx_one(&options, cfg.tx.as_str())?;
1756 run_bridge_role_loop_copy(
1757 stop,
1758 &activity,
1759 &role_activity,
1760 &mut rx,
1761 &mut tx,
1762 on_full,
1763 )
1764 }
1765 (LaneKindV1::SequencedSlots { .. }, LaneKindV1::SequencedSlots { .. }) => {
1766 let mut rx = attach_sequenced_slots_rx_one(&options, cfg.rx.as_str())?;
1767 let mut tx = attach_sequenced_slots_tx_one(&options, cfg.tx.as_str())?;
1768 run_bridge_role_loop_sequenced_slots(
1769 stop,
1770 &activity,
1771 &role_activity,
1772 &mut rx,
1773 &mut tx,
1774 on_full,
1775 )
1776 }
1777 _ => Err(ExecError::new("unsupported bridge lane kinds")),
1778 }
1779 }
1780 RoleCfgV1::Merge(cfg) => match lane_kinds.get(cfg.tx.as_str()).copied() {
1781 Some(LaneKindV1::Events) => {
1782 let tx_attach = events_kinds
1783 .get(cfg.tx.as_str())
1784 .copied()
1785 .unwrap_or(EventsAttachKind::Mpsc);
1786 let mut tx = attach_events_tx_any(&options, cfg.tx.as_str(), tx_attach)?;
1787
1788 let mut rxs: Vec<AnyEventsRx> = Vec::with_capacity(cfg.rx.len());
1789 for rx_lane in &cfg.rx {
1790 let Some(LaneKindV1::Events) = lane_kinds.get(rx_lane.as_str()).copied()
1791 else {
1792 return Err(ExecError::new("unsupported merge rx lane kind"));
1793 };
1794 let rx_attach = events_kinds
1795 .get(rx_lane.as_str())
1796 .copied()
1797 .unwrap_or(EventsAttachKind::Mpsc);
1798 rxs.push(attach_events_rx_any(&options, rx_lane.as_str(), rx_attach)?);
1799 }
1800
1801 run_merge_role_loop(
1802 stop,
1803 &activity,
1804 &role_activity,
1805 &mut rxs,
1806 &mut tx,
1807 cfg.policy,
1808 on_full,
1809 )
1810 }
1811 Some(LaneKindV1::Journal) => {
1812 let mut tx = attach_journal_tx_one(&options, cfg.tx.as_str())?;
1813 let mut rxs: Vec<ShmJournalSubscriber> = Vec::with_capacity(cfg.rx.len());
1814 for rx_lane in &cfg.rx {
1815 let Some(LaneKindV1::Journal) = lane_kinds.get(rx_lane.as_str()).copied()
1816 else {
1817 return Err(ExecError::new("unsupported merge rx lane kind"));
1818 };
1819 rxs.push(attach_journal_rx_one(&options, rx_lane.as_str())?);
1820 }
1821
1822 run_merge_role_loop(
1823 stop,
1824 &activity,
1825 &role_activity,
1826 &mut rxs,
1827 &mut tx,
1828 cfg.policy,
1829 on_full,
1830 )
1831 }
1832 _ => Err(ExecError::new("unsupported merge tx lane kind")),
1833 },
1834 RoleCfgV1::Router(cfg) => {
1835 let Some(LaneKindV1::FanoutBroadcast { consumers }) =
1836 lane_kinds.get(cfg.rx.as_str()).copied()
1837 else {
1838 return Err(ExecError::new("unsupported router rx lane kind"));
1839 };
1840 let consumers = consumers as usize;
1841 if consumers > 4 {
1842 return Err(ExecError::new(
1843 "fanout consumers > 4 is not supported by shm backing",
1844 ));
1845 }
1846 if consumers != cfg.tx.len() {
1847 return Err(ExecError::new("router tx lane count mismatch"));
1848 }
1849
1850 let router = attach_fanout_router_one(&options, cfg.rx.as_str())?;
1851
1852 let mut fanout_rxs: Vec<ShmFanoutRx> = Vec::with_capacity(consumers);
1853 for idx in 0..consumers {
1854 fanout_rxs.push(attach_fanout_rx_one(&options, cfg.rx.as_str(), idx)?);
1855 }
1856
1857 let mut txs: Vec<AnyEventsTx> = Vec::with_capacity(consumers);
1858 for tx_lane in &cfg.tx {
1859 let Some(LaneKindV1::Events) = lane_kinds.get(tx_lane.as_str()).copied() else {
1860 return Err(ExecError::new("router tx lanes must be events"));
1861 };
1862 let tx_attach = events_kinds
1863 .get(tx_lane.as_str())
1864 .copied()
1865 .unwrap_or(EventsAttachKind::Mpsc);
1866 txs.push(attach_events_tx_any(&options, tx_lane.as_str(), tx_attach)?);
1867 }
1868
1869 run_router_role_loop(
1870 stop,
1871 &activity,
1872 &role_activity,
1873 &router,
1874 &mut fanout_rxs,
1875 &mut txs,
1876 on_full,
1877 )
1878 }
1879 }
1880 }
1881}
1882
1883#[cfg(feature = "shm")]
1884pub use shm::{
1885 run_lane_graph_mp_role_shm, run_sp, spawn_lane_graph_sp_shm_runtime,
1886 spawn_lane_graph_sp_shm_runtime_with_thread_init, LaneGraphRoleActivitySnapshot,
1887 LaneGraphSpRuntime, SpShmOptions,
1888};