byteor_pipeline_exec/
single_ring.rs

1//! SingleRing SHM executor and runtime.
2
3use alloc::vec::Vec;
4
5use byteor_pipeline_kernel::step::TransformOutcome;
6use byteor_pipeline_spec::{validate_v1, PipelineSpecV1, StageOpV1};
7
8use crate::ExecError;
9use crate::{resolve_or_err, ResolvedStage, StageResolver};
10use crate::{RoleKind, ThreadInitContext, ThreadInitHook};
11
/// Signature of a built-in read-only stage callback; receives the slot payload.
type InspectFn = fn(data: &[u8]);
/// Signature of a built-in in-place stage callback; receives the payload and a one-byte argument.
type ProcessFn = fn(data: &mut [u8], arg: u8);
14
/// Built-in inspector for the `Identity` stage: accepts the payload and does nothing with it.
fn op_noop(_: &[u8]) {}
16
/// Adds `delta` to every byte of `data` in place, wrapping on overflow (e.g. `255 + 1 == 0`).
fn op_add_u8(data: &mut [u8], delta: u8) {
    data.iter_mut()
        .for_each(|byte| *byte = byte.wrapping_add(delta));
}
22
/// Converts a `u32` slot length to `usize`, capped at `max` (the size of the backing buffer).
fn clamp_len(len: u32, max: usize) -> usize {
    let requested = len as usize;
    if requested > max {
        max
    } else {
        requested
    }
}
26
27enum StageExecOp {
28    BuiltIn(StageOpV1),
29    Resolved { key: String, stage: ResolvedStage },
30}
31
32fn resolve_stage_op(
33    op: StageOpV1,
34    resolver: Option<&dyn StageResolver>,
35) -> Result<StageExecOp, ExecError> {
36    match op {
37        StageOpV1::ResolverKey { stage } => {
38            let Some(resolver) = resolver else {
39                return Err(ExecError::new("missing stage resolver"));
40            };
41            let resolved = resolve_or_err(resolver, &stage).map_err(|e| {
42                ExecError::with_context("resolve stage failed", format!("stage={stage}: {e}"))
43            })?;
44            Ok(StageExecOp::Resolved {
45                key: stage,
46                stage: resolved,
47            })
48        }
49        other => Ok(StageExecOp::BuiltIn(other)),
50    }
51}
52
/// Wait/backoff policy presets for SingleRing hot loops.
///
/// The set is deliberately small and each preset is deterministic:
/// - [`WaitPreset::MaxPerfSpin`]: spin first; meant for isolated CPUs.
/// - [`WaitPreset::StdBackoff`]: cooperative backoff for non-HFT deployments.
/// - [`WaitPreset::Yield`]: cooperative, yield-only waiting for shared-host runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitPreset {
    /// Spin-first waiting, for maximum performance.
    MaxPerfSpin,
    /// Default backoff preset (std-only); yields or sleeps as needed.
    StdBackoff,
    /// Yield-only waiting (std-only).
    Yield,
}
68
69struct JoinBarrier {
70    deps: Vec<byteor_pipeline_backings_shm::GatingBarrier>,
71}
72
73impl byteor_pipeline_kernel::SeqBarrier for JoinBarrier {
74    fn available(&self) -> u64 {
75        self.deps.iter().map(|b| b.available()).min().unwrap_or(0)
76    }
77}
78
79enum StageBarrier {
80    Cursor(byteor_pipeline_backings_shm::CursorBarrier),
81    Gating(byteor_pipeline_backings_shm::GatingBarrier),
82    Join(JoinBarrier),
83}
84
85impl byteor_pipeline_kernel::SeqBarrier for StageBarrier {
86    fn available(&self) -> u64 {
87        match self {
88            StageBarrier::Cursor(b) => b.available(),
89            StageBarrier::Gating(b) => b.available(),
90            StageBarrier::Join(b) => b.available(),
91        }
92    }
93}
94
95enum WaitHolder {
96    Spin(indexbus_core::SpinWait),
97    Backoff(indexbus_core::StdBackoff),
98    Yield(indexbus_core::YieldWait),
99}
100
101impl WaitHolder {
102    #[inline]
103    fn wait(&mut self) {
104        match self {
105            WaitHolder::Spin(w) => byteor_pipeline_kernel::WaitStrategy::wait(w),
106            WaitHolder::Backoff(w) => byteor_pipeline_kernel::WaitStrategy::wait(w),
107            WaitHolder::Yield(w) => byteor_pipeline_kernel::WaitStrategy::wait(w),
108        }
109    }
110}
111
112fn current_allowed_cpus() -> Vec<usize> {
113    #[cfg(target_os = "linux")]
114    {
115        affinity::get_thread_affinity().unwrap_or_default()
116    }
117
118    #[cfg(not(target_os = "linux"))]
119    {
120        Vec::new()
121    }
122}
123
/// CPU for the main thread: the first allowed CPU, but only when at least two are allowed.
///
/// With zero or one allowed CPU, returns `None`.
fn main_thread_cpu(cpus: &[usize]) -> Option<usize> {
    match cpus {
        [first, _, ..] => Some(*first),
        _ => None,
    }
}
131
/// CPU for worker `worker_idx`: workers round-robin over all allowed CPUs except the first.
///
/// Returns `None` when fewer than two CPUs are allowed.
fn worker_thread_cpu(cpus: &[usize], worker_idx: usize) -> Option<usize> {
    match cpus.split_first() {
        Some((_, rest)) if !rest.is_empty() => Some(rest[worker_idx % rest.len()]),
        _ => None,
    }
}
139
/// CPU for runtime stage `stage_idx`: stages round-robin over every allowed CPU.
///
/// Returns `None` when `cpus` is empty.
fn runtime_stage_cpu(cpus: &[usize], stage_idx: usize) -> Option<usize> {
    // `checked_rem` yields `None` for an empty list instead of dividing by zero.
    let slot = stage_idx.checked_rem(cpus.len())?;
    cpus.get(slot).copied()
}
147
148struct ThreadAffinityGuard {
149    previous: Vec<usize>,
150    active: bool,
151}
152
153impl ThreadAffinityGuard {
154    fn inactive() -> Self {
155        Self {
156            previous: Vec::new(),
157            active: false,
158        }
159    }
160
161    fn pin(cpu: Option<usize>) -> Result<Self, ExecError> {
162        #[cfg(target_os = "linux")]
163        {
164            let Some(cpu) = cpu else {
165                return Ok(Self::inactive());
166            };
167
168            let previous = affinity::get_thread_affinity().map_err(|e| {
169                ExecError::with_context("read thread affinity failed", e.to_string())
170            })?;
171
172            if previous.len() == 1 && previous[0] == cpu {
173                return Ok(Self {
174                    previous,
175                    active: false,
176                });
177            }
178
179            affinity::set_thread_affinity([cpu]).map_err(|e| {
180                ExecError::with_context("set thread affinity failed", format!("cpu={cpu}: {e}"))
181            })?;
182
183            Ok(Self {
184                previous,
185                active: true,
186            })
187        }
188
189        #[cfg(not(target_os = "linux"))]
190        {
191            let _ = cpu;
192            Ok(Self::inactive())
193        }
194    }
195}
196
197impl Drop for ThreadAffinityGuard {
198    fn drop(&mut self) {
199        #[cfg(target_os = "linux")]
200        if self.active && !self.previous.is_empty() {
201            let _ = affinity::set_thread_affinity(&self.previous);
202        }
203    }
204}
205
206fn run_stage_loop_stoppable<W: byteor_pipeline_kernel::WaitStrategy>(
207    stage: &mut byteor_pipeline_backings_shm::ShmSequencedSlotsStage,
208    barrier: &StageBarrier,
209    wait: &mut W,
210    stop: &std::sync::atomic::AtomicBool,
211    op: &mut StageExecOp,
212    iterations: usize,
213    release: bool,
214) -> Result<(), ExecError> {
215    match op {
216        StageExecOp::BuiltIn(op) => match op {
217            StageOpV1::Identity => {
218                let inspect_fn: InspectFn = op_noop;
219                for _ in 0..iterations {
220                    if stop.load(std::sync::atomic::Ordering::Relaxed) {
221                        break;
222                    }
223
224                    let res =
225                        stage.inspect_next_stoppable(barrier, wait, release, stop, |data, _len| {
226                            inspect_fn(data);
227                            Ok(())
228                        });
229
230                    match res {
231                        Ok(_) => {}
232                        Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => break,
233                        Err(e) => return Err(ExecError::with_source("stage failed", e)),
234                    }
235                }
236            }
237            StageOpV1::AddU8 { delta } => {
238                let delta = *delta;
239                let process_fn: ProcessFn = op_add_u8;
240                for _ in 0..iterations {
241                    if stop.load(std::sync::atomic::Ordering::Relaxed) {
242                        break;
243                    }
244
245                    let res =
246                        stage.process_next_stoppable(barrier, wait, release, stop, |data, len| {
247                            let n = clamp_len(*len, data.len());
248                            process_fn(&mut data[..n], delta);
249                            Ok(())
250                        });
251
252                    match res {
253                        Ok(_) => {}
254                        Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => break,
255                        Err(e) => return Err(ExecError::with_source("stage failed", e)),
256                    }
257                }
258            }
259            StageOpV1::ResolverKey { stage } => {
260                return Err(ExecError::with_context(
261                    "unresolved stage",
262                    format!("stage={stage}"),
263                ));
264            }
265        },
266        StageExecOp::Resolved {
267            key,
268            stage: resolved,
269        } => {
270            let mut scratch = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
271
272            for _ in 0..iterations {
273                if stop.load(std::sync::atomic::Ordering::Relaxed) {
274                    break;
275                }
276
277                let mut stage_err: Option<ExecError> = None;
278
279                match resolved {
280                    ResolvedStage::Inspect(f) => {
281                        let res = stage.inspect_next_stoppable(
282                            barrier,
283                            wait,
284                            release,
285                            stop,
286                            |data, len| {
287                                let n = clamp_len(len, data.len());
288                                if let Err(e) = f(&data[..n]) {
289                                    stage_err = Some(ExecError::with_context(
290                                        "resolved stage failed",
291                                        format!("stage={key}: {e}"),
292                                    ));
293                                    return Err(
294                                        byteor_pipeline_backings_shm::SequencedSlotsError::Stopped,
295                                    );
296                                }
297                                Ok(())
298                            },
299                        );
300
301                        match res {
302                            Ok(_) => {}
303                            Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
304                                if let Some(e) = stage_err {
305                                    return Err(e);
306                                }
307                                break;
308                            }
309                            Err(e) => return Err(ExecError::with_source("stage failed", e)),
310                        }
311                    }
312                    _ => {
313                        let res = stage.process_next_stoppable(
314                            barrier,
315                            wait,
316                            release,
317                            stop,
318                            |data, len| {
319                                let n_in = clamp_len(*len, data.len());
320                                let input = &data[..n_in];
321                                let out = &mut scratch[..n_in];
322
323                                let mut set_err = |e: ExecError| {
324                                    stage_err = Some(e);
325                                    Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped)
326                                };
327
328                                match resolved {
329                                    ResolvedStage::MapOk(f) => {
330                                        let n = f(input, out);
331                                        if n > input.len() {
332                                            return set_err(ExecError::with_context(
333                                                "resolved stage output too large",
334                                                format!("stage={key} out={n} in={}", input.len()),
335                                            ));
336                                        }
337                                        data[..n].copy_from_slice(&out[..n]);
338                                        *len = n as u32;
339                                        Ok(())
340                                    }
341                                    ResolvedStage::Map(f) => match f(input, out) {
342                                        Ok(n) => {
343                                            if n > input.len() {
344                                                return set_err(ExecError::with_context(
345                                                    "resolved stage output too large",
346                                                    format!(
347                                                        "stage={key} out={n} in={} (fallible)",
348                                                        input.len()
349                                                    ),
350                                                ));
351                                            }
352                                            data[..n].copy_from_slice(&out[..n]);
353                                            *len = n as u32;
354                                            Ok(())
355                                        }
356                                        Err(e) => set_err(ExecError::with_context(
357                                            "resolved stage failed",
358                                            format!("stage={key}: {e}"),
359                                        )),
360                                    },
361                                    ResolvedStage::Filter(f) => match f(input) {
362                                        Ok(true) => Ok(()),
363                                        Ok(false) => {
364                                            *len = 0;
365                                            Ok(())
366                                        }
367                                        Err(e) => set_err(ExecError::with_context(
368                                            "resolved stage failed",
369                                            format!("stage={key}: {e}"),
370                                        )),
371                                    },
372                                    ResolvedStage::Inspect(f) => match f(input) {
373                                        Ok(()) => Ok(()),
374                                        Err(e) => set_err(ExecError::with_context(
375                                            "resolved stage failed",
376                                            format!("stage={key}: {e}"),
377                                        )),
378                                    },
379                                    ResolvedStage::StatefulTransform(st) => match st.call(input, out)
380                                    {
381                                        Ok(TransformOutcome::Drop) => {
382                                            *len = 0;
383                                            Ok(())
384                                        }
385                                        Ok(TransformOutcome::ForwardInput) => Ok(()),
386                                        Ok(TransformOutcome::ForwardOutput(n)) => {
387                                            if n > input.len() {
388                                                return set_err(ExecError::with_context(
389                                                    "resolved stage output too large",
390                                                    format!(
391                                                        "stage={key} out={n} in={} (stateful transform)",
392                                                        input.len()
393                                                    ),
394                                                ));
395                                            }
396                                            data[..n].copy_from_slice(&out[..n]);
397                                            *len = n as u32;
398                                            Ok(())
399                                        }
400                                        Ok(_) => set_err(ExecError::new(
401                                            "unsupported transform outcome (non-exhaustive)",
402                                        )),
403                                        Err(e) => set_err(ExecError::with_context(
404                                            "resolved stage failed",
405                                            format!("stage={key}: {e}"),
406                                        )),
407                                    },
408                                    ResolvedStage::Stateful(st) => match st.call(input, out) {
409                                        Ok(n) => {
410                                            if n > input.len() {
411                                                return set_err(ExecError::with_context(
412                                                    "resolved stage output too large",
413                                                    format!(
414                                                        "stage={key} out={n} in={} (stateful)",
415                                                        input.len()
416                                                    ),
417                                                ));
418                                            }
419                                            data[..n].copy_from_slice(&out[..n]);
420                                            *len = n as u32;
421                                            Ok(())
422                                        }
423                                        Err(e) => set_err(ExecError::with_context(
424                                            "resolved stage failed",
425                                            format!("stage={key}: {e}"),
426                                        )),
427                                    },
428                                }
429                            },
430                        );
431
432                        match res {
433                            Ok(_) => {}
434                            Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
435                                if let Some(e) = stage_err {
436                                    return Err(e);
437                                }
438                                break;
439                            }
440                            Err(e) => return Err(ExecError::with_source("stage failed", e)),
441                        }
442                    }
443                }
444            }
445        }
446    }
447
448    Ok(())
449}
450
451fn run_stage_loop_iters<W: byteor_pipeline_kernel::WaitStrategy>(
452    stage: &mut byteor_pipeline_backings_shm::ShmSequencedSlotsStage,
453    barrier: &StageBarrier,
454    wait: &mut W,
455    op: &mut StageExecOp,
456    iterations: usize,
457    release: bool,
458) -> Result<(), ExecError> {
459    match op {
460        StageExecOp::BuiltIn(op) => match op {
461            StageOpV1::Identity => {
462                let inspect_fn: InspectFn = op_noop;
463                for _ in 0..iterations {
464                    stage
465                        .inspect_next(barrier, wait, release, |data, _len| {
466                            inspect_fn(data);
467                            Ok(())
468                        })
469                        .map_err(|e| ExecError::with_source("stage failed", e))?;
470                }
471            }
472            StageOpV1::AddU8 { delta } => {
473                let delta = *delta;
474                let process_fn: ProcessFn = op_add_u8;
475                for _ in 0..iterations {
476                    stage
477                        .process_next(barrier, wait, release, |data, len| {
478                            let n = clamp_len(*len, data.len());
479                            process_fn(&mut data[..n], delta);
480                            Ok(())
481                        })
482                        .map_err(|e| ExecError::with_source("stage failed", e))?;
483                }
484            }
485            StageOpV1::ResolverKey { stage } => {
486                return Err(ExecError::with_context(
487                    "unresolved stage",
488                    format!("stage={stage}"),
489                ));
490            }
491        },
492        StageExecOp::Resolved {
493            key,
494            stage: resolved,
495        } => {
496            let mut scratch = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
497            for _ in 0..iterations {
498                let mut stage_err: Option<ExecError> = None;
499
500                match resolved {
501                    ResolvedStage::Inspect(f) => {
502                        let res = stage.inspect_next(barrier, wait, release, |data, len| {
503                            let n = clamp_len(len, data.len());
504                            if let Err(e) = f(&data[..n]) {
505                                stage_err = Some(ExecError::with_context(
506                                    "resolved stage failed",
507                                    format!("stage={key}: {e}"),
508                                ));
509                                return Err(
510                                    byteor_pipeline_backings_shm::SequencedSlotsError::Stopped,
511                                );
512                            }
513                            Ok(())
514                        });
515
516                        match res {
517                            Ok(_) => {}
518                            Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
519                                return Err(stage_err
520                                    .unwrap_or_else(|| ExecError::new("resolved stage stopped")));
521                            }
522                            Err(e) => return Err(ExecError::with_source("stage failed", e)),
523                        }
524                    }
525                    _ => {
526                        let res = stage.process_next(barrier, wait, release, |data, len| {
527                            let n_in = clamp_len(*len, data.len());
528                            let input = &data[..n_in];
529                            let out = &mut scratch[..n_in];
530
531                            let mut set_err = |e: ExecError| {
532                                stage_err = Some(e);
533                                Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped)
534                            };
535
536                            match resolved {
537                                ResolvedStage::MapOk(f) => {
538                                    let n = f(input, out);
539                                    if n > input.len() {
540                                        return set_err(ExecError::with_context(
541                                            "resolved stage output too large",
542                                            format!("stage={key} out={n} in={}", input.len()),
543                                        ));
544                                    }
545                                    data[..n].copy_from_slice(&out[..n]);
546                                    *len = n as u32;
547                                    Ok(())
548                                }
549                                ResolvedStage::Map(f) => match f(input, out) {
550                                    Ok(n) => {
551                                        if n > input.len() {
552                                            return set_err(ExecError::with_context(
553                                                "resolved stage output too large",
554                                                format!(
555                                                    "stage={key} out={n} in={} (fallible)",
556                                                    input.len()
557                                                ),
558                                            ));
559                                        }
560                                        data[..n].copy_from_slice(&out[..n]);
561                                        *len = n as u32;
562                                        Ok(())
563                                    }
564                                    Err(e) => set_err(ExecError::with_context(
565                                        "resolved stage failed",
566                                        format!("stage={key}: {e}"),
567                                    )),
568                                },
569                                ResolvedStage::Filter(f) => match f(input) {
570                                    Ok(true) => Ok(()),
571                                    Ok(false) => {
572                                        *len = 0;
573                                        Ok(())
574                                    }
575                                    Err(e) => set_err(ExecError::with_context(
576                                        "resolved stage failed",
577                                        format!("stage={key}: {e}"),
578                                    )),
579                                },
580                                ResolvedStage::Inspect(f) => match f(input) {
581                                    Ok(()) => Ok(()),
582                                    Err(e) => set_err(ExecError::with_context(
583                                        "resolved stage failed",
584                                        format!("stage={key}: {e}"),
585                                    )),
586                                },
587                                ResolvedStage::StatefulTransform(st) => match st.call(input, out)
588                                {
589                                    Ok(TransformOutcome::Drop) => {
590                                        *len = 0;
591                                        Ok(())
592                                    }
593                                    Ok(TransformOutcome::ForwardInput) => Ok(()),
594                                    Ok(TransformOutcome::ForwardOutput(n)) => {
595                                        if n > input.len() {
596                                            return set_err(ExecError::with_context(
597                                                "resolved stage output too large",
598                                                format!(
599                                                    "stage={key} out={n} in={} (stateful transform)",
600                                                    input.len()
601                                                ),
602                                            ));
603                                        }
604                                        data[..n].copy_from_slice(&out[..n]);
605                                        *len = n as u32;
606                                        Ok(())
607                                    }
608                                    Ok(_) => set_err(ExecError::new(
609                                        "unsupported transform outcome (non-exhaustive)",
610                                    )),
611                                    Err(e) => set_err(ExecError::with_context(
612                                        "resolved stage failed",
613                                        format!("stage={key}: {e}"),
614                                    )),
615                                },
616                                ResolvedStage::Stateful(st) => match st.call(input, out) {
617                                    Ok(n) => {
618                                        if n > input.len() {
619                                            return set_err(ExecError::with_context(
620                                                "resolved stage output too large",
621                                                format!(
622                                                    "stage={key} out={n} in={} (stateful)",
623                                                    input.len()
624                                                ),
625                                            ));
626                                        }
627                                        data[..n].copy_from_slice(&out[..n]);
628                                        *len = n as u32;
629                                        Ok(())
630                                    }
631                                    Err(e) => set_err(ExecError::with_context(
632                                        "resolved stage failed",
633                                        format!("stage={key}: {e}"),
634                                    )),
635                                },
636                            }
637                        });
638
639                        match res {
640                            Ok(_) => {}
641                            Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
642                                return Err(stage_err
643                                    .unwrap_or_else(|| ExecError::new("resolved stage stopped")));
644                            }
645                            Err(e) => return Err(ExecError::with_source("stage failed", e)),
646                        }
647                    }
648                }
649            }
650        }
651    }
652
653    Ok(())
654}
655
656/// In-process SP runtime handle for SingleRing SHM execution.
657pub struct SingleRingSpRuntime {
658    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
659    lane_path: std::path::PathBuf,
660    active_gating: usize,
661    handles: Vec<(usize, std::thread::JoinHandle<Result<(), ExecError>>)>,
662}
663
664impl SingleRingSpRuntime {
665    /// Number of stage threads in this runtime.
666    pub fn stage_count(&self) -> usize {
667        self.handles.len()
668    }
669
670    /// Number of active gating cells used by this runtime.
671    pub fn active_gating(&self) -> usize {
672        self.active_gating
673    }
674
675    /// Snapshot cursor + gating sequences without stopping the runtime.
676    pub fn snapshot(&self) -> Result<byteor_pipeline_backings_shm::SingleRingSnapshot, ExecError> {
677        let lane = "ignored";
678        let opts = byteor_pipeline_backings_shm::AttachOptions {
679            blocking: false,
680            prefault: false,
681            path: Some(self.lane_path.clone()),
682            queue: 0,
683        };
684        let shm =
685            byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane).map_err(|e| {
686                ExecError::with_context_source(
687                    "attach shm failed",
688                    format!("path={} lane={lane}", self.lane_path.display()),
689                    e,
690                )
691            })?;
692        Ok(shm.snapshot())
693    }
694
695    /// True if any stage thread has exited (successfully or with error).
696    ///
697    /// This is intended for control-plane supervision.
698    pub fn any_stage_finished(&self) -> bool {
699        self.handles.iter().any(|(_, h)| h.is_finished())
700    }
701
702    /// Request a cooperative stop.
703    pub fn request_stop(&self) {
704        self.stop.store(true, std::sync::atomic::Ordering::Relaxed);
705    }
706
707    /// Request stop and join stage threads, but return an error if they don't stop within `timeout`.
708    ///
709    /// Notes:
710    /// - Rust threads cannot be force-killed safely; this is cooperative stop only.
711    /// - On timeout, this returns an error that lists unfinished stages.
712    pub fn stop_and_join_bounded(mut self, timeout: std::time::Duration) -> Result<(), ExecError> {
713        self.request_stop();
714
715        let start = std::time::Instant::now();
716        let mut first_err: Option<ExecError> = None;
717
718        // Join threads as they finish so we can report failures promptly.
719        while !self.handles.is_empty() {
720            let mut i = 0;
721            while i < self.handles.len() {
722                if self.handles[i].1.is_finished() {
723                    let (stage_idx, h) = self.handles.swap_remove(i);
724                    let res = match h.join() {
725                        Ok(r) => r,
726                        Err(_) => Err(ExecError::new("stage panic")),
727                    };
728
729                    if let Err(e) = res {
730                        if first_err.is_none() {
731                            first_err = Some(ExecError::with_context(
732                                "stage exited",
733                                format!("stage_idx={stage_idx}: {e}"),
734                            ));
735                        }
736                    }
737
738                    continue;
739                }
740                i += 1;
741            }
742
743            if self.handles.is_empty() {
744                break;
745            }
746
747            if start.elapsed() >= timeout {
748                let mut unfinished: Vec<usize> = self.handles.iter().map(|(idx, _)| *idx).collect();
749                unfinished.sort_unstable();
750                return Err(ExecError::with_context(
751                    "stop/join timed out",
752                    format!("unfinished_stages={unfinished:?}"),
753                ));
754            }
755
756            std::thread::sleep(std::time::Duration::from_millis(10));
757        }
758
759        match first_err {
760            Some(e) => Err(e),
761            None => Ok(()),
762        }
763    }
764
765    /// Request stop and join all stage threads.
766    pub fn stop_and_join(mut self) -> Result<(), ExecError> {
767        self.request_stop();
768
769        let mut first_err: Option<ExecError> = None;
770        for (stage_idx, h) in self.handles.drain(..) {
771            let res = match h.join() {
772                Ok(r) => r,
773                Err(_) => Err(ExecError::new("stage panic")),
774            };
775
776            if let Err(e) = res {
777                if first_err.is_none() {
778                    first_err = Some(ExecError::with_context(
779                        "stage exited",
780                        format!("stage_idx={stage_idx}: {e}"),
781                    ));
782                }
783            }
784        }
785
786        match first_err {
787            Some(e) => Err(e),
788            None => Ok(()),
789        }
790    }
791}
792
793/// Spawn a long-running SingleRing runtime that processes until stopped.
794pub fn spawn_single_ring_shm_runtime(
795    spec: &PipelineSpecV1,
796    lane_path: std::path::PathBuf,
797) -> Result<SingleRingSpRuntime, ExecError> {
798    spawn_single_ring_shm_runtime_with_wait(spec, lane_path, WaitPreset::StdBackoff)
799}
800
801/// Spawn a long-running SingleRing runtime that processes until stopped, using a wait preset.
802pub fn spawn_single_ring_shm_runtime_with_wait(
803    spec: &PipelineSpecV1,
804    lane_path: std::path::PathBuf,
805    wait: WaitPreset,
806) -> Result<SingleRingSpRuntime, ExecError> {
807    spawn_single_ring_shm_runtime_with_wait_and_resolver(spec, lane_path, wait, None)
808}
809
810/// Spawn a long-running SingleRing runtime that processes until stopped, resolving any
811/// `ResolverKey` stages using `resolver` (cold path).
812pub fn spawn_single_ring_shm_runtime_with_wait_and_resolver(
813    spec: &PipelineSpecV1,
814    lane_path: std::path::PathBuf,
815    wait: WaitPreset,
816    resolver: Option<&dyn StageResolver>,
817) -> Result<SingleRingSpRuntime, ExecError> {
818    spawn_single_ring_shm_runtime_with_wait_and_resolver_and_prefault(
819        spec, lane_path, wait, resolver, false,
820    )
821}
822
823/// Spawn a long-running SingleRing runtime that processes until stopped, resolving any
824/// `ResolverKey` stages using `resolver` (cold path), with explicit control over SHM prefault.
825pub fn spawn_single_ring_shm_runtime_with_wait_and_resolver_and_prefault(
826    spec: &PipelineSpecV1,
827    lane_path: std::path::PathBuf,
828    wait: WaitPreset,
829    resolver: Option<&dyn StageResolver>,
830    prefault: bool,
831) -> Result<SingleRingSpRuntime, ExecError> {
832    spawn_single_ring_shm_runtime_with_wait_and_resolver_and_thread_init_and_prefault(
833        spec, lane_path, wait, resolver, None, prefault,
834    )
835}
836
837/// Spawn a long-running SingleRing runtime that processes until stopped, resolving any
838/// `ResolverKey` stages using `resolver` (cold path) and invoking `thread_init` at the start of
839/// each stage thread (before entering hot loops).
840pub fn spawn_single_ring_shm_runtime_with_wait_and_resolver_and_thread_init(
841    spec: &PipelineSpecV1,
842    lane_path: std::path::PathBuf,
843    wait: WaitPreset,
844    resolver: Option<&dyn StageResolver>,
845    thread_init: Option<ThreadInitHook>,
846) -> Result<SingleRingSpRuntime, ExecError> {
847    spawn_single_ring_shm_runtime_with_wait_and_resolver_and_thread_init_and_prefault(
848        spec,
849        lane_path,
850        wait,
851        resolver,
852        thread_init,
853        false,
854    )
855}
856
857/// Spawn a long-running SingleRing runtime that processes until stopped, resolving any
858/// `ResolverKey` stages using `resolver` (cold path) and invoking `thread_init` at the start of
859/// each stage thread (before entering hot loops), with explicit control over SHM prefault.
860pub fn spawn_single_ring_shm_runtime_with_wait_and_resolver_and_thread_init_and_prefault(
861    spec: &PipelineSpecV1,
862    lane_path: std::path::PathBuf,
863    wait: WaitPreset,
864    resolver: Option<&dyn StageResolver>,
865    thread_init: Option<ThreadInitHook>,
866    prefault: bool,
867) -> Result<SingleRingSpRuntime, ExecError> {
868    validate_v1(spec).map_err(|e| ExecError::with_source("spec validation failed", e))?;
869    let ring = match spec {
870        PipelineSpecV1::SingleRing(ring) => ring,
871        _ => return Err(ExecError::new("expected SingleRing spec")),
872    };
873
874    if ring.shards > 1 {
875        return Err(ExecError::new(
876            "sharded SingleRing requires the sharded executor helper",
877        ));
878    }
879
880    let lane = "ignored";
881    let stages = ring.stages.len();
882    let active_gating = stages;
883    if active_gating == 0 || active_gating > 4 {
884        return Err(ExecError::new("single_ring gating exceeds Layout4"));
885    }
886
887    let opts = byteor_pipeline_backings_shm::AttachOptions {
888        blocking: false,
889        prefault,
890        path: Some(lane_path.clone()),
891        queue: 0,
892    };
893    let shm =
894        byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane).map_err(|e| {
895            ExecError::with_context_source(
896                "attach shm failed",
897                format!("path={} lane={lane}", lane_path.display()),
898                e,
899            )
900        })?;
901
902    let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
903    let iterations = usize::MAX;
904    let auto_pin_cpus = if thread_init.is_none() {
905        current_allowed_cpus()
906    } else {
907        Vec::new()
908    };
909
910    let mut handles = Vec::new();
911    for (stage_idx, st) in ring.stages.iter().cloned().enumerate() {
912        let opts_t = opts.clone();
913        let stop_t = stop.clone();
914        let cursor_t = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
915        let wait_t = wait;
916        let thread_init_t = thread_init.clone();
917        let cpu_t = runtime_stage_cpu(&auto_pin_cpus, stage_idx);
918
919        let barrier = if st.depends_on.is_empty() {
920            StageBarrier::Cursor(cursor_t)
921        } else if st.depends_on.len() == 1 {
922            let dep = st.depends_on[0] as usize;
923            StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
924        } else {
925            let mut deps = Vec::new();
926            for d in &st.depends_on {
927                deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
928                    &shm,
929                    *d as usize,
930                ));
931            }
932            StageBarrier::Join(JoinBarrier { deps })
933        };
934
935        let mut op = resolve_stage_op(st.op, resolver)?;
936
937        let release = stage_idx + 1 == stages;
938
939        let role_name = format!("byteor-single-ring:stage-{stage_idx}");
940        let handle = std::thread::Builder::new()
941            .name(role_name.clone())
942            .spawn(move || {
943                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
944                    let _affinity_guard = ThreadAffinityGuard::pin(cpu_t)?;
945
946                    if let Some(h) = &thread_init_t {
947                        let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Stage);
948                        h(&ctx)?;
949                    }
950
951                    let mut stage = byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(
952                        &opts_t, lane, stage_idx,
953                    )
954                    .map_err(|_| {
955                        ExecError::with_context(
956                            "attach stage failed",
957                            format!("stage_idx={stage_idx}"),
958                        )
959                    })?;
960
961                    match wait_t {
962                        WaitPreset::MaxPerfSpin => {
963                            let mut wait = indexbus_core::SpinWait::default();
964                            run_stage_loop_stoppable(
965                                &mut stage,
966                                &barrier,
967                                &mut wait,
968                                stop_t.as_ref(),
969                                &mut op,
970                                iterations,
971                                release,
972                            )?;
973                        }
974                        WaitPreset::StdBackoff => {
975                            let mut wait = indexbus_core::StdBackoff::default();
976                            run_stage_loop_stoppable(
977                                &mut stage,
978                                &barrier,
979                                &mut wait,
980                                stop_t.as_ref(),
981                                &mut op,
982                                iterations,
983                                release,
984                            )?;
985                        }
986                        WaitPreset::Yield => {
987                            let mut wait = indexbus_core::YieldWait;
988                            run_stage_loop_stoppable(
989                                &mut stage,
990                                &barrier,
991                                &mut wait,
992                                stop_t.as_ref(),
993                                &mut op,
994                                iterations,
995                                release,
996                            )?;
997                        }
998                    }
999
1000                    Ok::<(), ExecError>(())
1001                }));
1002
1003                match result {
1004                    Ok(Ok(())) => Ok(()),
1005                    Ok(Err(e)) => {
1006                        stop_t.store(true, std::sync::atomic::Ordering::Relaxed);
1007                        Err(ExecError::with_context(
1008                            "stage failed",
1009                            format!("stage_idx={stage_idx}: {e}"),
1010                        ))
1011                    }
1012                    Err(_) => {
1013                        stop_t.store(true, std::sync::atomic::Ordering::Relaxed);
1014                        Err(ExecError::with_context(
1015                            "stage panic",
1016                            format!("stage_idx={stage_idx}"),
1017                        ))
1018                    }
1019                }
1020            })
1021            .map_err(|e| ExecError::with_source("failed to spawn stage thread", e))?;
1022
1023        handles.push((stage_idx, handle));
1024    }
1025
1026    Ok(SingleRingSpRuntime {
1027        stop,
1028        lane_path,
1029        active_gating,
1030        handles,
1031    })
1032}
1033
1034/// Run a SingleRing spec over a SHM SequencedSlots backing.
1035///
1036/// This is a minimal v1 in-process executor intended for bring-up tests.
1037pub fn run_single_ring_shm(
1038    spec: &PipelineSpecV1,
1039    lane_path: std::path::PathBuf,
1040    input: &[u8],
1041) -> Result<Vec<u8>, ExecError> {
1042    run_single_ring_shm_with_wait(spec, lane_path, input, WaitPreset::MaxPerfSpin)
1043}
1044
1045/// Run a SingleRing spec over a SHM SequencedSlots backing, using a wait preset.
1046pub fn run_single_ring_shm_with_wait(
1047    spec: &PipelineSpecV1,
1048    lane_path: std::path::PathBuf,
1049    input: &[u8],
1050    wait: WaitPreset,
1051) -> Result<Vec<u8>, ExecError> {
1052    run_single_ring_shm_with_wait_and_resolver(spec, lane_path, input, wait, None)
1053}
1054
1055/// Run a SingleRing spec over a SHM SequencedSlots backing, resolving any `ResolverKey` stages
1056/// using `resolver` (cold path).
1057pub fn run_single_ring_shm_with_wait_and_resolver(
1058    spec: &PipelineSpecV1,
1059    lane_path: std::path::PathBuf,
1060    input: &[u8],
1061    wait: WaitPreset,
1062    resolver: Option<&dyn StageResolver>,
1063) -> Result<Vec<u8>, ExecError> {
1064    run_single_ring_shm_with_wait_and_resolver_and_prefault(
1065        spec, lane_path, input, wait, resolver, false,
1066    )
1067}
1068
1069/// Run a SingleRing spec over a SHM SequencedSlots backing, resolving any `ResolverKey` stages
1070/// using `resolver` (cold path), with explicit control over SHM prefault.
1071pub fn run_single_ring_shm_with_wait_and_resolver_and_prefault(
1072    spec: &PipelineSpecV1,
1073    lane_path: std::path::PathBuf,
1074    input: &[u8],
1075    wait: WaitPreset,
1076    resolver: Option<&dyn StageResolver>,
1077    prefault: bool,
1078) -> Result<Vec<u8>, ExecError> {
1079    run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
1080        spec, lane_path, input, wait, resolver, None, prefault,
1081    )
1082}
1083
1084/// Run a SingleRing spec over a SHM SequencedSlots backing, resolving any `ResolverKey` stages
1085/// using `resolver` (cold path) and invoking `thread_init` at the start of each stage thread.
1086///
1087/// This is a minimal v1 in-process executor intended for bring-up tests.
1088pub fn run_single_ring_shm_with_wait_and_resolver_and_thread_init(
1089    spec: &PipelineSpecV1,
1090    lane_path: std::path::PathBuf,
1091    input: &[u8],
1092    wait: WaitPreset,
1093    resolver: Option<&dyn StageResolver>,
1094    thread_init: Option<ThreadInitHook>,
1095) -> Result<Vec<u8>, ExecError> {
1096    run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
1097        spec,
1098        lane_path,
1099        input,
1100        wait,
1101        resolver,
1102        thread_init,
1103        false,
1104    )
1105}
1106
1107/// Run a SingleRing spec over a SHM SequencedSlots backing, resolving any `ResolverKey` stages
1108/// using `resolver` (cold path) and invoking `thread_init` at the start of each stage thread,
1109/// with explicit control over SHM prefault.
1110pub fn run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
1111    spec: &PipelineSpecV1,
1112    lane_path: std::path::PathBuf,
1113    input: &[u8],
1114    wait: WaitPreset,
1115    resolver: Option<&dyn StageResolver>,
1116    thread_init: Option<ThreadInitHook>,
1117    prefault: bool,
1118) -> Result<Vec<u8>, ExecError> {
1119    validate_v1(spec).map_err(|e| ExecError::with_source("spec validation failed", e))?;
1120    let ring = match spec {
1121        PipelineSpecV1::SingleRing(ring) => ring,
1122        _ => return Err(ExecError::new("spec is not SingleRing")),
1123    };
1124
1125    if ring.shards > 1 {
1126        return Err(ExecError::new(
1127            "sharded SingleRing requires the sharded executor helper",
1128        ));
1129    }
1130
1131    let opts = byteor_pipeline_backings_shm::AttachOptions {
1132        blocking: false,
1133        prefault,
1134        path: Some(lane_path),
1135        queue: 0,
1136    };
1137
1138    let lane = "ignored";
1139    let stages = ring.stages.len();
1140    let active_gating = stages;
1141    if active_gating == 0 || active_gating > 4 {
1142        return Err(ExecError::new("single_ring gating exceeds Layout4"));
1143    }
1144
1145    let mut prod = byteor_pipeline_backings_shm::ShmSequencedSlotsProducer::attach(
1146        &opts,
1147        lane,
1148        1,
1149        active_gating,
1150    )
1151    .map_err(|e| {
1152        ExecError::with_context_source(
1153            "attach producer failed",
1154            format!("path={} lane={lane}", opts.path.as_ref().unwrap().display()),
1155            e,
1156        )
1157    })?;
1158
1159    let shm =
1160        byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane).map_err(|e| {
1161            ExecError::with_context_source(
1162                "attach shm failed",
1163                format!("path={} lane={lane}", opts.path.as_ref().unwrap().display()),
1164                e,
1165            )
1166        })?;
1167    let cursor = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
1168
1169    let mut handles = Vec::new();
1170    let iterations = input.len();
1171    let final_stage_idx = stages.saturating_sub(1);
1172    let auto_pin_cpus = if thread_init.is_none() {
1173        current_allowed_cpus()
1174    } else {
1175        Vec::new()
1176    };
1177    for (stage_idx, st) in ring
1178        .stages
1179        .iter()
1180        .cloned()
1181        .enumerate()
1182        .take(final_stage_idx)
1183    {
1184        let opts_t = opts.clone();
1185        let cursor_t = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
1186        let wait_t = wait;
1187        let thread_init_t = thread_init.clone();
1188        let cpu_t = worker_thread_cpu(&auto_pin_cpus, stage_idx);
1189
1190        let barrier = if st.depends_on.is_empty() {
1191            StageBarrier::Cursor(cursor_t)
1192        } else if st.depends_on.len() == 1 {
1193            let dep = st.depends_on[0] as usize;
1194            StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
1195        } else {
1196            let mut deps = Vec::new();
1197            for d in &st.depends_on {
1198                deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
1199                    &shm,
1200                    *d as usize,
1201                ));
1202            }
1203            StageBarrier::Join(JoinBarrier { deps })
1204        };
1205
1206        let mut op = resolve_stage_op(st.op, resolver)?;
1207
1208        let role_name = format!("byteor-single-ring:stage-{stage_idx}");
1209        let handle = std::thread::Builder::new()
1210            .name(role_name.clone())
1211            .spawn(move || {
1212                let _affinity_guard = ThreadAffinityGuard::pin(cpu_t)?;
1213
1214                if let Some(h) = &thread_init_t {
1215                    let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Stage);
1216                    h(&ctx)?;
1217                }
1218
1219                let mut stage = byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(
1220                    &opts_t, lane, stage_idx,
1221                )
1222                .map_err(|e| {
1223                    ExecError::with_context_source(
1224                        "attach stage failed",
1225                        format!(
1226                            "path={} lane={lane} stage_idx={stage_idx}",
1227                            opts_t.path.as_ref().unwrap().display()
1228                        ),
1229                        e,
1230                    )
1231                })?;
1232
1233                match wait_t {
1234                    WaitPreset::MaxPerfSpin => {
1235                        let mut wait = indexbus_core::SpinWait::default();
1236                        run_stage_loop_iters(
1237                            &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1238                        )?;
1239                    }
1240                    WaitPreset::StdBackoff => {
1241                        let mut wait = indexbus_core::StdBackoff::default();
1242                        run_stage_loop_iters(
1243                            &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1244                        )?;
1245                    }
1246                    WaitPreset::Yield => {
1247                        let mut wait = indexbus_core::YieldWait;
1248                        run_stage_loop_iters(
1249                            &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1250                        )?;
1251                    }
1252                }
1253
1254                Ok::<(), ExecError>(())
1255            })
1256            .map_err(|e| ExecError::with_source("failed to spawn stage thread", e))?;
1257        handles.push(handle);
1258    }
1259
1260    let _main_affinity_guard = ThreadAffinityGuard::pin(main_thread_cpu(&auto_pin_cpus))?;
1261
1262    let mut publish_wait = match wait {
1263        WaitPreset::MaxPerfSpin => WaitHolder::Spin(indexbus_core::SpinWait::default()),
1264        WaitPreset::StdBackoff => WaitHolder::Backoff(indexbus_core::StdBackoff::default()),
1265        WaitPreset::Yield => WaitHolder::Yield(indexbus_core::YieldWait),
1266    };
1267
1268    // Final stage acts as the consumer: it releases slots and we collect output directly.
1269    let final_stage = ring
1270        .stages
1271        .get(final_stage_idx)
1272        .cloned()
1273        .ok_or_else(|| ExecError::new("single_ring requires at least one stage"))?;
1274
1275    let consumer_barrier = if final_stage.depends_on.is_empty() {
1276        StageBarrier::Cursor(cursor)
1277    } else if final_stage.depends_on.len() == 1 {
1278        let dep = final_stage.depends_on[0] as usize;
1279        StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
1280    } else {
1281        let mut deps = Vec::new();
1282        for d in &final_stage.depends_on {
1283            deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
1284                &shm,
1285                *d as usize,
1286            ));
1287        }
1288        StageBarrier::Join(JoinBarrier { deps })
1289    };
1290
1291    let mut consumer_stage =
1292        byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(&opts, lane, final_stage_idx)
1293            .map_err(|e| {
1294                ExecError::with_context_source(
1295                    "attach stage failed",
1296                    format!(
1297                        "path={} lane={lane} stage_idx={final_stage_idx}",
1298                        opts.path.as_ref().unwrap().display()
1299                    ),
1300                    e,
1301                )
1302            })?;
1303    let mut consumer_op = resolve_stage_op(final_stage.op, resolver)?;
1304    let mut scratch = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1305
1306    let mut out = Vec::with_capacity(input.len());
1307    let mut wait = match wait {
1308        WaitPreset::MaxPerfSpin => WaitHolder::Spin(indexbus_core::SpinWait::default()),
1309        WaitPreset::StdBackoff => WaitHolder::Backoff(indexbus_core::StdBackoff::default()),
1310        WaitPreset::Yield => WaitHolder::Yield(indexbus_core::YieldWait),
1311    };
1312
1313    for &b in input {
1314        loop {
1315            match prod.publish_bytes(&[b]) {
1316                Ok(_) => break,
1317                Err(byteor_pipeline_backings_shm::SequencedSlotsError::Full) => {
1318                    publish_wait.wait();
1319                }
1320                Err(e) => {
1321                    return Err(ExecError::with_context("publish failed", e.to_string()));
1322                }
1323            }
1324        }
1325
1326        match &mut wait {
1327            WaitHolder::Spin(w) => run_consumer_step_collect_u8(
1328                &mut consumer_stage,
1329                &consumer_barrier,
1330                w,
1331                &mut consumer_op,
1332                &mut scratch,
1333                &mut out,
1334            )?,
1335            WaitHolder::Backoff(w) => run_consumer_step_collect_u8(
1336                &mut consumer_stage,
1337                &consumer_barrier,
1338                w,
1339                &mut consumer_op,
1340                &mut scratch,
1341                &mut out,
1342            )?,
1343            WaitHolder::Yield(w) => run_consumer_step_collect_u8(
1344                &mut consumer_stage,
1345                &consumer_barrier,
1346                w,
1347                &mut consumer_op,
1348                &mut scratch,
1349                &mut out,
1350            )?,
1351        }
1352    }
1353
1354    for h in handles {
1355        h.join().map_err(|_| ExecError::new("stage panic"))??;
1356    }
1357
1358    Ok(out)
1359}
1360
1361/// Run a SingleRing spec over a SHM SequencedSlots backing treating `input` as a single record.
1362///
1363/// This is intended for message-shaped stages (e.g. IndexBus envelopes) where a stage expects a
1364/// whole message in one slot.
1365pub fn run_single_ring_shm_record_with_wait_and_resolver_and_thread_init(
1366    spec: &PipelineSpecV1,
1367    lane_path: std::path::PathBuf,
1368    input: &[u8],
1369    wait: WaitPreset,
1370    resolver: Option<&dyn StageResolver>,
1371    thread_init: Option<ThreadInitHook>,
1372) -> Result<Vec<u8>, ExecError> {
1373    validate_v1(spec).map_err(|e| ExecError::with_source("spec validation failed", e))?;
1374    let ring = match spec {
1375        PipelineSpecV1::SingleRing(ring) => ring,
1376        _ => return Err(ExecError::new("spec is not SingleRing")),
1377    };
1378
1379    if ring.shards > 1 {
1380        return Err(ExecError::new(
1381            "sharded SingleRing requires the sharded executor helper",
1382        ));
1383    }
1384
1385    if input.len() > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
1386        return Err(ExecError::with_context(
1387            "input too large for slot",
1388            format!(
1389                "in={} max={}",
1390                input.len(),
1391                indexbus_core::INDEXBUS_SLOT_DATA_SIZE
1392            ),
1393        ));
1394    }
1395
1396    let opts = byteor_pipeline_backings_shm::AttachOptions {
1397        blocking: false,
1398        prefault: false,
1399        path: Some(lane_path),
1400        queue: 0,
1401    };
1402
1403    let lane = "ignored";
1404    let stages = ring.stages.len();
1405    let active_gating = stages;
1406    if active_gating == 0 || active_gating > 4 {
1407        return Err(ExecError::new("single_ring gating exceeds Layout4"));
1408    }
1409
1410    let mut prod = byteor_pipeline_backings_shm::ShmSequencedSlotsProducer::attach(
1411        &opts,
1412        lane,
1413        1,
1414        active_gating,
1415    )
1416    .map_err(|e| {
1417        ExecError::with_context_source(
1418            "attach producer failed",
1419            format!("path={} lane={lane}", opts.path.as_ref().unwrap().display()),
1420            e,
1421        )
1422    })?;
1423
1424    let shm =
1425        byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane).map_err(|e| {
1426            ExecError::with_context_source(
1427                "attach shm failed",
1428                format!("path={} lane={lane}", opts.path.as_ref().unwrap().display()),
1429                e,
1430            )
1431        })?;
1432    let cursor = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
1433
1434    let mut handles = Vec::new();
1435    let iterations = 1;
1436    let final_stage_idx = stages.saturating_sub(1);
1437    let auto_pin_cpus = if thread_init.is_none() {
1438        current_allowed_cpus()
1439    } else {
1440        Vec::new()
1441    };
1442    for (stage_idx, st) in ring
1443        .stages
1444        .iter()
1445        .cloned()
1446        .enumerate()
1447        .take(final_stage_idx)
1448    {
1449        let opts_t = opts.clone();
1450        let cursor_t = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
1451        let wait_t = wait;
1452        let thread_init_t = thread_init.clone();
1453        let cpu_t = worker_thread_cpu(&auto_pin_cpus, stage_idx);
1454
1455        let barrier = if st.depends_on.is_empty() {
1456            StageBarrier::Cursor(cursor_t)
1457        } else if st.depends_on.len() == 1 {
1458            let dep = st.depends_on[0] as usize;
1459            StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
1460        } else {
1461            let mut deps = Vec::new();
1462            for d in &st.depends_on {
1463                deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
1464                    &shm,
1465                    *d as usize,
1466                ));
1467            }
1468            StageBarrier::Join(JoinBarrier { deps })
1469        };
1470
1471        let mut op = resolve_stage_op(st.op, resolver)?;
1472
1473        let role_name = format!("byteor-single-ring:stage-{stage_idx}");
1474        let handle = std::thread::Builder::new()
1475            .name(role_name.clone())
1476            .spawn(move || {
1477                let _affinity_guard = ThreadAffinityGuard::pin(cpu_t)?;
1478
1479                if let Some(h) = &thread_init_t {
1480                    let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Stage);
1481                    h(&ctx)?;
1482                }
1483
1484                let mut stage = byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(
1485                    &opts_t, lane, stage_idx,
1486                )
1487                .map_err(|e| {
1488                    ExecError::with_context_source(
1489                        "attach stage failed",
1490                        format!(
1491                            "path={} lane={lane} stage_idx={stage_idx}",
1492                            opts_t.path.as_ref().unwrap().display()
1493                        ),
1494                        e,
1495                    )
1496                })?;
1497
1498                match wait_t {
1499                    WaitPreset::MaxPerfSpin => {
1500                        let mut wait = indexbus_core::SpinWait::default();
1501                        run_stage_loop_iters(
1502                            &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1503                        )?;
1504                    }
1505                    WaitPreset::StdBackoff => {
1506                        let mut wait = indexbus_core::StdBackoff::default();
1507                        run_stage_loop_iters(
1508                            &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1509                        )?;
1510                    }
1511                    WaitPreset::Yield => {
1512                        let mut wait = indexbus_core::YieldWait;
1513                        run_stage_loop_iters(
1514                            &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1515                        )?;
1516                    }
1517                }
1518
1519                Ok::<(), ExecError>(())
1520            })
1521            .map_err(|e| ExecError::with_source("failed to spawn stage thread", e))?;
1522        handles.push(handle);
1523    }
1524
1525    let _main_affinity_guard = ThreadAffinityGuard::pin(main_thread_cpu(&auto_pin_cpus))?;
1526
1527    let mut publish_wait = match wait {
1528        WaitPreset::MaxPerfSpin => WaitHolder::Spin(indexbus_core::SpinWait::default()),
1529        WaitPreset::StdBackoff => WaitHolder::Backoff(indexbus_core::StdBackoff::default()),
1530        WaitPreset::Yield => WaitHolder::Yield(indexbus_core::YieldWait),
1531    };
1532
1533    let final_stage = ring
1534        .stages
1535        .get(final_stage_idx)
1536        .cloned()
1537        .ok_or_else(|| ExecError::new("single_ring requires at least one stage"))?;
1538
1539    let consumer_barrier = if final_stage.depends_on.is_empty() {
1540        StageBarrier::Cursor(cursor)
1541    } else if final_stage.depends_on.len() == 1 {
1542        let dep = final_stage.depends_on[0] as usize;
1543        StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
1544    } else {
1545        let mut deps = Vec::new();
1546        for d in &final_stage.depends_on {
1547            deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
1548                &shm,
1549                *d as usize,
1550            ));
1551        }
1552        StageBarrier::Join(JoinBarrier { deps })
1553    };
1554
1555    let mut consumer_stage =
1556        byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(&opts, lane, final_stage_idx)
1557            .map_err(|e| {
1558                ExecError::with_context_source(
1559                    "attach stage failed",
1560                    format!(
1561                        "path={} lane={lane} stage_idx={final_stage_idx}",
1562                        opts.path.as_ref().unwrap().display()
1563                    ),
1564                    e,
1565                )
1566            })?;
1567    let mut consumer_op = resolve_stage_op(final_stage.op, resolver)?;
1568    let mut scratch = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1569
1570    let mut out = Vec::with_capacity(input.len());
1571    let mut wait = match wait {
1572        WaitPreset::MaxPerfSpin => WaitHolder::Spin(indexbus_core::SpinWait::default()),
1573        WaitPreset::StdBackoff => WaitHolder::Backoff(indexbus_core::StdBackoff::default()),
1574        WaitPreset::Yield => WaitHolder::Yield(indexbus_core::YieldWait),
1575    };
1576
1577    loop {
1578        match prod.publish_bytes(input) {
1579            Ok(_) => break,
1580            Err(byteor_pipeline_backings_shm::SequencedSlotsError::Full) => publish_wait.wait(),
1581            Err(e) => return Err(ExecError::with_context("publish failed", e.to_string())),
1582        }
1583    }
1584
1585    match &mut wait {
1586        WaitHolder::Spin(w) => run_consumer_step_collect_bytes(
1587            &mut consumer_stage,
1588            &consumer_barrier,
1589            w,
1590            &mut consumer_op,
1591            &mut scratch,
1592            &mut out,
1593        )?,
1594        WaitHolder::Backoff(w) => run_consumer_step_collect_bytes(
1595            &mut consumer_stage,
1596            &consumer_barrier,
1597            w,
1598            &mut consumer_op,
1599            &mut scratch,
1600            &mut out,
1601        )?,
1602        WaitHolder::Yield(w) => run_consumer_step_collect_bytes(
1603            &mut consumer_stage,
1604            &consumer_barrier,
1605            w,
1606            &mut consumer_op,
1607            &mut scratch,
1608            &mut out,
1609        )?,
1610    }
1611
1612    for h in handles {
1613        h.join().map_err(|_| ExecError::new("stage panic"))??;
1614    }
1615
1616    Ok(out)
1617}
1618
/// Consume the next message from `stage` as the final pipeline stage and append one byte
/// summarizing it to `out`.
///
/// The stage's op (built-in or resolved) is applied to the slot payload first. The byte
/// appended is the first byte of the resulting payload, or `0` when the resulting length is
/// zero (e.g. a resolved `Filter` that rejected the message). `out` is appended to, not
/// cleared, so repeated calls accumulate one byte per consumed message.
///
/// `scratch` is the output buffer for resolved transforming stages. It is sliced to the
/// (clamped) input length, so it must be at least as long as the slot payload or the slice
/// panics.
///
/// # Errors
///
/// - `"unresolved stage"` if a `ResolverKey` op reaches here without having been resolved.
/// - `"resolved stage failed"` / `"resolved stage output too large"` / the unsupported
///   `TransformOutcome` error when a resolved stage rejects the message. The slot callback
///   aborts with `SequencedSlotsError::Stopped`, and the recorded error is returned instead.
/// - `"resolved stage stopped"` if `Stopped` is seen without a recorded stage error.
/// - `"stage failed"` for any other backing error.
fn run_consumer_step_collect_u8<W: byteor_pipeline_kernel::WaitStrategy>(
    stage: &mut byteor_pipeline_backings_shm::ShmSequencedSlotsStage,
    barrier: &StageBarrier,
    wait: &mut W,
    op: &mut StageExecOp,
    scratch: &mut [u8],
    out: &mut Vec<u8>,
) -> Result<(), ExecError> {
    match op {
        StageExecOp::BuiltIn(op) => match op {
            StageOpV1::Identity => {
                // Read-only path: the payload is left untouched.
                stage
                    .inspect_next(barrier, wait, true, |data, len| {
                        out.push(if len > 0 { data[0] } else { 0 });
                        Ok(())
                    })
                    .map_err(|e| ExecError::with_source("stage failed", e))?;
            }
            StageOpV1::AddU8 { delta } => {
                let delta = *delta;
                stage
                    .process_next(barrier, wait, true, |data, len| {
                        // Never trust the slot's length header beyond the buffer size.
                        let n = clamp_len(*len, data.len());
                        op_add_u8(&mut data[..n], delta);
                        out.push(if *len > 0 { data[0] } else { 0 });
                        Ok(())
                    })
                    .map_err(|e| ExecError::with_source("stage failed", e))?;
            }
            StageOpV1::ResolverKey { stage } => {
                // `resolve_stage_op` turns every `ResolverKey` into `StageExecOp::Resolved`,
                // so reaching this arm means the op was never resolved.
                return Err(ExecError::with_context(
                    "unresolved stage",
                    format!("stage={stage}"),
                ));
            }
        },
        StageExecOp::Resolved {
            key,
            stage: resolved,
        } => {
            // Set by the slot callback when the resolved stage fails. The callback then
            // returns the `Stopped` sentinel to abort, and the real error is recovered here.
            let mut stage_err: Option<ExecError> = None;

            match resolved {
                // Inspect-only stages never modify the slot, so use the read-only path.
                ResolvedStage::Inspect(f) => {
                    let res = stage.inspect_next(barrier, wait, true, |data, len| {
                        let n = clamp_len(len, data.len());
                        if let Err(e) = f(&data[..n]) {
                            stage_err = Some(ExecError::with_context(
                                "resolved stage failed",
                                format!("stage={key}: {e}"),
                            ));
                            return Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped);
                        }
                        out.push(if len > 0 { data[0] } else { 0 });
                        Ok(())
                    });

                    match res {
                        Ok(_) => {}
                        Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
                            // `Stopped` without a recorded error presumably originated in the
                            // backing itself rather than in our callback.
                            return Err(stage_err.unwrap_or_else(|| {
                                ExecError::with_context(
                                    "resolved stage stopped",
                                    format!("stage={key}"),
                                )
                            }));
                        }
                        Err(e) => return Err(ExecError::with_source("stage failed", e)),
                    }
                }
                // Every other resolved stage may rewrite the payload in place.
                _ => {
                    let res = stage.process_next(barrier, wait, true, |data, len| {
                        let n_in = clamp_len(*len, data.len());
                        let input = &data[..n_in];
                        // Transform output goes to scratch first, then is copied back into
                        // the slot once its size has been validated.
                        let out_buf = &mut scratch[..n_in];

                        // Record the error and abort the callback with the `Stopped` sentinel.
                        let mut set_err = |e: ExecError| {
                            stage_err = Some(e);
                            Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped)
                        };

                        match resolved {
                            ResolvedStage::MapOk(f) => {
                                let n = f(input, out_buf);
                                // Output must fit back into the region the input occupied.
                                if n > input.len() {
                                    return set_err(ExecError::with_context(
                                        "resolved stage output too large",
                                        format!("stage={key} out={n} in={}", input.len()),
                                    ));
                                }
                                data[..n].copy_from_slice(&out_buf[..n]);
                                *len = n as u32;
                            }
                            ResolvedStage::Map(f) => match f(input, out_buf) {
                                Ok(n) => {
                                    if n > input.len() {
                                        return set_err(ExecError::with_context(
                                            "resolved stage output too large",
                                            format!(
                                                "stage={key} out={n} in={} (fallible)",
                                                input.len()
                                            ),
                                        ));
                                    }
                                    data[..n].copy_from_slice(&out_buf[..n]);
                                    *len = n as u32;
                                }
                                Err(e) => {
                                    return set_err(ExecError::with_context(
                                        "resolved stage failed",
                                        format!("stage={key}: {e}"),
                                    ));
                                }
                            },
                            ResolvedStage::Filter(f) => match f(input) {
                                Ok(true) => {}
                                Ok(false) => {
                                    // Rejected: empty payload, so a `0` byte is emitted below.
                                    *len = 0;
                                }
                                Err(e) => {
                                    return set_err(ExecError::with_context(
                                        "resolved stage failed",
                                        format!("stage={key}: {e}"),
                                    ));
                                }
                            },
                            // Unreachable in practice: `Inspect` is handled by the outer match
                            // above. Kept so this inner match stays exhaustive.
                            ResolvedStage::Inspect(f) => match f(input) {
                                Ok(()) => {}
                                Err(e) => {
                                    return set_err(ExecError::with_context(
                                        "resolved stage failed",
                                        format!("stage={key}: {e}"),
                                    ));
                                }
                            },
                            ResolvedStage::StatefulTransform(st) => match st.call(input, out_buf) {
                                Ok(TransformOutcome::Drop) => {
                                    *len = 0;
                                }
                                Ok(TransformOutcome::ForwardInput) => {}
                                Ok(TransformOutcome::ForwardOutput(n)) => {
                                    if n > input.len() {
                                        return set_err(ExecError::with_context(
                                            "resolved stage output too large",
                                            format!(
                                                "stage={key} out={n} in={} (stateful transform)",
                                                input.len()
                                            ),
                                        ));
                                    }
                                    data[..n].copy_from_slice(&out_buf[..n]);
                                    *len = n as u32;
                                }
                                // Guards against outcome variants this executor does not know;
                                // `TransformOutcome` is presumably `#[non_exhaustive]`.
                                Ok(_) => {
                                    return set_err(ExecError::new(
                                        "unsupported transform outcome (non-exhaustive)",
                                    ));
                                }
                                Err(e) => {
                                    return set_err(ExecError::with_context(
                                        "resolved stage failed",
                                        format!("stage={key}: {e}"),
                                    ));
                                }
                            },
                            ResolvedStage::Stateful(st) => match st.call(input, out_buf) {
                                Ok(n) => {
                                    if n > input.len() {
                                        return set_err(ExecError::with_context(
                                            "resolved stage output too large",
                                            format!(
                                                "stage={key} out={n} in={} (stateful)",
                                                input.len()
                                            ),
                                        ));
                                    }
                                    data[..n].copy_from_slice(&out_buf[..n]);
                                    *len = n as u32;
                                }
                                Err(e) => {
                                    return set_err(ExecError::with_context(
                                        "resolved stage failed",
                                        format!("stage={key}: {e}"),
                                    ));
                                }
                            },
                        }

                        out.push(if *len > 0 { data[0] } else { 0 });
                        Ok(())
                    });

                    match res {
                        Ok(_) => {}
                        Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
                            return Err(stage_err.unwrap_or_else(|| {
                                ExecError::with_context(
                                    "resolved stage stopped",
                                    format!("stage={key}"),
                                )
                            }));
                        }
                        Err(e) => return Err(ExecError::with_source("stage failed", e)),
                    }
                }
            }
        }
    }

    Ok(())
}
1830
1831fn run_consumer_step_collect_bytes<W: byteor_pipeline_kernel::WaitStrategy>(
1832    stage: &mut byteor_pipeline_backings_shm::ShmSequencedSlotsStage,
1833    barrier: &StageBarrier,
1834    wait: &mut W,
1835    op: &mut StageExecOp,
1836    scratch: &mut [u8],
1837    out: &mut Vec<u8>,
1838) -> Result<(), ExecError> {
1839    out.clear();
1840
1841    match op {
1842        StageExecOp::BuiltIn(op) => match op {
1843            StageOpV1::Identity => {
1844                stage
1845                    .inspect_next(barrier, wait, true, |data, len| {
1846                        let n = clamp_len(len, data.len());
1847                        out.extend_from_slice(&data[..n]);
1848                        Ok(())
1849                    })
1850                    .map_err(|e| ExecError::with_source("stage failed", e))?;
1851            }
1852            StageOpV1::AddU8 { delta } => {
1853                let delta = *delta;
1854                stage
1855                    .process_next(barrier, wait, true, |data, len| {
1856                        let n = clamp_len(*len, data.len());
1857                        op_add_u8(&mut data[..n], delta);
1858                        out.extend_from_slice(&data[..n]);
1859                        Ok(())
1860                    })
1861                    .map_err(|e| ExecError::with_source("stage failed", e))?;
1862            }
1863            StageOpV1::ResolverKey { stage } => {
1864                return Err(ExecError::with_context(
1865                    "unresolved stage",
1866                    format!("stage={stage}"),
1867                ));
1868            }
1869        },
1870        StageExecOp::Resolved {
1871            key,
1872            stage: resolved,
1873        } => {
1874            let mut stage_err: Option<ExecError> = None;
1875
1876            match resolved {
1877                ResolvedStage::Inspect(f) => {
1878                    let res = stage.inspect_next(barrier, wait, true, |data, len| {
1879                        let n = clamp_len(len, data.len());
1880                        if let Err(e) = f(&data[..n]) {
1881                            stage_err = Some(ExecError::with_context(
1882                                "resolved stage failed",
1883                                format!("stage={key}: {e}"),
1884                            ));
1885                            return Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped);
1886                        }
1887                        out.extend_from_slice(&data[..n]);
1888                        Ok(())
1889                    });
1890
1891                    match res {
1892                        Ok(_) => {}
1893                        Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
1894                            return Err(stage_err
1895                                .unwrap_or_else(|| ExecError::new("resolved stage stopped")));
1896                        }
1897                        Err(e) => return Err(ExecError::with_source("stage failed", e)),
1898                    }
1899                }
1900                _ => {
1901                    let res = stage.process_next(barrier, wait, true, |data, len| {
1902                        let n_in = clamp_len(*len, data.len());
1903                        let input = &data[..n_in];
1904                        let out_buf = &mut scratch[..n_in];
1905
1906                        let mut set_err = |e: ExecError| {
1907                            stage_err = Some(e);
1908                            Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped)
1909                        };
1910
1911                        match resolved {
1912                            ResolvedStage::MapOk(f) => {
1913                                let n = f(input, out_buf);
1914                                if n > input.len() {
1915                                    return set_err(ExecError::with_context(
1916                                        "resolved stage output too large",
1917                                        format!("stage={key} out={n} in={}", input.len()),
1918                                    ));
1919                                }
1920                                data[..n].copy_from_slice(&out_buf[..n]);
1921                                *len = n as u32;
1922                            }
1923                            ResolvedStage::Map(f) => match f(input, out_buf) {
1924                                Ok(n) => {
1925                                    if n > input.len() {
1926                                        return set_err(ExecError::with_context(
1927                                            "resolved stage output too large",
1928                                            format!(
1929                                                "stage={key} out={n} in={} (fallible)",
1930                                                input.len()
1931                                            ),
1932                                        ));
1933                                    }
1934                                    data[..n].copy_from_slice(&out_buf[..n]);
1935                                    *len = n as u32;
1936                                }
1937                                Err(e) => {
1938                                    return set_err(ExecError::with_context(
1939                                        "resolved stage failed",
1940                                        format!("stage={key}: {e}"),
1941                                    ));
1942                                }
1943                            },
1944                            ResolvedStage::Filter(f) => match f(input) {
1945                                Ok(true) => {}
1946                                Ok(false) => {
1947                                    *len = 0;
1948                                }
1949                                Err(e) => {
1950                                    return set_err(ExecError::with_context(
1951                                        "resolved stage failed",
1952                                        format!("stage={key}: {e}"),
1953                                    ));
1954                                }
1955                            },
1956                            ResolvedStage::Inspect(f) => match f(input) {
1957                                Ok(()) => {}
1958                                Err(e) => {
1959                                    return set_err(ExecError::with_context(
1960                                        "resolved stage failed",
1961                                        format!("stage={key}: {e}"),
1962                                    ));
1963                                }
1964                            },
1965                            ResolvedStage::StatefulTransform(st) => match st.call(input, out_buf) {
1966                                Ok(TransformOutcome::Drop) => {
1967                                    *len = 0;
1968                                }
1969                                Ok(TransformOutcome::ForwardInput) => {}
1970                                Ok(TransformOutcome::ForwardOutput(n)) => {
1971                                    if n > input.len() {
1972                                        return set_err(ExecError::with_context(
1973                                            "resolved stage output too large",
1974                                            format!(
1975                                                "stage={key} out={n} in={} (stateful transform)",
1976                                                input.len()
1977                                            ),
1978                                        ));
1979                                    }
1980                                    data[..n].copy_from_slice(&out_buf[..n]);
1981                                    *len = n as u32;
1982                                }
1983                                Ok(_) => {
1984                                    return set_err(ExecError::new(
1985                                        "unsupported transform outcome (non-exhaustive)",
1986                                    ));
1987                                }
1988                                Err(e) => {
1989                                    return set_err(ExecError::with_context(
1990                                        "resolved stage failed",
1991                                        format!("stage={key}: {e}"),
1992                                    ));
1993                                }
1994                            },
1995                            ResolvedStage::Stateful(st) => match st.call(input, out_buf) {
1996                                Ok(n) => {
1997                                    if n > input.len() {
1998                                        return set_err(ExecError::with_context(
1999                                            "resolved stage output too large",
2000                                            format!(
2001                                                "stage={key} out={n} in={} (stateful)",
2002                                                input.len()
2003                                            ),
2004                                        ));
2005                                    }
2006                                    data[..n].copy_from_slice(&out_buf[..n]);
2007                                    *len = n as u32;
2008                                }
2009                                Err(e) => {
2010                                    return set_err(ExecError::with_context(
2011                                        "resolved stage failed",
2012                                        format!("stage={key}: {e}"),
2013                                    ));
2014                                }
2015                            },
2016                        }
2017
2018                        let n_out = clamp_len(*len, data.len());
2019                        out.extend_from_slice(&data[..n_out]);
2020                        Ok(())
2021                    });
2022
2023                    match res {
2024                        Ok(_) => {}
2025                        Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
2026                            return Err(stage_err
2027                                .unwrap_or_else(|| ExecError::new("resolved stage stopped")));
2028                        }
2029                        Err(e) => return Err(ExecError::with_source("stage failed", e)),
2030                    }
2031                }
2032            }
2033        }
2034    }
2035
2036    Ok(())
2037}
2038
/// Derive the SHM lane file path used by shard `shard_idx` from the base lane path.
///
/// The shard file sits next to `base` and is named `<stem>-shard<idx>[.<ext>]`, e.g.
/// `/dev/shm/lane.shm` with shard 3 becomes `/dev/shm/lane-shard3.shm`. If `base` has no
/// file stem (e.g. `/`), the stem `byteor-single-ring` is used. If `base` has no parent,
/// the current directory (`.`) is used.
///
/// Stem and extension are spliced as raw `OsStr` rather than decoded as UTF-8. This lets
/// non-UTF-8 file names keep their bytes instead of collapsing onto the fallback stem,
/// which would make distinct lanes in the same directory share one shard file. It also
/// keeps a non-UTF-8 extension instead of silently dropping it.
fn shard_lane_path(base: &std::path::Path, shard_idx: u32) -> std::path::PathBuf {
    let dir = base.parent().unwrap_or_else(|| std::path::Path::new("."));
    let stem = base
        .file_stem()
        .unwrap_or_else(|| std::ffi::OsStr::new("byteor-single-ring"));

    let mut name = std::ffi::OsString::with_capacity(stem.len() + 24);
    name.push(stem);
    name.push(format!("-shard{shard_idx}"));
    if let Some(ext) = base.extension() {
        name.push(".");
        name.push(ext);
    }
    dir.join(name)
}
2052
2053/// Run a PerKey-sharded SingleRing spec by partitioning input by shard key.
2054///
2055/// v1 sharding contract:
2056/// - the shard key is extracted deterministically from the message payload
2057/// - messages routed to the same shard preserve strict order
2058/// - no cross-shard ordering is implied
2059///
2060/// Returns one output vector per shard (in shard index order).
2061pub fn run_single_ring_shm_per_key_sharded_with_wait(
2062    spec: &PipelineSpecV1,
2063    lane_path: std::path::PathBuf,
2064    input: &[u8],
2065    wait: WaitPreset,
2066) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2067    run_single_ring_shm_per_key_sharded_with_wait_and_resolver(spec, lane_path, input, wait, None)
2068}
2069
2070/// Run a PerKey-sharded SingleRing spec by partitioning input by shard key,
2071/// resolving any `ResolverKey` stages using `resolver` (cold path).
2072///
2073/// Returns one output vector per shard (in shard index order).
2074pub fn run_single_ring_shm_per_key_sharded_with_wait_and_resolver(
2075    spec: &PipelineSpecV1,
2076    lane_path: std::path::PathBuf,
2077    input: &[u8],
2078    wait: WaitPreset,
2079    resolver: Option<&dyn StageResolver>,
2080) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2081    run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_prefault(
2082        spec, lane_path, input, wait, resolver, false,
2083    )
2084}
2085
2086/// Run a PerKey-sharded SingleRing spec by partitioning input by shard key,
2087/// resolving any `ResolverKey` stages using `resolver` (cold path), with explicit control over
2088/// SHM prefault.
2089pub fn run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_prefault(
2090    spec: &PipelineSpecV1,
2091    lane_path: std::path::PathBuf,
2092    input: &[u8],
2093    wait: WaitPreset,
2094    resolver: Option<&dyn StageResolver>,
2095    prefault: bool,
2096) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2097    run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_thread_init_and_prefault(
2098        spec, lane_path, input, wait, resolver, None, prefault,
2099    )
2100}
2101
2102/// Run a PerKey-sharded SingleRing spec by partitioning input by shard key,
2103/// resolving any `ResolverKey` stages using `resolver` (cold path), and invoking `thread_init`
2104/// for each stage thread (within each shard's executor).
2105///
2106/// Returns one output vector per shard (in shard index order).
2107pub fn run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_thread_init(
2108    spec: &PipelineSpecV1,
2109    lane_path: std::path::PathBuf,
2110    input: &[u8],
2111    wait: WaitPreset,
2112    resolver: Option<&dyn StageResolver>,
2113    thread_init: Option<ThreadInitHook>,
2114) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2115    run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_thread_init_and_prefault(
2116        spec,
2117        lane_path,
2118        input,
2119        wait,
2120        resolver,
2121        thread_init,
2122        false,
2123    )
2124}
2125
2126/// Run a PerKey-sharded SingleRing spec by partitioning input by shard key,
2127/// resolving any `ResolverKey` stages using `resolver` (cold path), invoking `thread_init`
2128/// for each stage thread, and explicitly controlling SHM prefault.
2129pub fn run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_thread_init_and_prefault(
2130    spec: &PipelineSpecV1,
2131    lane_path: std::path::PathBuf,
2132    input: &[u8],
2133    wait: WaitPreset,
2134    resolver: Option<&dyn StageResolver>,
2135    thread_init: Option<ThreadInitHook>,
2136    prefault: bool,
2137) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2138    validate_v1(spec).map_err(|e| ExecError::with_source("spec validation failed", e))?;
2139    let ring = match spec {
2140        PipelineSpecV1::SingleRing(ring) => ring,
2141        _ => return Err(ExecError::new("spec is not SingleRing")),
2142    };
2143
2144    let shards = ring.shards;
2145    if shards <= 1 {
2146        return Ok(alloc::vec::Vec::from([
2147            run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
2148                spec,
2149                lane_path,
2150                input,
2151                wait,
2152                resolver,
2153                thread_init,
2154                prefault,
2155            )?,
2156        ]));
2157    }
2158
2159    // Partition input by shard key.
2160    let mut per_shard: alloc::vec::Vec<alloc::vec::Vec<u8>> =
2161        (0..shards).map(|_| alloc::vec::Vec::new()).collect();
2162    for &b in input {
2163        // v1 key extraction: first byte of payload (the only byte in this runner).
2164        let shard = (b as u32) % shards;
2165        per_shard[shard as usize].push(b);
2166    }
2167
2168    // Run each shard independently over its own SHM file.
2169    //
2170    // Note: we use scoped threads so we can borrow `resolver` without requiring a `static`
2171    // lifetime or forcing all callers to allocate an `Arc<dyn StageResolver>`.
2172    let mut out = alloc::vec::Vec::with_capacity(shards as usize);
2173    std::thread::scope(|scope| {
2174        let mut handles = alloc::vec::Vec::with_capacity(shards as usize);
2175        for shard_idx in 0..shards {
2176            let mut spec_t = spec.clone();
2177            if let PipelineSpecV1::SingleRing(r) = &mut spec_t {
2178                r.shards = 1;
2179            }
2180            let path_t = shard_lane_path(&lane_path, shard_idx);
2181            let input_t = core::mem::take(&mut per_shard[shard_idx as usize]);
2182            let thread_init_t = thread_init.clone();
2183            handles.push(scope.spawn(move || {
2184                run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
2185                    &spec_t,
2186                    path_t,
2187                    &input_t,
2188                    wait,
2189                    resolver,
2190                    thread_init_t,
2191                    prefault,
2192                )
2193            }));
2194        }
2195
2196        for h in handles {
2197            out.push(h.join().map_err(|_| ExecError::new("stage panic"))??);
2198        }
2199        Ok::<(), ExecError>(())
2200    })?;
2201
2202    Ok(out)
2203}