byteor_pipeline_exec/
sp.rs

1//! SP runtime scaffolding.
2
3use byteor_pipeline_spec::{validate_v1, PipelineSpecV1};
4
5use crate::ExecError;
6
7/// Validate an SP spec.
8pub fn validate_sp(spec: &PipelineSpecV1) -> Result<(), ExecError> {
9    validate_v1(spec)
10        .map_err(|e| ExecError::with_context("spec validation failed", e.message().to_string()))
11}
12
13#[cfg(feature = "shm")]
14mod shm {
15    use std::collections::BTreeMap;
16    use std::path::PathBuf;
17    use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
18    use std::sync::Arc;
19    use std::thread::JoinHandle;
20    use std::time::{Duration, Instant};
21
22    use byteor_pipeline_backings_shm::{
23        attach_events_mpsc_rx, attach_events_mpsc_tx, attach_events_rx, attach_events_tx,
24        attach_fanout_router, attach_fanout_rx, attach_fanout_tx, AttachOptions, BackingError,
25        CursorBarrier, ShmEventsMpscRx, ShmEventsMpscTx, ShmEventsRx, ShmEventsTx, ShmFanoutRouter,
26        ShmFanoutRx, ShmFanoutTx, ShmJournalPublisher, ShmJournalSubscriber, ShmSequencedSlots,
27        ShmSequencedSlotsProducer, ShmSequencedSlotsSubscriber,
28    };
29    use byteor_pipeline_kernel::step::{publish_with_policy, TransformOutcome};
30    use byteor_pipeline_kernel::{
31        LaneRx, LaneRxBorrow, LaneTx, LaneTxError, LaneTxWith, LaneTxWithError, LaneTxWithResult,
32    };
33    use byteor_pipeline_spec::{
34        EndpointKindV1, LaneGraphV1, LaneKindV1, MergePolicyV1, OnFullV1, RoleCfgV1,
35    };
36    use indexbus_core::{RouterMode, SpinWait, WaitStrategy};
37
38    use crate::stage::resolve_or_err;
39    use crate::thread_init::{RoleKind, ThreadInitContext, ThreadInitHook};
40    use crate::{ExecError, ResolvedStage, StageResolver};
41
42    /// Start an SP runtime for a LaneGraph spec (SHM-backed, thread-per-role).
43    pub fn run_sp(
44        spec: &byteor_pipeline_spec::PipelineSpecV1,
45        resolver: &dyn StageResolver,
46        options: SpShmOptions,
47    ) -> Result<LaneGraphSpRuntime, ExecError> {
48        spawn_lane_graph_sp_shm_runtime(spec, resolver, options)
49    }
50
51    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
52    enum EventsAttachKind {
53        Spsc,
54        Mpsc,
55    }
56
57    fn backing_err(msg: &'static str, lane: &str, e: BackingError) -> ExecError {
58        ExecError::with_context_source(msg, lane.to_string(), e)
59    }
60
61    fn sanitize_lane_name(lane: &str) -> String {
62        lane.chars()
63            .map(|c| {
64                if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
65                    c
66                } else {
67                    '_'
68                }
69            })
70            .collect()
71    }
72
73    /// SHM attach options for the LaneGraph SP runtime.
74    ///
75    /// If `dir` is set, each lane is backed by its own mmap file at
76    /// `${dir}/${lane}.mmap` (with basic sanitization applied).
77    #[derive(Clone, Debug, Default)]
78    pub struct SpShmOptions {
79        /// Whether to prefer blocking waits (non-HFT mode).
80        pub blocking: bool,
81        /// Optional directory where per-lane mmap files should be created.
82        pub dir: Option<PathBuf>,
83    }
84
85    impl SpShmOptions {
86        fn lane_path(&self, lane: &str) -> Option<PathBuf> {
87            let dir = self.dir.as_ref()?;
88            let lane = sanitize_lane_name(lane);
89            Some(dir.join(format!("{lane}.mmap")))
90        }
91
92        /// Build backing attach options for a specific lane.
93        pub fn lane_attach_options(&self, lane: &str) -> AttachOptions {
94            AttachOptions {
95                blocking: self.blocking,
96                prefault: false,
97                path: self.lane_path(lane),
98                queue: 0,
99            }
100        }
101
102        fn lane_attach_options_with_queue(&self, lane: &str, queue: usize) -> AttachOptions {
103            AttachOptions {
104                blocking: self.blocking,
105                prefault: false,
106                path: self.lane_path(lane),
107                queue,
108            }
109        }
110
111        fn ensure_dir(&self) -> Result<(), ExecError> {
112            let Some(dir) = &self.dir else {
113                return Ok(());
114            };
115            std::fs::create_dir_all(dir).map_err(|e| {
116                ExecError::with_context_source(
117                    "failed to create shm dir",
118                    dir.display().to_string(),
119                    e,
120                )
121            })
122        }
123    }
124
    /// Receive end of an events lane, in whichever queue flavor the lane was
    /// attached with (see `EventsAttachKind`).
    enum AnyEventsRx {
        /// Single-producer queue receiver.
        Spsc(ShmEventsRx),
        /// Multi-producer queue receiver.
        Mpsc(ShmEventsMpscRx),
    }
129
130    impl LaneRx for AnyEventsRx {
131        fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
132            match self {
133                AnyEventsRx::Spsc(rx) => rx.recv(buf),
134                AnyEventsRx::Mpsc(rx) => rx.recv(buf),
135            }
136        }
137    }
138
139    impl LaneRxBorrow for AnyEventsRx {
140        fn recv_with<F, R>(
141            &mut self,
142            f: F,
143        ) -> core::result::Result<Option<R>, byteor_pipeline_kernel::LaneRxError>
144        where
145            F: FnOnce(&[u8]) -> R,
146        {
147            match self {
148                AnyEventsRx::Spsc(rx) => rx.recv_with(f),
149                AnyEventsRx::Mpsc(rx) => rx.recv_with(f),
150            }
151        }
152    }
153
    /// Send end of an events lane, in whichever queue flavor the lane was
    /// attached with (see `EventsAttachKind`).
    enum AnyEventsTx {
        /// Single-producer queue sender.
        Spsc(ShmEventsTx),
        /// Multi-producer queue sender.
        Mpsc(ShmEventsMpscTx),
    }
158
    /// Runtime-wide activity counters. They are atomics, so they are updated
    /// through `&self` (see `start_work`).
    struct RuntimeActivity {
        /// Work units currently in flight across all roles. It is incremented by
        /// `start_work` and decremented when the returned guard drops. Despite the
        /// name, it counts work units, not distinct roles.
        active_roles: AtomicUsize,
        /// Incremented once per work unit that calls `note_progress`, across all roles.
        progress_epoch: AtomicU64,
        /// Per-role activity trackers.
        roles: Vec<Arc<RuntimeRoleActivity>>,
    }
164
    /// Activity counters for a single role.
    #[derive(Debug)]
    struct RuntimeRoleActivity {
        /// Role name (copied into snapshots).
        name: String,
        /// Role kind (copied into snapshots).
        role_kind: RoleKind,
        /// Work units currently in flight for this role.
        active_work: AtomicUsize,
        /// Incremented once per work unit of this role that reports progress.
        progress_epoch: AtomicU64,
    }
172
173    impl RuntimeRoleActivity {
174        fn new(name: String, role_kind: RoleKind) -> Self {
175            Self {
176                name,
177                role_kind,
178                active_work: AtomicUsize::new(0),
179                progress_epoch: AtomicU64::new(0),
180            }
181        }
182
183        fn snapshot(&self) -> LaneGraphRoleActivitySnapshot {
184            LaneGraphRoleActivitySnapshot {
185                name: self.name.clone(),
186                role_kind: self.role_kind,
187                active_work: self.active_work.load(Ordering::Relaxed),
188                processed_count: self.progress_epoch.load(Ordering::Relaxed),
189                progress_epoch: self.progress_epoch.load(Ordering::Relaxed),
190            }
191        }
192    }
193
    /// Point-in-time activity snapshot for a single LaneGraph role thread.
    ///
    /// `processed_count` and `progress_epoch` are both populated from the same
    /// per-role progress counter.
    #[derive(Clone, Debug, PartialEq, Eq)]
    pub struct LaneGraphRoleActivitySnapshot {
        /// Stable role name from the spec.
        pub name: String,
        /// Role kind.
        pub role_kind: RoleKind,
        /// Count of work units in flight for this role when the snapshot was taken.
        pub active_work: usize,
        /// Count of completed work units observed for this role.
        pub processed_count: u64,
        /// Monotonic progress counter incremented when the role completes work.
        pub progress_epoch: u64,
    }
208
209    impl LaneGraphRoleActivitySnapshot {
210        /// Returns true when the role is actively processing work.
211        pub fn is_active(&self) -> bool {
212            self.active_work > 0
213        }
214
215        /// Human-readable runtime status for operators.
216        pub fn status(&self) -> &'static str {
217            if self.is_active() {
218                "active"
219            } else {
220                "idle"
221            }
222        }
223    }
224
225    impl RuntimeActivity {
226        fn new(roles: Vec<Arc<RuntimeRoleActivity>>) -> Self {
227            Self {
228                active_roles: AtomicUsize::new(0),
229                progress_epoch: AtomicU64::new(0),
230                roles,
231            }
232        }
233
234        fn start_work<'a>(&'a self, role: &'a RuntimeRoleActivity) -> RuntimeWorkGuard<'a> {
235            self.active_roles.fetch_add(1, Ordering::Relaxed);
236            role.active_work.fetch_add(1, Ordering::Relaxed);
237            RuntimeWorkGuard {
238                activity: self,
239                role,
240                progressed: false,
241            }
242        }
243    }
244
    /// Scope guard for one in-flight work unit, created by `RuntimeActivity::start_work`.
    ///
    /// Dropping it decrements the in-flight counters. `note_progress` bumps the
    /// progress counters at most once per guard.
    struct RuntimeWorkGuard<'a> {
        /// Runtime-wide counters to update.
        activity: &'a RuntimeActivity,
        /// Per-role counters to update.
        role: &'a RuntimeRoleActivity,
        /// Whether progress has already been recorded for this work unit.
        progressed: bool,
    }
250
251    impl RuntimeWorkGuard<'_> {
252        fn note_progress(&mut self) {
253            if !self.progressed {
254                self.progressed = true;
255                self.activity.progress_epoch.fetch_add(1, Ordering::Relaxed);
256                self.role.progress_epoch.fetch_add(1, Ordering::Relaxed);
257            }
258        }
259    }
260
261    impl Drop for RuntimeWorkGuard<'_> {
262        fn drop(&mut self) {
263            self.activity.active_roles.fetch_sub(1, Ordering::Relaxed);
264            self.role.active_work.fetch_sub(1, Ordering::Relaxed);
265        }
266    }
267
268    impl LaneTx for AnyEventsTx {
269        fn publish(&mut self, msg: &[u8]) -> core::result::Result<(), LaneTxError> {
270            match self {
271                AnyEventsTx::Spsc(tx) => tx.publish(msg),
272                AnyEventsTx::Mpsc(tx) => tx.publish(msg),
273            }
274        }
275    }
276
277    impl LaneTxWith for AnyEventsTx {
278        fn publish_with<F>(&mut self, f: F) -> core::result::Result<(), LaneTxError>
279        where
280            F: FnOnce(&mut [u8]) -> usize,
281        {
282            match self {
283                AnyEventsTx::Spsc(tx) => tx.publish_with(f),
284                AnyEventsTx::Mpsc(tx) => tx.publish_with(f),
285            }
286        }
287    }
288
289    impl byteor_pipeline_kernel::LaneTxWithResult for AnyEventsTx {
290        fn publish_with_result<F, E>(
291            &mut self,
292            f: F,
293        ) -> core::result::Result<(), byteor_pipeline_kernel::LaneTxWithError<E>>
294        where
295            F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
296        {
297            match self {
298                AnyEventsTx::Spsc(tx) => tx.publish_with_result(f),
299                AnyEventsTx::Mpsc(tx) => tx.publish_with_result(f),
300            }
301        }
302    }
303
304    fn events_attach_kind_by_lane(lg: &LaneGraphV1) -> BTreeMap<String, EventsAttachKind> {
305        #[derive(Clone, Copy, Default)]
306        struct LaneStats {
307            producers: u32,
308        }
309
310        let mut stats = BTreeMap::<&str, LaneStats>::new();
311        for lane in &lg.lanes {
312            stats.insert(lane.name.as_str(), LaneStats::default());
313        }
314
315        for ep in &lg.endpoints {
316            let Some(s) = stats.get_mut(ep.lane.as_str()) else {
317                continue;
318            };
319            if matches!(ep.kind, EndpointKindV1::Ingress) {
320                s.producers = s.producers.saturating_add(1);
321            }
322        }
323
324        for role in &lg.roles {
325            match role {
326                RoleCfgV1::Stage(cfg) => {
327                    if let Some(s) = stats.get_mut(cfg.tx.as_str()) {
328                        s.producers = s.producers.saturating_add(1);
329                    }
330                }
331                RoleCfgV1::Bridge(cfg) => {
332                    if let Some(s) = stats.get_mut(cfg.tx.as_str()) {
333                        s.producers = s.producers.saturating_add(1);
334                    }
335                }
336                RoleCfgV1::Router(cfg) => {
337                    for tx in &cfg.tx {
338                        if let Some(s) = stats.get_mut(tx.as_str()) {
339                            s.producers = s.producers.saturating_add(1);
340                        }
341                    }
342                }
343                RoleCfgV1::Merge(cfg) => {
344                    if let Some(s) = stats.get_mut(cfg.tx.as_str()) {
345                        s.producers = s.producers.saturating_add(1);
346                    }
347                }
348            }
349        }
350
351        let mut out = BTreeMap::<String, EventsAttachKind>::new();
352        for lane in &lg.lanes {
353            if !matches!(lane.kind, LaneKindV1::Events) {
354                continue;
355            }
356            let producers = stats
357                .get(lane.name.as_str())
358                .copied()
359                .unwrap_or_default()
360                .producers;
361
362            // In v1, Events lanes may be multi-producer but must be single-consumer.
363            // We choose SPSC for single-producer, MPSC otherwise.
364            let kind = if producers > 1 {
365                EventsAttachKind::Mpsc
366            } else {
367                EventsAttachKind::Spsc
368            };
369            out.insert(lane.name.clone(), kind);
370        }
371        out
372    }
373
374    fn attach_events_rx_any(
375        opts: &SpShmOptions,
376        lane: &str,
377        kind: EventsAttachKind,
378    ) -> Result<AnyEventsRx, ExecError> {
379        let o = opts.lane_attach_options(lane);
380        match kind {
381            EventsAttachKind::Spsc => attach_events_rx(&o, lane)
382                .map(AnyEventsRx::Spsc)
383                .map_err(|e| backing_err("events rx attach failed", lane, e)),
384            EventsAttachKind::Mpsc => attach_events_mpsc_rx(&o, lane)
385                .map(AnyEventsRx::Mpsc)
386                .map_err(|e| backing_err("events mpsc rx attach failed", lane, e)),
387        }
388    }
389
390    fn attach_events_tx_any(
391        opts: &SpShmOptions,
392        lane: &str,
393        kind: EventsAttachKind,
394    ) -> Result<AnyEventsTx, ExecError> {
395        let o = opts.lane_attach_options(lane);
396        match kind {
397            EventsAttachKind::Spsc => attach_events_tx(&o, lane)
398                .map(AnyEventsTx::Spsc)
399                .map_err(|e| backing_err("events tx attach failed", lane, e)),
400            EventsAttachKind::Mpsc => attach_events_mpsc_tx(&o, lane)
401                .map(AnyEventsTx::Mpsc)
402                .map_err(|e| backing_err("events mpsc tx attach failed", lane, e)),
403        }
404    }
405
406    fn attach_fanout_tx_one(opts: &SpShmOptions, lane: &str) -> Result<ShmFanoutTx, ExecError> {
407        let o = opts.lane_attach_options(lane);
408        attach_fanout_tx(&o, lane).map_err(|e| backing_err("fanout tx attach failed", lane, e))
409    }
410
411    fn attach_fanout_router_one(
412        opts: &SpShmOptions,
413        lane: &str,
414    ) -> Result<ShmFanoutRouter, ExecError> {
415        let o = opts.lane_attach_options(lane);
416        attach_fanout_router(&o, lane)
417            .map_err(|e| backing_err("fanout router attach failed", lane, e))
418    }
419
420    fn attach_fanout_rx_one(
421        opts: &SpShmOptions,
422        lane: &str,
423        consumer_idx: usize,
424    ) -> Result<ShmFanoutRx, ExecError> {
425        let o = opts.lane_attach_options_with_queue(lane, consumer_idx);
426        attach_fanout_rx(&o, lane).map_err(|e| backing_err("fanout rx attach failed", lane, e))
427    }
428
429    fn attach_journal_rx_one(
430        opts: &SpShmOptions,
431        lane: &str,
432    ) -> Result<ShmJournalSubscriber, ExecError> {
433        let o = opts.lane_attach_options(lane);
434        byteor_pipeline_backings_shm::attach_journal_subscriber_tail(&o, lane)
435            .map_err(|e| backing_err("journal rx attach failed", lane, e))
436    }
437
438    fn attach_journal_tx_one(
439        opts: &SpShmOptions,
440        lane: &str,
441    ) -> Result<ShmJournalPublisher, ExecError> {
442        let o = opts.lane_attach_options(lane);
443        byteor_pipeline_backings_shm::attach_journal_publisher(
444            &o,
445            lane,
446            indexbus_log::JournalPublisherConfig::default(),
447        )
448        .map_err(|e| backing_err("journal tx attach failed", lane, e))
449    }
450
    /// Receive-side handles for a sequenced-slots lane, built by
    /// `attach_sequenced_slots_rx_one`.
    struct SequencedSlotsRxOne {
        /// Barrier constructed from the attached shared sequenced-slots region.
        barrier: CursorBarrier,
        /// Subscriber attached to the lane.
        sub: ShmSequencedSlotsSubscriber,
    }
455
456    fn attach_sequenced_slots_rx_one(
457        opts: &SpShmOptions,
458        lane: &str,
459    ) -> Result<SequencedSlotsRxOne, ExecError> {
460        let o = opts.lane_attach_options(lane);
461        let shm = ShmSequencedSlots::attach(&o, lane)
462            .map_err(|e| backing_err("sequenced-slots rx attach failed", lane, e))?;
463        let barrier = CursorBarrier::new(&shm);
464        let sub = ShmSequencedSlotsSubscriber::attach(&o, lane, 0)
465            .map_err(|e| backing_err("sequenced-slots rx attach failed", lane, e))?;
466        Ok(SequencedSlotsRxOne { barrier, sub })
467    }
468
469    fn attach_sequenced_slots_tx_one(
470        opts: &SpShmOptions,
471        lane: &str,
472    ) -> Result<ShmSequencedSlotsProducer, ExecError> {
473        let o = opts.lane_attach_options(lane);
474        ShmSequencedSlotsProducer::attach(&o, lane, 1, 1)
475            .map_err(|e| backing_err("sequenced-slots tx attach failed", lane, e))
476    }
477
478    fn publish_with_policy_with<Tx: LaneTxWith>(
479        tx: &mut Tx,
480        on_full: OnFullV1,
481        wait: &mut impl WaitStrategy,
482        mut fill: impl FnMut(&mut [u8]) -> usize,
483    ) -> core::result::Result<(), LaneTxError> {
484        loop {
485            match tx.publish_with(|slot| fill(slot).min(slot.len())) {
486                Ok(()) => {
487                    wait.reset();
488                    return Ok(());
489                }
490                Err(LaneTxError::Full) => match on_full {
491                    OnFullV1::Drop => return Ok(()),
492                    OnFullV1::Block => {
493                        wait.wait();
494                        continue;
495                    }
496                },
497                Err(error) => return Err(error),
498            }
499        }
500    }
501
502    fn publish_with_policy_with_result<Tx: LaneTxWithResult, E>(
503        tx: &mut Tx,
504        on_full: OnFullV1,
505        wait: &mut impl WaitStrategy,
506        mut fill: impl FnMut(&mut [u8]) -> core::result::Result<usize, E>,
507    ) -> core::result::Result<(), LaneTxWithError<E>> {
508        loop {
509            match tx.publish_with_result(|slot| fill(slot).map(|n| n.min(slot.len()))) {
510                Ok(()) => {
511                    wait.reset();
512                    return Ok(());
513                }
514                Err(LaneTxWithError::Full) => match on_full {
515                    OnFullV1::Drop => return Ok(()),
516                    OnFullV1::Block => {
517                        wait.wait();
518                        continue;
519                    }
520                },
521                Err(error @ LaneTxWithError::Encode(_)) => return Err(error),
522                Err(error @ LaneTxWithError::Failed) => return Err(error),
523            }
524        }
525    }
526
527    fn run_stage_role_loop(
528        stop: &AtomicBool,
529        activity: &RuntimeActivity,
530        role_activity: &RuntimeRoleActivity,
531        rx: &mut AnyEventsRx,
532        tx: &mut AnyEventsTx,
533        mut stage: ResolvedStage,
534        on_full: OnFullV1,
535    ) -> Result<(), ExecError> {
536        let mut idle_wait = SpinWait::default();
537        let mut backpressure_wait = SpinWait::default();
538        let mut scratch = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
539
540        while !stop.load(Ordering::Relaxed) {
541            let did = match &mut stage {
542                ResolvedStage::MapOk(map_ok) => {
543                    let res = rx
544                        .recv_with(|input| {
545                            let mut work = activity.start_work(role_activity);
546                            publish_with_policy_with(tx, on_full, &mut backpressure_wait, |slot| {
547                                map_ok(input, slot)
548                            })
549                            .map_err(|_| ExecError::new("stage publish failed"))?;
550                            work.note_progress();
551                            Ok::<(), ExecError>(())
552                        })
553                        .map_err(|_| ExecError::new("stage recv failed"))?;
554                    match res {
555                        None => false,
556                        Some(r) => {
557                            r?;
558                            true
559                        }
560                    }
561                }
562                ResolvedStage::Map(map) => {
563                    let res = rx
564                        .recv_with(|input| {
565                            let mut work = activity.start_work(role_activity);
566                            publish_with_policy_with_result(
567                                tx,
568                                on_full,
569                                &mut backpressure_wait,
570                                |slot| map(input, slot),
571                            )
572                            .map_err(|error| match error {
573                                byteor_pipeline_kernel::LaneTxWithError::Encode(error) => {
574                                    ExecError::with_context("stage error", error.to_string())
575                                }
576                                _ => ExecError::new("stage publish failed"),
577                            })?;
578                            work.note_progress();
579                            Ok::<(), ExecError>(())
580                        })
581                        .map_err(|_| ExecError::new("stage recv failed"))?;
582                    match res {
583                        None => false,
584                        Some(r) => {
585                            r?;
586                            true
587                        }
588                    }
589                }
590                ResolvedStage::Filter(filter) => {
591                    let res = rx
592                        .recv_with(|input| {
593                            let mut work = activity.start_work(role_activity);
594                            let keep = filter(input).map_err(|e| {
595                                ExecError::with_context("stage error", e.to_string())
596                            })?;
597                            if keep {
598                                publish_with_policy(tx, input, on_full, &mut backpressure_wait)
599                                    .map_err(|_| ExecError::new("stage publish failed"))?;
600                            }
601                            work.note_progress();
602                            Ok::<(), ExecError>(())
603                        })
604                        .map_err(|_| ExecError::new("stage recv failed"))?;
605                    match res {
606                        None => false,
607                        Some(r) => {
608                            r?;
609                            true
610                        }
611                    }
612                }
613                ResolvedStage::Inspect(inspect) => {
614                    let res = rx
615                        .recv_with(|input| {
616                            let mut work = activity.start_work(role_activity);
617                            inspect(input).map_err(|e| {
618                                ExecError::with_context("stage error", e.to_string())
619                            })?;
620                            publish_with_policy(tx, input, on_full, &mut backpressure_wait)
621                                .map_err(|_| ExecError::new("stage publish failed"))?;
622                            work.note_progress();
623                            Ok::<(), ExecError>(())
624                        })
625                        .map_err(|_| ExecError::new("stage recv failed"))?;
626                    match res {
627                        None => false,
628                        Some(r) => {
629                            r?;
630                            true
631                        }
632                    }
633                }
634                ResolvedStage::StatefulTransform(stateful) => {
635                    let res = rx
636                        .recv_with(|input| {
637                            let mut work = activity.start_work(role_activity);
638                            let outcome = stateful.call(input, &mut scratch).map_err(|e| {
639                                ExecError::with_context("stage error", e.to_string())
640                            })?;
641
642                            match outcome {
643                                TransformOutcome::Drop => {}
644                                TransformOutcome::ForwardInput => {
645                                    publish_with_policy(tx, input, on_full, &mut backpressure_wait)
646                                        .map_err(|_| ExecError::new("stage publish failed"))?;
647                                }
648                                TransformOutcome::ForwardOutput(out_n) => {
649                                    let out_n = out_n.min(scratch.len());
650                                    publish_with_policy(
651                                        tx,
652                                        &scratch[..out_n],
653                                        on_full,
654                                        &mut backpressure_wait,
655                                    )
656                                    .map_err(|_| ExecError::new("stage publish failed"))?;
657                                }
658                                _ => {
659                                    return Err(ExecError::new(
660                                        "unsupported transform outcome (non-exhaustive)",
661                                    ));
662                                }
663                            }
664
665                            work.note_progress();
666                            Ok::<(), ExecError>(())
667                        })
668                        .map_err(|_| ExecError::new("stage recv failed"))?;
669                    match res {
670                        None => false,
671                        Some(r) => {
672                            r?;
673                            true
674                        }
675                    }
676                }
677                ResolvedStage::Stateful(stateful) => {
678                    let res = rx
679                        .recv_with(|input| {
680                            let mut work = activity.start_work(role_activity);
681                            publish_with_policy_with_result(
682                                tx,
683                                on_full,
684                                &mut backpressure_wait,
685                                |slot| stateful.call(input, slot),
686                            )
687                            .map_err(|error| match error {
688                                byteor_pipeline_kernel::LaneTxWithError::Encode(error) => {
689                                    ExecError::with_context("stage error", error.to_string())
690                                }
691                                _ => ExecError::new("stage publish failed"),
692                            })?;
693                            work.note_progress();
694                            Ok::<(), ExecError>(())
695                        })
696                        .map_err(|_| ExecError::new("stage recv failed"))?;
697                    match res {
698                        None => false,
699                        Some(r) => {
700                            r?;
701                            true
702                        }
703                    }
704                }
705            };
706
707            if did {
708                idle_wait.reset();
709            } else {
710                idle_wait.wait();
711            }
712        }
713
714        Ok(())
715    }
716
717    fn run_stage_role_loop_copy<Rx, Tx>(
718        stop: &AtomicBool,
719        activity: &RuntimeActivity,
720        role_activity: &RuntimeRoleActivity,
721        rx: &mut Rx,
722        tx: &mut Tx,
723        mut stage: ResolvedStage,
724        on_full: OnFullV1,
725    ) -> Result<(), ExecError>
726    where
727        Rx: LaneRx,
728        Tx: LaneTx,
729    {
730        let mut idle_wait = SpinWait::default();
731        let mut backpressure_wait = SpinWait::default();
732        let mut input = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
733        let mut scratch = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
734
735        while !stop.load(Ordering::Relaxed) {
736            let Some(n) = rx.recv(&mut input) else {
737                idle_wait.wait();
738                continue;
739            };
740
741            let mut work = activity.start_work(role_activity);
742            let input = &input[..n];
743            match &mut stage {
744                ResolvedStage::MapOk(map_ok) => {
745                    let out_n = map_ok(input, &mut scratch).min(scratch.len());
746                    publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
747                        .map_err(|_| ExecError::new("stage publish failed"))?;
748                }
749                ResolvedStage::Map(map) => {
750                    let out_n = map(input, &mut scratch)
751                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
752                        .min(scratch.len());
753                    publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
754                        .map_err(|_| ExecError::new("stage publish failed"))?;
755                }
756                ResolvedStage::Filter(filter) => {
757                    if filter(input)
758                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
759                    {
760                        publish_with_policy(tx, input, on_full, &mut backpressure_wait)
761                            .map_err(|_| ExecError::new("stage publish failed"))?;
762                    }
763                }
764                ResolvedStage::Inspect(inspect) => {
765                    inspect(input)
766                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?;
767                    publish_with_policy(tx, input, on_full, &mut backpressure_wait)
768                        .map_err(|_| ExecError::new("stage publish failed"))?;
769                }
770                ResolvedStage::StatefulTransform(stateful) => {
771                    let outcome = stateful
772                        .call(input, &mut scratch)
773                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?;
774                    match outcome {
775                        TransformOutcome::Drop => {}
776                        TransformOutcome::ForwardInput => {
777                            publish_with_policy(tx, input, on_full, &mut backpressure_wait)
778                                .map_err(|_| ExecError::new("stage publish failed"))?;
779                        }
780                        TransformOutcome::ForwardOutput(out_n) => {
781                            let out_n = out_n.min(scratch.len());
782                            publish_with_policy(
783                                tx,
784                                &scratch[..out_n],
785                                on_full,
786                                &mut backpressure_wait,
787                            )
788                            .map_err(|_| ExecError::new("stage publish failed"))?;
789                        }
790                        _ => {
791                            return Err(ExecError::new(
792                                "unsupported transform outcome (non-exhaustive)",
793                            ));
794                        }
795                    }
796                }
797                ResolvedStage::Stateful(stateful) => {
798                    let out_n = stateful
799                        .call(input, &mut scratch)
800                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
801                        .min(scratch.len());
802                    publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
803                        .map_err(|_| ExecError::new("stage publish failed"))?;
804                }
805            }
806
807            work.note_progress();
808            idle_wait.reset();
809        }
810
811        Ok(())
812    }
813
814    fn run_bridge_role_loop<Rx, Tx>(
815        stop: &AtomicBool,
816        activity: &RuntimeActivity,
817        role_activity: &RuntimeRoleActivity,
818        rx: &mut Rx,
819        tx: &mut Tx,
820        on_full: OnFullV1,
821    ) -> Result<(), ExecError>
822    where
823        Rx: LaneRxBorrow,
824        Tx: LaneTxWith,
825    {
826        let mut idle_wait = SpinWait::default();
827        let mut backpressure_wait = SpinWait::default();
828
829        while !stop.load(Ordering::Relaxed) {
830            let res = rx
831                .recv_with(|input| {
832                    let mut work = activity.start_work(role_activity);
833                    publish_with_policy_with(tx, on_full, &mut backpressure_wait, |slot| {
834                        let n = input.len().min(slot.len());
835                        slot[..n].copy_from_slice(&input[..n]);
836                        n
837                    })
838                    .map_err(|_| ExecError::new("bridge publish failed"))?;
839                    work.note_progress();
840                    Ok::<(), ExecError>(())
841                })
842                .map_err(|_| ExecError::new("bridge recv failed"))?;
843            let did = match res {
844                None => false,
845                Some(r) => {
846                    r?;
847                    true
848                }
849            };
850            if did {
851                idle_wait.reset();
852            } else {
853                idle_wait.wait();
854            }
855        }
856        Ok(())
857    }
858
859    fn run_bridge_role_loop_copy<Rx, Tx>(
860        stop: &AtomicBool,
861        activity: &RuntimeActivity,
862        role_activity: &RuntimeRoleActivity,
863        rx: &mut Rx,
864        tx: &mut Tx,
865        on_full: OnFullV1,
866    ) -> Result<(), ExecError>
867    where
868        Rx: LaneRx,
869        Tx: LaneTx,
870    {
871        let mut idle_wait = SpinWait::default();
872        let mut backpressure_wait = SpinWait::default();
873        let mut buf = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
874
875        while !stop.load(Ordering::Relaxed) {
876            let Some(n) = rx.recv(&mut buf) else {
877                idle_wait.wait();
878                continue;
879            };
880
881            let mut work = activity.start_work(role_activity);
882            publish_with_policy(tx, &buf[..n], on_full, &mut backpressure_wait)
883                .map_err(|_| ExecError::new("bridge publish failed"))?;
884            work.note_progress();
885            idle_wait.reset();
886        }
887
888        Ok(())
889    }
890
891    fn run_stage_role_loop_sequenced_slots(
892        stop: &AtomicBool,
893        activity: &RuntimeActivity,
894        role_activity: &RuntimeRoleActivity,
895        rx: &mut SequencedSlotsRxOne,
896        tx: &mut ShmSequencedSlotsProducer,
897        mut stage: ResolvedStage,
898        on_full: OnFullV1,
899    ) -> Result<(), ExecError> {
900        let mut idle_wait = SpinWait::default();
901        let mut backpressure_wait = SpinWait::default();
902        let mut input = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
903        let mut scratch = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
904
905        while !stop.load(Ordering::Relaxed) {
906            let n = match rx.sub.recv_next_into_from_stoppable(
907                &rx.barrier,
908                &mut idle_wait,
909                &mut input,
910                stop,
911            ) {
912                Ok((_seq, n)) => n,
913                Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => break,
914                Err(e) => {
915                    return Err(ExecError::with_source(
916                        "sequenced-slots stage recv failed",
917                        e,
918                    ))
919                }
920            };
921
922            let mut work = activity.start_work(role_activity);
923            let input = &input[..n];
924            match &mut stage {
925                ResolvedStage::MapOk(map_ok) => {
926                    let out_n = map_ok(input, &mut scratch).min(scratch.len());
927                    publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
928                        .map_err(|_| ExecError::new("stage publish failed"))?;
929                }
930                ResolvedStage::Map(map) => {
931                    let out_n = map(input, &mut scratch)
932                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
933                        .min(scratch.len());
934                    publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
935                        .map_err(|_| ExecError::new("stage publish failed"))?;
936                }
937                ResolvedStage::Filter(filter) => {
938                    if filter(input)
939                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
940                    {
941                        publish_with_policy(tx, input, on_full, &mut backpressure_wait)
942                            .map_err(|_| ExecError::new("stage publish failed"))?;
943                    }
944                }
945                ResolvedStage::Inspect(inspect) => {
946                    inspect(input)
947                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?;
948                    publish_with_policy(tx, input, on_full, &mut backpressure_wait)
949                        .map_err(|_| ExecError::new("stage publish failed"))?;
950                }
951                ResolvedStage::StatefulTransform(stateful) => {
952                    let outcome = stateful
953                        .call(input, &mut scratch)
954                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?;
955                    match outcome {
956                        TransformOutcome::Drop => {}
957                        TransformOutcome::ForwardInput => {
958                            publish_with_policy(tx, input, on_full, &mut backpressure_wait)
959                                .map_err(|_| ExecError::new("stage publish failed"))?;
960                        }
961                        TransformOutcome::ForwardOutput(out_n) => {
962                            let out_n = out_n.min(scratch.len());
963                            publish_with_policy(
964                                tx,
965                                &scratch[..out_n],
966                                on_full,
967                                &mut backpressure_wait,
968                            )
969                            .map_err(|_| ExecError::new("stage publish failed"))?;
970                        }
971                        _ => {
972                            return Err(ExecError::new(
973                                "unsupported transform outcome (non-exhaustive)",
974                            ));
975                        }
976                    }
977                }
978                ResolvedStage::Stateful(stateful) => {
979                    let out_n = stateful
980                        .call(input, &mut scratch)
981                        .map_err(|e| ExecError::with_context("stage error", e.to_string()))?
982                        .min(scratch.len());
983                    publish_with_policy(tx, &scratch[..out_n], on_full, &mut backpressure_wait)
984                        .map_err(|_| ExecError::new("stage publish failed"))?;
985                }
986            }
987
988            work.note_progress();
989            idle_wait.reset();
990        }
991
992        Ok(())
993    }
994
995    fn run_bridge_role_loop_sequenced_slots(
996        stop: &AtomicBool,
997        activity: &RuntimeActivity,
998        role_activity: &RuntimeRoleActivity,
999        rx: &mut SequencedSlotsRxOne,
1000        tx: &mut ShmSequencedSlotsProducer,
1001        on_full: OnFullV1,
1002    ) -> Result<(), ExecError> {
1003        let mut idle_wait = SpinWait::default();
1004        let mut backpressure_wait = SpinWait::default();
1005        let mut buf = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1006
1007        while !stop.load(Ordering::Relaxed) {
1008            let n = match rx.sub.recv_next_into_from_stoppable(
1009                &rx.barrier,
1010                &mut idle_wait,
1011                &mut buf,
1012                stop,
1013            ) {
1014                Ok((_seq, n)) => n,
1015                Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => break,
1016                Err(e) => {
1017                    return Err(ExecError::with_source(
1018                        "sequenced-slots bridge recv failed",
1019                        e,
1020                    ));
1021                }
1022            };
1023
1024            let mut work = activity.start_work(role_activity);
1025            publish_with_policy(tx, &buf[..n], on_full, &mut backpressure_wait)
1026                .map_err(|_| ExecError::new("bridge publish failed"))?;
1027            work.note_progress();
1028            idle_wait.reset();
1029        }
1030
1031        Ok(())
1032    }
1033
1034    fn run_merge_role_loop<Rx, Tx>(
1035        stop: &AtomicBool,
1036        activity: &RuntimeActivity,
1037        role_activity: &RuntimeRoleActivity,
1038        rxs: &mut [Rx],
1039        tx: &mut Tx,
1040        policy: MergePolicyV1,
1041        on_full: OnFullV1,
1042    ) -> Result<(), ExecError>
1043    where
1044        Rx: LaneRx,
1045        Tx: LaneTx,
1046    {
1047        if rxs.is_empty() {
1048            return Err(ExecError::new("merge requires at least one rx lane"));
1049        }
1050        if !matches!(policy, MergePolicyV1::RoundRobin) {
1051            return Err(ExecError::new("unsupported merge policy"));
1052        }
1053
1054        let mut idle_wait = SpinWait::default();
1055        let mut backpressure_wait = SpinWait::default();
1056        let mut rr: usize = 0;
1057        let mut buf = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1058
1059        while !stop.load(Ordering::Relaxed) {
1060            let n = rxs.len();
1061            let start = rr % n;
1062            let mut did_any = false;
1063
1064            for offset in 0..n {
1065                let idx = (start + offset) % n;
1066                let Some(msg_n) = rxs[idx].recv(&mut buf) else {
1067                    continue;
1068                };
1069                let mut work = activity.start_work(role_activity);
1070                publish_with_policy(tx, &buf[..msg_n], on_full, &mut backpressure_wait)
1071                    .map_err(|_| ExecError::new("merge publish failed"))?;
1072                work.note_progress();
1073                rr = (idx + 1) % n;
1074                did_any = true;
1075                break;
1076            }
1077
1078            if !did_any {
1079                rr = (start + 1) % n;
1080                idle_wait.wait();
1081            } else {
1082                idle_wait.reset();
1083            }
1084        }
1085
1086        Ok(())
1087    }
1088
1089    fn run_router_role_loop(
1090        stop: &AtomicBool,
1091        activity: &RuntimeActivity,
1092        role_activity: &RuntimeRoleActivity,
1093        router: &ShmFanoutRouter,
1094        fanout_rxs: &mut [ShmFanoutRx],
1095        txs: &mut [AnyEventsTx],
1096        on_full: OnFullV1,
1097    ) -> Result<(), ExecError> {
1098        if fanout_rxs.is_empty() || txs.is_empty() {
1099            return Err(ExecError::new("router requires non-empty lanes"));
1100        }
1101        if fanout_rxs.len() != txs.len() {
1102            return Err(ExecError::new("router lane count mismatch"));
1103        }
1104
1105        let mut idle_wait = SpinWait::default();
1106        let mut backpressure_wait = SpinWait::default();
1107        let mut buf = [0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1108
1109        while !stop.load(Ordering::Relaxed) {
1110            let routed = router.route_once(RouterMode::Broadcast);
1111            if !routed {
1112                idle_wait.wait();
1113                continue;
1114            }
1115
1116            let mut work = activity.start_work(role_activity);
1117            idle_wait.reset();
1118
1119            for (i, (rx, tx)) in fanout_rxs.iter_mut().zip(txs.iter_mut()).enumerate() {
1120                let Some(n) = rx.recv(&mut buf) else {
1121                    return Err(ExecError::with_context(
1122                        "fanout consumer recv failed",
1123                        format!("consumer {i}"),
1124                    ));
1125                };
1126                publish_with_policy(tx, &buf[..n], on_full, &mut backpressure_wait)
1127                    .map_err(|_| ExecError::new("router publish failed"))?;
1128            }
1129
1130            work.note_progress();
1131        }
1132
1133        Ok(())
1134    }
1135
1136    /// Handle for a spawned LaneGraph SP runtime.
1137    pub struct LaneGraphSpRuntime {
1138        stop: Arc<AtomicBool>,
1139        activity: Arc<RuntimeActivity>,
1140        joins: Vec<JoinHandle<Result<(), ExecError>>>,
1141    }
1142
1143    impl LaneGraphSpRuntime {
1144        /// Snapshot per-role runtime activity for operator-facing monitoring.
1145        pub fn role_activity_snapshot(&self) -> Vec<LaneGraphRoleActivitySnapshot> {
1146            self.activity
1147                .roles
1148                .iter()
1149                .map(|role| role.snapshot())
1150                .collect()
1151        }
1152
1153        /// Wait until all roles are idle and no new progress is observed for a short settle window.
1154        pub fn wait_until_idle(&self, timeout: Duration) -> Result<(), ExecError> {
1155            const POLL_INTERVAL: Duration = Duration::from_millis(10);
1156            const IDLE_SETTLE: Duration = Duration::from_millis(20);
1157
1158            let start = Instant::now();
1159            let mut stable_since: Option<Instant> = None;
1160            let mut last_epoch = self.activity.progress_epoch.load(Ordering::Relaxed);
1161
1162            loop {
1163                if self.joins.iter().any(|j| j.is_finished()) {
1164                    return Err(ExecError::new("role exited before runtime became idle"));
1165                }
1166
1167                let active_roles = self.activity.active_roles.load(Ordering::Relaxed);
1168                let epoch = self.activity.progress_epoch.load(Ordering::Relaxed);
1169
1170                if active_roles == 0 {
1171                    if epoch == last_epoch {
1172                        let stable = stable_since.get_or_insert_with(Instant::now);
1173                        if stable.elapsed() >= IDLE_SETTLE {
1174                            return Ok(());
1175                        }
1176                    } else {
1177                        last_epoch = epoch;
1178                        stable_since = Some(Instant::now());
1179                    }
1180                } else {
1181                    last_epoch = epoch;
1182                    stable_since = None;
1183                }
1184
1185                if start.elapsed() >= timeout {
1186                    return Err(ExecError::with_context(
1187                        "lane graph idle wait timed out",
1188                        format!(
1189                            "timeout_ms={} active_roles={} progress_epoch={epoch}",
1190                            timeout.as_millis(),
1191                            active_roles,
1192                        ),
1193                    ));
1194                }
1195
1196                std::thread::sleep(POLL_INTERVAL);
1197            }
1198        }
1199
1200        /// Request a cooperative stop.
1201        pub fn request_stop(&self) {
1202            self.stop.store(true, Ordering::Relaxed);
1203        }
1204
1205        /// Join all role threads.
1206        pub fn join(&mut self) -> Result<(), ExecError> {
1207            for j in self.joins.drain(..) {
1208                let r = j
1209                    .join()
1210                    .map_err(|_| ExecError::new("role thread panicked"))?;
1211                r?;
1212            }
1213            Ok(())
1214        }
1215
1216        /// Request stop and join all role threads.
1217        pub fn stop_and_join(mut self) -> Result<(), ExecError> {
1218            self.request_stop();
1219            self.join()
1220        }
1221    }
1222
1223    /// Spawn a SHM-backed LaneGraph SP runtime (thread-per-role).
1224    ///
1225    /// Current scope: Events + FanoutBroadcast + Journal + SequencedSlots.
1226    pub fn spawn_lane_graph_sp_shm_runtime(
1227        spec: &byteor_pipeline_spec::PipelineSpecV1,
1228        resolver: &dyn StageResolver,
1229        options: SpShmOptions,
1230    ) -> Result<LaneGraphSpRuntime, ExecError> {
1231        spawn_lane_graph_sp_shm_runtime_with_thread_init(spec, resolver, options, None)
1232    }
1233
1234    /// Spawn a SHM-backed LaneGraph SP runtime (thread-per-role), invoking `thread_init` at the
1235    /// start of each role thread (before entering hot loops).
1236    pub fn spawn_lane_graph_sp_shm_runtime_with_thread_init(
1237        spec: &byteor_pipeline_spec::PipelineSpecV1,
1238        resolver: &dyn StageResolver,
1239        options: SpShmOptions,
1240        thread_init: Option<ThreadInitHook>,
1241    ) -> Result<LaneGraphSpRuntime, ExecError> {
1242        options.ensure_dir()?;
1243
1244        crate::validate_sp(spec)?;
1245
1246        let byteor_pipeline_spec::PipelineSpecV1::LaneGraph(lg) = spec else {
1247            return Err(ExecError::new("spec is not a lane graph"));
1248        };
1249
1250        let lane_kinds: BTreeMap<String, LaneKindV1> =
1251            lg.lanes.iter().map(|l| (l.name.clone(), l.kind)).collect();
1252        let lane_kinds = Arc::new(lane_kinds);
1253
1254        let events_kinds = Arc::new(events_attach_kind_by_lane(lg));
1255
1256        let on_full = lg.on_full;
1257
1258        let stop = Arc::new(AtomicBool::new(false));
1259        let role_activities = lg
1260            .roles
1261            .iter()
1262            .map(|role| {
1263                Arc::new(RuntimeRoleActivity::new(
1264                    role.name().to_string(),
1265                    role_kind(role),
1266                ))
1267            })
1268            .collect::<Vec<_>>();
1269        let activity = Arc::new(RuntimeActivity::new(role_activities));
1270        let mut joins: Vec<JoinHandle<Result<(), ExecError>>> = Vec::with_capacity(lg.roles.len());
1271
1272        for (role, role_activity) in lg.roles.iter().cloned().zip(activity.roles.iter().cloned()) {
1273            let stop_t = stop.clone();
1274            let activity_t = activity.clone();
1275            let role_activity_t = role_activity.clone();
1276            let options_t = options.clone();
1277            let lane_kinds_t = lane_kinds.clone();
1278            let events_kinds_t = events_kinds.clone();
1279            let thread_init_t = thread_init.clone();
1280
1281            let role_name = role.name().to_string();
1282
1283            let join = match role {
1284                RoleCfgV1::Stage(cfg) => {
1285                    let stage = resolve_or_err(resolver, cfg.stage.as_str())?;
1286                    std::thread::Builder::new()
1287                        .name(format!("byteor-sp:{}", role_name))
1288                        .spawn(move || {
1289                            if let Some(h) = &thread_init_t {
1290                                let ctx =
1291                                    ThreadInitContext::new(role_name.clone(), RoleKind::Stage);
1292                                h(&ctx)?;
1293                            }
1294
1295                            match (
1296                                lane_kinds_t.get(cfg.rx.as_str()).copied(),
1297                                lane_kinds_t.get(cfg.tx.as_str()).copied(),
1298                            ) {
1299                                (Some(LaneKindV1::Events), Some(LaneKindV1::Events)) => {
1300                                    let rx_kind = events_kinds_t
1301                                        .get(cfg.rx.as_str())
1302                                        .copied()
1303                                        .unwrap_or(EventsAttachKind::Mpsc);
1304                                    let tx_kind = events_kinds_t
1305                                        .get(cfg.tx.as_str())
1306                                        .copied()
1307                                        .unwrap_or(EventsAttachKind::Mpsc);
1308
1309                                    let mut rx =
1310                                        attach_events_rx_any(&options_t, cfg.rx.as_str(), rx_kind)?;
1311                                    let mut tx =
1312                                        attach_events_tx_any(&options_t, cfg.tx.as_str(), tx_kind)?;
1313
1314                                    run_stage_role_loop(
1315                                        stop_t.as_ref(),
1316                                        activity_t.as_ref(),
1317                                        role_activity_t.as_ref(),
1318                                        &mut rx,
1319                                        &mut tx,
1320                                        stage,
1321                                        on_full,
1322                                    )
1323                                }
1324                                (Some(LaneKindV1::Journal), Some(LaneKindV1::Journal)) => {
1325                                    let mut rx =
1326                                        attach_journal_rx_one(&options_t, cfg.rx.as_str())?;
1327                                    let mut tx =
1328                                        attach_journal_tx_one(&options_t, cfg.tx.as_str())?;
1329
1330                                    run_stage_role_loop_copy(
1331                                        stop_t.as_ref(),
1332                                        activity_t.as_ref(),
1333                                        role_activity_t.as_ref(),
1334                                        &mut rx,
1335                                        &mut tx,
1336                                        stage,
1337                                        on_full,
1338                                    )
1339                                }
1340                                (
1341                                    Some(LaneKindV1::SequencedSlots { .. }),
1342                                    Some(LaneKindV1::SequencedSlots { .. }),
1343                                ) => {
1344                                    let mut rx =
1345                                        attach_sequenced_slots_rx_one(&options_t, cfg.rx.as_str())?;
1346                                    let mut tx =
1347                                        attach_sequenced_slots_tx_one(&options_t, cfg.tx.as_str())?;
1348
1349                                    run_stage_role_loop_sequenced_slots(
1350                                        stop_t.as_ref(),
1351                                        activity_t.as_ref(),
1352                                        role_activity_t.as_ref(),
1353                                        &mut rx,
1354                                        &mut tx,
1355                                        stage,
1356                                        on_full,
1357                                    )
1358                                }
1359                                _ => Err(ExecError::new("unsupported stage rx lane kind")),
1360                            }
1361                        })
1362                        .map_err(|e| ExecError::with_source("failed to spawn role thread", e))?
1363                }
1364                RoleCfgV1::Bridge(cfg) => std::thread::Builder::new()
1365                    .name(format!("byteor-sp:{}", role_name))
1366                    .spawn(move || {
1367                        if let Some(h) = &thread_init_t {
1368                            let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Bridge);
1369                            h(&ctx)?;
1370                        }
1371
1372                        let Some(rx_kind) = lane_kinds_t.get(cfg.rx.as_str()).copied() else {
1373                            return Err(ExecError::new("unknown bridge rx lane"));
1374                        };
1375                        let Some(tx_kind) = lane_kinds_t.get(cfg.tx.as_str()).copied() else {
1376                            return Err(ExecError::new("unknown bridge tx lane"));
1377                        };
1378
1379                        match (rx_kind, tx_kind) {
1380                            (LaneKindV1::Events, LaneKindV1::Events) => {
1381                                let rx_attach = events_kinds_t
1382                                    .get(cfg.rx.as_str())
1383                                    .copied()
1384                                    .unwrap_or(EventsAttachKind::Mpsc);
1385                                let tx_attach = events_kinds_t
1386                                    .get(cfg.tx.as_str())
1387                                    .copied()
1388                                    .unwrap_or(EventsAttachKind::Mpsc);
1389                                let mut rx =
1390                                    attach_events_rx_any(&options_t, cfg.rx.as_str(), rx_attach)?;
1391                                let mut tx =
1392                                    attach_events_tx_any(&options_t, cfg.tx.as_str(), tx_attach)?;
1393                                run_bridge_role_loop(
1394                                    stop_t.as_ref(),
1395                                    activity_t.as_ref(),
1396                                    role_activity_t.as_ref(),
1397                                    &mut rx,
1398                                    &mut tx,
1399                                    on_full,
1400                                )
1401                            }
1402                            (LaneKindV1::Events, LaneKindV1::FanoutBroadcast { .. }) => {
1403                                let rx_attach = events_kinds_t
1404                                    .get(cfg.rx.as_str())
1405                                    .copied()
1406                                    .unwrap_or(EventsAttachKind::Mpsc);
1407                                let mut rx =
1408                                    attach_events_rx_any(&options_t, cfg.rx.as_str(), rx_attach)?;
1409                                let mut tx = attach_fanout_tx_one(&options_t, cfg.tx.as_str())?;
1410                                run_bridge_role_loop(
1411                                    stop_t.as_ref(),
1412                                    activity_t.as_ref(),
1413                                    role_activity_t.as_ref(),
1414                                    &mut rx,
1415                                    &mut tx,
1416                                    on_full,
1417                                )
1418                            }
1419                            (LaneKindV1::Journal, LaneKindV1::Journal) => {
1420                                let mut rx = attach_journal_rx_one(&options_t, cfg.rx.as_str())?;
1421                                let mut tx = attach_journal_tx_one(&options_t, cfg.tx.as_str())?;
1422                                run_bridge_role_loop_copy(
1423                                    stop_t.as_ref(),
1424                                    activity_t.as_ref(),
1425                                    role_activity_t.as_ref(),
1426                                    &mut rx,
1427                                    &mut tx,
1428                                    on_full,
1429                                )
1430                            }
1431                            (
1432                                LaneKindV1::SequencedSlots { .. },
1433                                LaneKindV1::SequencedSlots { .. },
1434                            ) => {
1435                                let mut rx =
1436                                    attach_sequenced_slots_rx_one(&options_t, cfg.rx.as_str())?;
1437                                let mut tx =
1438                                    attach_sequenced_slots_tx_one(&options_t, cfg.tx.as_str())?;
1439                                run_bridge_role_loop_sequenced_slots(
1440                                    stop_t.as_ref(),
1441                                    activity_t.as_ref(),
1442                                    role_activity_t.as_ref(),
1443                                    &mut rx,
1444                                    &mut tx,
1445                                    on_full,
1446                                )
1447                            }
1448                            _ => Err(ExecError::new("unsupported bridge lane kinds")),
1449                        }
1450                    })
1451                    .map_err(|e| ExecError::with_source("failed to spawn role thread", e))?,
1452                RoleCfgV1::Merge(cfg) => std::thread::Builder::new()
1453                    .name(format!("byteor-sp:{}", role_name))
1454                    .spawn(move || {
1455                        if let Some(h) = &thread_init_t {
1456                            let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Merge);
1457                            h(&ctx)?;
1458                        }
1459
1460                        match lane_kinds_t.get(cfg.tx.as_str()).copied() {
1461                            Some(LaneKindV1::Events) => {
1462                                let tx_attach = events_kinds_t
1463                                    .get(cfg.tx.as_str())
1464                                    .copied()
1465                                    .unwrap_or(EventsAttachKind::Mpsc);
1466                                let mut tx =
1467                                    attach_events_tx_any(&options_t, cfg.tx.as_str(), tx_attach)?;
1468
1469                                let mut rxs: Vec<AnyEventsRx> = Vec::with_capacity(cfg.rx.len());
1470                                for rx_lane in &cfg.rx {
1471                                    let Some(LaneKindV1::Events) =
1472                                        lane_kinds_t.get(rx_lane.as_str()).copied()
1473                                    else {
1474                                        return Err(ExecError::new(
1475                                            "unsupported merge rx lane kind",
1476                                        ));
1477                                    };
1478                                    let rx_attach = events_kinds_t
1479                                        .get(rx_lane.as_str())
1480                                        .copied()
1481                                        .unwrap_or(EventsAttachKind::Mpsc);
1482                                    rxs.push(attach_events_rx_any(
1483                                        &options_t,
1484                                        rx_lane.as_str(),
1485                                        rx_attach,
1486                                    )?);
1487                                }
1488
1489                                run_merge_role_loop(
1490                                    stop_t.as_ref(),
1491                                    activity_t.as_ref(),
1492                                    role_activity_t.as_ref(),
1493                                    &mut rxs,
1494                                    &mut tx,
1495                                    cfg.policy,
1496                                    on_full,
1497                                )
1498                            }
1499                            Some(LaneKindV1::Journal) => {
1500                                let mut tx = attach_journal_tx_one(&options_t, cfg.tx.as_str())?;
1501
1502                                let mut rxs: Vec<ShmJournalSubscriber> =
1503                                    Vec::with_capacity(cfg.rx.len());
1504                                for rx_lane in &cfg.rx {
1505                                    let Some(LaneKindV1::Journal) =
1506                                        lane_kinds_t.get(rx_lane.as_str()).copied()
1507                                    else {
1508                                        return Err(ExecError::new(
1509                                            "unsupported merge rx lane kind",
1510                                        ));
1511                                    };
1512                                    rxs.push(attach_journal_rx_one(&options_t, rx_lane.as_str())?);
1513                                }
1514
1515                                run_merge_role_loop(
1516                                    stop_t.as_ref(),
1517                                    activity_t.as_ref(),
1518                                    role_activity_t.as_ref(),
1519                                    &mut rxs,
1520                                    &mut tx,
1521                                    cfg.policy,
1522                                    on_full,
1523                                )
1524                            }
1525                            _ => Err(ExecError::new("unsupported merge tx lane kind")),
1526                        }
1527                    })
1528                    .map_err(|e| ExecError::with_source("failed to spawn role thread", e))?,
1529                RoleCfgV1::Router(cfg) => std::thread::Builder::new()
1530                    .name(format!("byteor-sp:{}", role_name))
1531                    .spawn(move || {
1532                        if let Some(h) = &thread_init_t {
1533                            let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Router);
1534                            h(&ctx)?;
1535                        }
1536
1537                        let Some(LaneKindV1::FanoutBroadcast { consumers }) =
1538                            lane_kinds_t.get(cfg.rx.as_str()).copied()
1539                        else {
1540                            return Err(ExecError::new("unsupported router rx lane kind"));
1541                        };
1542                        let consumers = consumers as usize;
1543                        if consumers > 4 {
1544                            return Err(ExecError::new(
1545                                "fanout consumers > 4 is not supported by shm backing",
1546                            ));
1547                        }
1548                        if consumers != cfg.tx.len() {
1549                            return Err(ExecError::new("router tx lane count mismatch"));
1550                        }
1551
1552                        let router = attach_fanout_router_one(&options_t, cfg.rx.as_str())?;
1553
1554                        let mut fanout_rxs: Vec<ShmFanoutRx> = Vec::with_capacity(consumers);
1555                        for idx in 0..consumers {
1556                            fanout_rxs.push(attach_fanout_rx_one(
1557                                &options_t,
1558                                cfg.rx.as_str(),
1559                                idx,
1560                            )?);
1561                        }
1562
1563                        let mut txs: Vec<AnyEventsTx> = Vec::with_capacity(consumers);
1564                        for tx_lane in &cfg.tx {
1565                            let Some(LaneKindV1::Events) =
1566                                lane_kinds_t.get(tx_lane.as_str()).copied()
1567                            else {
1568                                return Err(ExecError::new("router tx lanes must be events"));
1569                            };
1570                            let tx_attach = events_kinds_t
1571                                .get(tx_lane.as_str())
1572                                .copied()
1573                                .unwrap_or(EventsAttachKind::Mpsc);
1574                            txs.push(attach_events_tx_any(
1575                                &options_t,
1576                                tx_lane.as_str(),
1577                                tx_attach,
1578                            )?);
1579                        }
1580
1581                        run_router_role_loop(
1582                            stop_t.as_ref(),
1583                            activity_t.as_ref(),
1584                            role_activity_t.as_ref(),
1585                            &router,
1586                            &mut fanout_rxs,
1587                            &mut txs,
1588                            on_full,
1589                        )
1590                    })
1591                    .map_err(|e| ExecError::with_source("failed to spawn role thread", e))?,
1592            };
1593
1594            joins.push(join);
1595        }
1596
1597        Ok(LaneGraphSpRuntime {
1598            stop,
1599            activity,
1600            joins,
1601        })
1602    }
1603
1604    fn role_kind(role: &RoleCfgV1) -> RoleKind {
1605        match role {
1606            RoleCfgV1::Stage(_) => RoleKind::Stage,
1607            RoleCfgV1::Bridge(_) => RoleKind::Bridge,
1608            RoleCfgV1::Router(_) => RoleKind::Router,
1609            RoleCfgV1::Merge(_) => RoleKind::Merge,
1610        }
1611    }
1612
1613    /// Run exactly one LaneGraph role in the current process using SHM-backed lanes.
1614    ///
1615    /// This is the “MP role runner” primitive: external supervisors can run each role in its own
1616    /// process, while this function runs the selected role loop until `stop` is set.
1617    ///
1618    /// Current scope: Events + FanoutBroadcast + Journal + SequencedSlots.
1619    pub fn run_lane_graph_mp_role_shm(
1620        spec: &byteor_pipeline_spec::PipelineSpecV1,
1621        resolver: &dyn StageResolver,
1622        options: SpShmOptions,
1623        role_name: &str,
1624        stop: &AtomicBool,
1625    ) -> Result<(), ExecError> {
1626        options.ensure_dir()?;
1627
1628        crate::validate_sp(spec)?;
1629
1630        let byteor_pipeline_spec::PipelineSpecV1::LaneGraph(lg) = spec else {
1631            return Err(ExecError::new("spec is not a lane graph"));
1632        };
1633
1634        let role = lg
1635            .roles
1636            .iter()
1637            .find(|r| r.name() == role_name)
1638            .ok_or_else(|| ExecError::with_context("unknown role", role_name.to_string()))?;
1639
1640        let lane_kinds: BTreeMap<String, LaneKindV1> =
1641            lg.lanes.iter().map(|l| (l.name.clone(), l.kind)).collect();
1642        let events_kinds = events_attach_kind_by_lane(lg);
1643        let on_full = lg.on_full;
1644        let role_activity = RuntimeRoleActivity::new(role.name().to_string(), role_kind(role));
1645        let activity = RuntimeActivity::new(Vec::new());
1646
1647        match role {
1648            RoleCfgV1::Stage(cfg) => {
1649                let stage = resolve_or_err(resolver, cfg.stage.as_str())?;
1650                match (
1651                    lane_kinds.get(cfg.rx.as_str()).copied(),
1652                    lane_kinds.get(cfg.tx.as_str()).copied(),
1653                ) {
1654                    (Some(LaneKindV1::Events), Some(LaneKindV1::Events)) => {
1655                        let rx_kind = events_kinds
1656                            .get(cfg.rx.as_str())
1657                            .copied()
1658                            .unwrap_or(EventsAttachKind::Mpsc);
1659                        let tx_kind = events_kinds
1660                            .get(cfg.tx.as_str())
1661                            .copied()
1662                            .unwrap_or(EventsAttachKind::Mpsc);
1663
1664                        let mut rx = attach_events_rx_any(&options, cfg.rx.as_str(), rx_kind)?;
1665                        let mut tx = attach_events_tx_any(&options, cfg.tx.as_str(), tx_kind)?;
1666                        run_stage_role_loop(
1667                            stop,
1668                            &activity,
1669                            &role_activity,
1670                            &mut rx,
1671                            &mut tx,
1672                            stage,
1673                            on_full,
1674                        )
1675                    }
1676                    (Some(LaneKindV1::Journal), Some(LaneKindV1::Journal)) => {
1677                        let mut rx = attach_journal_rx_one(&options, cfg.rx.as_str())?;
1678                        let mut tx = attach_journal_tx_one(&options, cfg.tx.as_str())?;
1679                        run_stage_role_loop_copy(
1680                            stop,
1681                            &activity,
1682                            &role_activity,
1683                            &mut rx,
1684                            &mut tx,
1685                            stage,
1686                            on_full,
1687                        )
1688                    }
1689                    (
1690                        Some(LaneKindV1::SequencedSlots { .. }),
1691                        Some(LaneKindV1::SequencedSlots { .. }),
1692                    ) => {
1693                        let mut rx = attach_sequenced_slots_rx_one(&options, cfg.rx.as_str())?;
1694                        let mut tx = attach_sequenced_slots_tx_one(&options, cfg.tx.as_str())?;
1695                        run_stage_role_loop_sequenced_slots(
1696                            stop,
1697                            &activity,
1698                            &role_activity,
1699                            &mut rx,
1700                            &mut tx,
1701                            stage,
1702                            on_full,
1703                        )
1704                    }
1705                    _ => Err(ExecError::new("unsupported stage rx lane kind")),
1706                }
1707            }
1708            RoleCfgV1::Bridge(cfg) => {
1709                let Some(rx_kind) = lane_kinds.get(cfg.rx.as_str()).copied() else {
1710                    return Err(ExecError::new("unknown bridge rx lane"));
1711                };
1712                let Some(tx_kind) = lane_kinds.get(cfg.tx.as_str()).copied() else {
1713                    return Err(ExecError::new("unknown bridge tx lane"));
1714                };
1715
1716                match (rx_kind, tx_kind) {
1717                    (LaneKindV1::Events, LaneKindV1::Events) => {
1718                        let rx_attach = events_kinds
1719                            .get(cfg.rx.as_str())
1720                            .copied()
1721                            .unwrap_or(EventsAttachKind::Mpsc);
1722                        let tx_attach = events_kinds
1723                            .get(cfg.tx.as_str())
1724                            .copied()
1725                            .unwrap_or(EventsAttachKind::Mpsc);
1726                        let mut rx = attach_events_rx_any(&options, cfg.rx.as_str(), rx_attach)?;
1727                        let mut tx = attach_events_tx_any(&options, cfg.tx.as_str(), tx_attach)?;
1728                        run_bridge_role_loop(
1729                            stop,
1730                            &activity,
1731                            &role_activity,
1732                            &mut rx,
1733                            &mut tx,
1734                            on_full,
1735                        )
1736                    }
1737                    (LaneKindV1::Events, LaneKindV1::FanoutBroadcast { .. }) => {
1738                        let rx_attach = events_kinds
1739                            .get(cfg.rx.as_str())
1740                            .copied()
1741                            .unwrap_or(EventsAttachKind::Mpsc);
1742                        let mut rx = attach_events_rx_any(&options, cfg.rx.as_str(), rx_attach)?;
1743                        let mut tx = attach_fanout_tx_one(&options, cfg.tx.as_str())?;
1744                        run_bridge_role_loop(
1745                            stop,
1746                            &activity,
1747                            &role_activity,
1748                            &mut rx,
1749                            &mut tx,
1750                            on_full,
1751                        )
1752                    }
1753                    (LaneKindV1::Journal, LaneKindV1::Journal) => {
1754                        let mut rx = attach_journal_rx_one(&options, cfg.rx.as_str())?;
1755                        let mut tx = attach_journal_tx_one(&options, cfg.tx.as_str())?;
1756                        run_bridge_role_loop_copy(
1757                            stop,
1758                            &activity,
1759                            &role_activity,
1760                            &mut rx,
1761                            &mut tx,
1762                            on_full,
1763                        )
1764                    }
1765                    (LaneKindV1::SequencedSlots { .. }, LaneKindV1::SequencedSlots { .. }) => {
1766                        let mut rx = attach_sequenced_slots_rx_one(&options, cfg.rx.as_str())?;
1767                        let mut tx = attach_sequenced_slots_tx_one(&options, cfg.tx.as_str())?;
1768                        run_bridge_role_loop_sequenced_slots(
1769                            stop,
1770                            &activity,
1771                            &role_activity,
1772                            &mut rx,
1773                            &mut tx,
1774                            on_full,
1775                        )
1776                    }
1777                    _ => Err(ExecError::new("unsupported bridge lane kinds")),
1778                }
1779            }
1780            RoleCfgV1::Merge(cfg) => match lane_kinds.get(cfg.tx.as_str()).copied() {
1781                Some(LaneKindV1::Events) => {
1782                    let tx_attach = events_kinds
1783                        .get(cfg.tx.as_str())
1784                        .copied()
1785                        .unwrap_or(EventsAttachKind::Mpsc);
1786                    let mut tx = attach_events_tx_any(&options, cfg.tx.as_str(), tx_attach)?;
1787
1788                    let mut rxs: Vec<AnyEventsRx> = Vec::with_capacity(cfg.rx.len());
1789                    for rx_lane in &cfg.rx {
1790                        let Some(LaneKindV1::Events) = lane_kinds.get(rx_lane.as_str()).copied()
1791                        else {
1792                            return Err(ExecError::new("unsupported merge rx lane kind"));
1793                        };
1794                        let rx_attach = events_kinds
1795                            .get(rx_lane.as_str())
1796                            .copied()
1797                            .unwrap_or(EventsAttachKind::Mpsc);
1798                        rxs.push(attach_events_rx_any(&options, rx_lane.as_str(), rx_attach)?);
1799                    }
1800
1801                    run_merge_role_loop(
1802                        stop,
1803                        &activity,
1804                        &role_activity,
1805                        &mut rxs,
1806                        &mut tx,
1807                        cfg.policy,
1808                        on_full,
1809                    )
1810                }
1811                Some(LaneKindV1::Journal) => {
1812                    let mut tx = attach_journal_tx_one(&options, cfg.tx.as_str())?;
1813                    let mut rxs: Vec<ShmJournalSubscriber> = Vec::with_capacity(cfg.rx.len());
1814                    for rx_lane in &cfg.rx {
1815                        let Some(LaneKindV1::Journal) = lane_kinds.get(rx_lane.as_str()).copied()
1816                        else {
1817                            return Err(ExecError::new("unsupported merge rx lane kind"));
1818                        };
1819                        rxs.push(attach_journal_rx_one(&options, rx_lane.as_str())?);
1820                    }
1821
1822                    run_merge_role_loop(
1823                        stop,
1824                        &activity,
1825                        &role_activity,
1826                        &mut rxs,
1827                        &mut tx,
1828                        cfg.policy,
1829                        on_full,
1830                    )
1831                }
1832                _ => Err(ExecError::new("unsupported merge tx lane kind")),
1833            },
1834            RoleCfgV1::Router(cfg) => {
1835                let Some(LaneKindV1::FanoutBroadcast { consumers }) =
1836                    lane_kinds.get(cfg.rx.as_str()).copied()
1837                else {
1838                    return Err(ExecError::new("unsupported router rx lane kind"));
1839                };
1840                let consumers = consumers as usize;
1841                if consumers > 4 {
1842                    return Err(ExecError::new(
1843                        "fanout consumers > 4 is not supported by shm backing",
1844                    ));
1845                }
1846                if consumers != cfg.tx.len() {
1847                    return Err(ExecError::new("router tx lane count mismatch"));
1848                }
1849
1850                let router = attach_fanout_router_one(&options, cfg.rx.as_str())?;
1851
1852                let mut fanout_rxs: Vec<ShmFanoutRx> = Vec::with_capacity(consumers);
1853                for idx in 0..consumers {
1854                    fanout_rxs.push(attach_fanout_rx_one(&options, cfg.rx.as_str(), idx)?);
1855                }
1856
1857                let mut txs: Vec<AnyEventsTx> = Vec::with_capacity(consumers);
1858                for tx_lane in &cfg.tx {
1859                    let Some(LaneKindV1::Events) = lane_kinds.get(tx_lane.as_str()).copied() else {
1860                        return Err(ExecError::new("router tx lanes must be events"));
1861                    };
1862                    let tx_attach = events_kinds
1863                        .get(tx_lane.as_str())
1864                        .copied()
1865                        .unwrap_or(EventsAttachKind::Mpsc);
1866                    txs.push(attach_events_tx_any(&options, tx_lane.as_str(), tx_attach)?);
1867                }
1868
1869                run_router_role_loop(
1870                    stop,
1871                    &activity,
1872                    &role_activity,
1873                    &router,
1874                    &mut fanout_rxs,
1875                    &mut txs,
1876                    on_full,
1877                )
1878            }
1879        }
1880    }
1881}
1882
1883#[cfg(feature = "shm")]
1884pub use shm::{
1885    run_lane_graph_mp_role_shm, run_sp, spawn_lane_graph_sp_shm_runtime,
1886    spawn_lane_graph_sp_shm_runtime_with_thread_init, LaneGraphRoleActivitySnapshot,
1887    LaneGraphSpRuntime, SpShmOptions,
1888};