1use alloc::vec::Vec;
4
5use byteor_pipeline_kernel::step::TransformOutcome;
6use byteor_pipeline_spec::{validate_v1, PipelineSpecV1, StageOpV1};
7
8use crate::ExecError;
9use crate::{resolve_or_err, ResolvedStage, StageResolver};
10use crate::{RoleKind, ThreadInitContext, ThreadInitHook};
11
/// Signature of built-in read-only stage bodies (e.g. `op_noop`).
type InspectFn = fn(&[u8]);
/// Signature of built-in in-place stage bodies that take one byte parameter (e.g. `op_add_u8`).
type ProcessFn = fn(&mut [u8], u8);
14
/// Body of the built-in `Identity` stage: looks at nothing and changes nothing.
fn op_noop(_: &[u8]) {}
16
/// Body of the built-in `AddU8` stage: adds `delta` to every byte in place,
/// wrapping around on overflow (so `255 + 1 == 0`).
fn op_add_u8(data: &mut [u8], delta: u8) {
    data.iter_mut()
        .for_each(|byte| *byte = byte.wrapping_add(delta));
}
22
/// Converts a slot's stored `u32` length into a `usize` that is at most `max`.
///
/// Used to bound reads of a slot payload to the buffer actually available. The
/// conversion is lossless: on targets where a `u32` does not fit in `usize`, an
/// unrepresentable length is treated as "at least `max`" rather than being truncated
/// by an `as` cast.
fn clamp_len(len: u32, max: usize) -> usize {
    usize::try_from(len).map_or(max, |len| len.min(max))
}
26
/// Executable form of a stage op, as produced by `resolve_stage_op`.
enum StageExecOp {
    /// A spec op run directly by the stage loops. `resolve_stage_op` never produces
    /// `BuiltIn(StageOpV1::ResolverKey { .. })`; the loops reject it as "unresolved stage".
    BuiltIn(StageOpV1),
    /// A stage obtained from a `StageResolver`, together with the key it was looked up
    /// by (used as `stage=<key>` context in error messages).
    Resolved { key: String, stage: ResolvedStage },
}
31
32fn resolve_stage_op(
33 op: StageOpV1,
34 resolver: Option<&dyn StageResolver>,
35) -> Result<StageExecOp, ExecError> {
36 match op {
37 StageOpV1::ResolverKey { stage } => {
38 let Some(resolver) = resolver else {
39 return Err(ExecError::new("missing stage resolver"));
40 };
41 let resolved = resolve_or_err(resolver, &stage).map_err(|e| {
42 ExecError::with_context("resolve stage failed", format!("stage={stage}: {e}"))
43 })?;
44 Ok(StageExecOp::Resolved {
45 key: stage,
46 stage: resolved,
47 })
48 }
49 other => Ok(StageExecOp::BuiltIn(other)),
50 }
51}
52
/// Wait strategy used whenever a stage, the producer, or the consumer has nothing to do yet.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WaitPreset {
    /// Uses `indexbus_core::SpinWait`. Default for `run_single_ring_shm`.
    MaxPerfSpin,
    /// Uses `indexbus_core::StdBackoff`. Default for `spawn_single_ring_shm_runtime`.
    StdBackoff,
    /// Uses `indexbus_core::YieldWait`.
    Yield,
}
68
69struct JoinBarrier {
70 deps: Vec<byteor_pipeline_backings_shm::GatingBarrier>,
71}
72
73impl byteor_pipeline_kernel::SeqBarrier for JoinBarrier {
74 fn available(&self) -> u64 {
75 self.deps.iter().map(|b| b.available()).min().unwrap_or(0)
76 }
77}
78
79enum StageBarrier {
80 Cursor(byteor_pipeline_backings_shm::CursorBarrier),
81 Gating(byteor_pipeline_backings_shm::GatingBarrier),
82 Join(JoinBarrier),
83}
84
85impl byteor_pipeline_kernel::SeqBarrier for StageBarrier {
86 fn available(&self) -> u64 {
87 match self {
88 StageBarrier::Cursor(b) => b.available(),
89 StageBarrier::Gating(b) => b.available(),
90 StageBarrier::Join(b) => b.available(),
91 }
92 }
93}
94
95enum WaitHolder {
96 Spin(indexbus_core::SpinWait),
97 Backoff(indexbus_core::StdBackoff),
98 Yield(indexbus_core::YieldWait),
99}
100
101impl WaitHolder {
102 #[inline]
103 fn wait(&mut self) {
104 match self {
105 WaitHolder::Spin(w) => byteor_pipeline_kernel::WaitStrategy::wait(w),
106 WaitHolder::Backoff(w) => byteor_pipeline_kernel::WaitStrategy::wait(w),
107 WaitHolder::Yield(w) => byteor_pipeline_kernel::WaitStrategy::wait(w),
108 }
109 }
110}
111
112fn current_allowed_cpus() -> Vec<usize> {
113 #[cfg(target_os = "linux")]
114 {
115 affinity::get_thread_affinity().unwrap_or_default()
116 }
117
118 #[cfg(not(target_os = "linux"))]
119 {
120 Vec::new()
121 }
122}
123
/// CPU for the driving (main) thread when auto-pinning: the first allowed CPU, but only
/// when at least one more CPU remains for the worker threads.
fn main_thread_cpu(cpus: &[usize]) -> Option<usize> {
    match cpus {
        [first, _, ..] => Some(*first),
        _ => None,
    }
}
131
/// CPU for worker `worker_idx` when auto-pinning alongside a pinned main thread.
///
/// The first CPU in `cpus` is left to the main thread (see `main_thread_cpu`); workers
/// round-robin over the remaining ones. Returns `None` when fewer than two CPUs are given.
fn worker_thread_cpu(cpus: &[usize], worker_idx: usize) -> Option<usize> {
    let (_, workers) = cpus.split_first()?;
    let slot = worker_idx.checked_rem(workers.len())?;
    Some(workers[slot])
}
139
/// CPU for stage `stage_idx` of a spawned runtime: round-robin over every CPU in `cpus`
/// (no CPU is reserved, since the runtime has no driving thread). `None` if `cpus` is empty.
fn runtime_stage_cpu(cpus: &[usize], stage_idx: usize) -> Option<usize> {
    // `checked_rem` yields `None` for an empty list instead of dividing by zero.
    stage_idx
        .checked_rem(cpus.len())
        .map(|slot| cpus[slot])
}
147
148struct ThreadAffinityGuard {
149 previous: Vec<usize>,
150 active: bool,
151}
152
153impl ThreadAffinityGuard {
154 fn inactive() -> Self {
155 Self {
156 previous: Vec::new(),
157 active: false,
158 }
159 }
160
161 fn pin(cpu: Option<usize>) -> Result<Self, ExecError> {
162 #[cfg(target_os = "linux")]
163 {
164 let Some(cpu) = cpu else {
165 return Ok(Self::inactive());
166 };
167
168 let previous = affinity::get_thread_affinity().map_err(|e| {
169 ExecError::with_context("read thread affinity failed", e.to_string())
170 })?;
171
172 if previous.len() == 1 && previous[0] == cpu {
173 return Ok(Self {
174 previous,
175 active: false,
176 });
177 }
178
179 affinity::set_thread_affinity([cpu]).map_err(|e| {
180 ExecError::with_context("set thread affinity failed", format!("cpu={cpu}: {e}"))
181 })?;
182
183 Ok(Self {
184 previous,
185 active: true,
186 })
187 }
188
189 #[cfg(not(target_os = "linux"))]
190 {
191 let _ = cpu;
192 Ok(Self::inactive())
193 }
194 }
195}
196
197impl Drop for ThreadAffinityGuard {
198 fn drop(&mut self) {
199 #[cfg(target_os = "linux")]
200 if self.active && !self.previous.is_empty() {
201 let _ = affinity::set_thread_affinity(&self.previous);
202 }
203 }
204}
205
206fn run_stage_loop_stoppable<W: byteor_pipeline_kernel::WaitStrategy>(
207 stage: &mut byteor_pipeline_backings_shm::ShmSequencedSlotsStage,
208 barrier: &StageBarrier,
209 wait: &mut W,
210 stop: &std::sync::atomic::AtomicBool,
211 op: &mut StageExecOp,
212 iterations: usize,
213 release: bool,
214) -> Result<(), ExecError> {
215 match op {
216 StageExecOp::BuiltIn(op) => match op {
217 StageOpV1::Identity => {
218 let inspect_fn: InspectFn = op_noop;
219 for _ in 0..iterations {
220 if stop.load(std::sync::atomic::Ordering::Relaxed) {
221 break;
222 }
223
224 let res =
225 stage.inspect_next_stoppable(barrier, wait, release, stop, |data, _len| {
226 inspect_fn(data);
227 Ok(())
228 });
229
230 match res {
231 Ok(_) => {}
232 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => break,
233 Err(e) => return Err(ExecError::with_source("stage failed", e)),
234 }
235 }
236 }
237 StageOpV1::AddU8 { delta } => {
238 let delta = *delta;
239 let process_fn: ProcessFn = op_add_u8;
240 for _ in 0..iterations {
241 if stop.load(std::sync::atomic::Ordering::Relaxed) {
242 break;
243 }
244
245 let res =
246 stage.process_next_stoppable(barrier, wait, release, stop, |data, len| {
247 let n = clamp_len(*len, data.len());
248 process_fn(&mut data[..n], delta);
249 Ok(())
250 });
251
252 match res {
253 Ok(_) => {}
254 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => break,
255 Err(e) => return Err(ExecError::with_source("stage failed", e)),
256 }
257 }
258 }
259 StageOpV1::ResolverKey { stage } => {
260 return Err(ExecError::with_context(
261 "unresolved stage",
262 format!("stage={stage}"),
263 ));
264 }
265 },
266 StageExecOp::Resolved {
267 key,
268 stage: resolved,
269 } => {
270 let mut scratch = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
271
272 for _ in 0..iterations {
273 if stop.load(std::sync::atomic::Ordering::Relaxed) {
274 break;
275 }
276
277 let mut stage_err: Option<ExecError> = None;
278
279 match resolved {
280 ResolvedStage::Inspect(f) => {
281 let res = stage.inspect_next_stoppable(
282 barrier,
283 wait,
284 release,
285 stop,
286 |data, len| {
287 let n = clamp_len(len, data.len());
288 if let Err(e) = f(&data[..n]) {
289 stage_err = Some(ExecError::with_context(
290 "resolved stage failed",
291 format!("stage={key}: {e}"),
292 ));
293 return Err(
294 byteor_pipeline_backings_shm::SequencedSlotsError::Stopped,
295 );
296 }
297 Ok(())
298 },
299 );
300
301 match res {
302 Ok(_) => {}
303 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
304 if let Some(e) = stage_err {
305 return Err(e);
306 }
307 break;
308 }
309 Err(e) => return Err(ExecError::with_source("stage failed", e)),
310 }
311 }
312 _ => {
313 let res = stage.process_next_stoppable(
314 barrier,
315 wait,
316 release,
317 stop,
318 |data, len| {
319 let n_in = clamp_len(*len, data.len());
320 let input = &data[..n_in];
321 let out = &mut scratch[..n_in];
322
323 let mut set_err = |e: ExecError| {
324 stage_err = Some(e);
325 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped)
326 };
327
328 match resolved {
329 ResolvedStage::MapOk(f) => {
330 let n = f(input, out);
331 if n > input.len() {
332 return set_err(ExecError::with_context(
333 "resolved stage output too large",
334 format!("stage={key} out={n} in={}", input.len()),
335 ));
336 }
337 data[..n].copy_from_slice(&out[..n]);
338 *len = n as u32;
339 Ok(())
340 }
341 ResolvedStage::Map(f) => match f(input, out) {
342 Ok(n) => {
343 if n > input.len() {
344 return set_err(ExecError::with_context(
345 "resolved stage output too large",
346 format!(
347 "stage={key} out={n} in={} (fallible)",
348 input.len()
349 ),
350 ));
351 }
352 data[..n].copy_from_slice(&out[..n]);
353 *len = n as u32;
354 Ok(())
355 }
356 Err(e) => set_err(ExecError::with_context(
357 "resolved stage failed",
358 format!("stage={key}: {e}"),
359 )),
360 },
361 ResolvedStage::Filter(f) => match f(input) {
362 Ok(true) => Ok(()),
363 Ok(false) => {
364 *len = 0;
365 Ok(())
366 }
367 Err(e) => set_err(ExecError::with_context(
368 "resolved stage failed",
369 format!("stage={key}: {e}"),
370 )),
371 },
372 ResolvedStage::Inspect(f) => match f(input) {
373 Ok(()) => Ok(()),
374 Err(e) => set_err(ExecError::with_context(
375 "resolved stage failed",
376 format!("stage={key}: {e}"),
377 )),
378 },
379 ResolvedStage::StatefulTransform(st) => match st.call(input, out)
380 {
381 Ok(TransformOutcome::Drop) => {
382 *len = 0;
383 Ok(())
384 }
385 Ok(TransformOutcome::ForwardInput) => Ok(()),
386 Ok(TransformOutcome::ForwardOutput(n)) => {
387 if n > input.len() {
388 return set_err(ExecError::with_context(
389 "resolved stage output too large",
390 format!(
391 "stage={key} out={n} in={} (stateful transform)",
392 input.len()
393 ),
394 ));
395 }
396 data[..n].copy_from_slice(&out[..n]);
397 *len = n as u32;
398 Ok(())
399 }
400 Ok(_) => set_err(ExecError::new(
401 "unsupported transform outcome (non-exhaustive)",
402 )),
403 Err(e) => set_err(ExecError::with_context(
404 "resolved stage failed",
405 format!("stage={key}: {e}"),
406 )),
407 },
408 ResolvedStage::Stateful(st) => match st.call(input, out) {
409 Ok(n) => {
410 if n > input.len() {
411 return set_err(ExecError::with_context(
412 "resolved stage output too large",
413 format!(
414 "stage={key} out={n} in={} (stateful)",
415 input.len()
416 ),
417 ));
418 }
419 data[..n].copy_from_slice(&out[..n]);
420 *len = n as u32;
421 Ok(())
422 }
423 Err(e) => set_err(ExecError::with_context(
424 "resolved stage failed",
425 format!("stage={key}: {e}"),
426 )),
427 },
428 }
429 },
430 );
431
432 match res {
433 Ok(_) => {}
434 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
435 if let Some(e) = stage_err {
436 return Err(e);
437 }
438 break;
439 }
440 Err(e) => return Err(ExecError::with_source("stage failed", e)),
441 }
442 }
443 }
444 }
445 }
446 }
447
448 Ok(())
449}
450
451fn run_stage_loop_iters<W: byteor_pipeline_kernel::WaitStrategy>(
452 stage: &mut byteor_pipeline_backings_shm::ShmSequencedSlotsStage,
453 barrier: &StageBarrier,
454 wait: &mut W,
455 op: &mut StageExecOp,
456 iterations: usize,
457 release: bool,
458) -> Result<(), ExecError> {
459 match op {
460 StageExecOp::BuiltIn(op) => match op {
461 StageOpV1::Identity => {
462 let inspect_fn: InspectFn = op_noop;
463 for _ in 0..iterations {
464 stage
465 .inspect_next(barrier, wait, release, |data, _len| {
466 inspect_fn(data);
467 Ok(())
468 })
469 .map_err(|e| ExecError::with_source("stage failed", e))?;
470 }
471 }
472 StageOpV1::AddU8 { delta } => {
473 let delta = *delta;
474 let process_fn: ProcessFn = op_add_u8;
475 for _ in 0..iterations {
476 stage
477 .process_next(barrier, wait, release, |data, len| {
478 let n = clamp_len(*len, data.len());
479 process_fn(&mut data[..n], delta);
480 Ok(())
481 })
482 .map_err(|e| ExecError::with_source("stage failed", e))?;
483 }
484 }
485 StageOpV1::ResolverKey { stage } => {
486 return Err(ExecError::with_context(
487 "unresolved stage",
488 format!("stage={stage}"),
489 ));
490 }
491 },
492 StageExecOp::Resolved {
493 key,
494 stage: resolved,
495 } => {
496 let mut scratch = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
497 for _ in 0..iterations {
498 let mut stage_err: Option<ExecError> = None;
499
500 match resolved {
501 ResolvedStage::Inspect(f) => {
502 let res = stage.inspect_next(barrier, wait, release, |data, len| {
503 let n = clamp_len(len, data.len());
504 if let Err(e) = f(&data[..n]) {
505 stage_err = Some(ExecError::with_context(
506 "resolved stage failed",
507 format!("stage={key}: {e}"),
508 ));
509 return Err(
510 byteor_pipeline_backings_shm::SequencedSlotsError::Stopped,
511 );
512 }
513 Ok(())
514 });
515
516 match res {
517 Ok(_) => {}
518 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
519 return Err(stage_err
520 .unwrap_or_else(|| ExecError::new("resolved stage stopped")));
521 }
522 Err(e) => return Err(ExecError::with_source("stage failed", e)),
523 }
524 }
525 _ => {
526 let res = stage.process_next(barrier, wait, release, |data, len| {
527 let n_in = clamp_len(*len, data.len());
528 let input = &data[..n_in];
529 let out = &mut scratch[..n_in];
530
531 let mut set_err = |e: ExecError| {
532 stage_err = Some(e);
533 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped)
534 };
535
536 match resolved {
537 ResolvedStage::MapOk(f) => {
538 let n = f(input, out);
539 if n > input.len() {
540 return set_err(ExecError::with_context(
541 "resolved stage output too large",
542 format!("stage={key} out={n} in={}", input.len()),
543 ));
544 }
545 data[..n].copy_from_slice(&out[..n]);
546 *len = n as u32;
547 Ok(())
548 }
549 ResolvedStage::Map(f) => match f(input, out) {
550 Ok(n) => {
551 if n > input.len() {
552 return set_err(ExecError::with_context(
553 "resolved stage output too large",
554 format!(
555 "stage={key} out={n} in={} (fallible)",
556 input.len()
557 ),
558 ));
559 }
560 data[..n].copy_from_slice(&out[..n]);
561 *len = n as u32;
562 Ok(())
563 }
564 Err(e) => set_err(ExecError::with_context(
565 "resolved stage failed",
566 format!("stage={key}: {e}"),
567 )),
568 },
569 ResolvedStage::Filter(f) => match f(input) {
570 Ok(true) => Ok(()),
571 Ok(false) => {
572 *len = 0;
573 Ok(())
574 }
575 Err(e) => set_err(ExecError::with_context(
576 "resolved stage failed",
577 format!("stage={key}: {e}"),
578 )),
579 },
580 ResolvedStage::Inspect(f) => match f(input) {
581 Ok(()) => Ok(()),
582 Err(e) => set_err(ExecError::with_context(
583 "resolved stage failed",
584 format!("stage={key}: {e}"),
585 )),
586 },
587 ResolvedStage::StatefulTransform(st) => match st.call(input, out)
588 {
589 Ok(TransformOutcome::Drop) => {
590 *len = 0;
591 Ok(())
592 }
593 Ok(TransformOutcome::ForwardInput) => Ok(()),
594 Ok(TransformOutcome::ForwardOutput(n)) => {
595 if n > input.len() {
596 return set_err(ExecError::with_context(
597 "resolved stage output too large",
598 format!(
599 "stage={key} out={n} in={} (stateful transform)",
600 input.len()
601 ),
602 ));
603 }
604 data[..n].copy_from_slice(&out[..n]);
605 *len = n as u32;
606 Ok(())
607 }
608 Ok(_) => set_err(ExecError::new(
609 "unsupported transform outcome (non-exhaustive)",
610 )),
611 Err(e) => set_err(ExecError::with_context(
612 "resolved stage failed",
613 format!("stage={key}: {e}"),
614 )),
615 },
616 ResolvedStage::Stateful(st) => match st.call(input, out) {
617 Ok(n) => {
618 if n > input.len() {
619 return set_err(ExecError::with_context(
620 "resolved stage output too large",
621 format!(
622 "stage={key} out={n} in={} (stateful)",
623 input.len()
624 ),
625 ));
626 }
627 data[..n].copy_from_slice(&out[..n]);
628 *len = n as u32;
629 Ok(())
630 }
631 Err(e) => set_err(ExecError::with_context(
632 "resolved stage failed",
633 format!("stage={key}: {e}"),
634 )),
635 },
636 }
637 });
638
639 match res {
640 Ok(_) => {}
641 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
642 return Err(stage_err
643 .unwrap_or_else(|| ExecError::new("resolved stage stopped")));
644 }
645 Err(e) => return Err(ExecError::with_source("stage failed", e)),
646 }
647 }
648 }
649 }
650 }
651 }
652
653 Ok(())
654}
655
/// Handle to the stage threads started by the `spawn_single_ring_shm_runtime*` functions.
///
/// Stage threads keep running until the shared stop flag is raised, either through
/// `request_stop` / `stop_and_join*`, or by a stage thread itself when it fails or panics.
pub struct SingleRingSpRuntime {
    /// Stop flag shared with (and polled by) every stage thread.
    stop: std::sync::Arc<std::sync::atomic::AtomicBool>,
    /// SHM lane path, kept so `snapshot` can attach again.
    lane_path: std::path::PathBuf,
    /// Number of gating stages the ring was configured with.
    active_gating: usize,
    /// `(stage_idx, join handle)` for every spawned stage thread.
    handles: Vec<(usize, std::thread::JoinHandle<Result<(), ExecError>>)>,
}
663
664impl SingleRingSpRuntime {
665 pub fn stage_count(&self) -> usize {
667 self.handles.len()
668 }
669
670 pub fn active_gating(&self) -> usize {
672 self.active_gating
673 }
674
675 pub fn snapshot(&self) -> Result<byteor_pipeline_backings_shm::SingleRingSnapshot, ExecError> {
677 let lane = "ignored";
678 let opts = byteor_pipeline_backings_shm::AttachOptions {
679 blocking: false,
680 prefault: false,
681 path: Some(self.lane_path.clone()),
682 queue: 0,
683 };
684 let shm =
685 byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane).map_err(|e| {
686 ExecError::with_context_source(
687 "attach shm failed",
688 format!("path={} lane={lane}", self.lane_path.display()),
689 e,
690 )
691 })?;
692 Ok(shm.snapshot())
693 }
694
695 pub fn any_stage_finished(&self) -> bool {
699 self.handles.iter().any(|(_, h)| h.is_finished())
700 }
701
702 pub fn request_stop(&self) {
704 self.stop.store(true, std::sync::atomic::Ordering::Relaxed);
705 }
706
707 pub fn stop_and_join_bounded(mut self, timeout: std::time::Duration) -> Result<(), ExecError> {
713 self.request_stop();
714
715 let start = std::time::Instant::now();
716 let mut first_err: Option<ExecError> = None;
717
718 while !self.handles.is_empty() {
720 let mut i = 0;
721 while i < self.handles.len() {
722 if self.handles[i].1.is_finished() {
723 let (stage_idx, h) = self.handles.swap_remove(i);
724 let res = match h.join() {
725 Ok(r) => r,
726 Err(_) => Err(ExecError::new("stage panic")),
727 };
728
729 if let Err(e) = res {
730 if first_err.is_none() {
731 first_err = Some(ExecError::with_context(
732 "stage exited",
733 format!("stage_idx={stage_idx}: {e}"),
734 ));
735 }
736 }
737
738 continue;
739 }
740 i += 1;
741 }
742
743 if self.handles.is_empty() {
744 break;
745 }
746
747 if start.elapsed() >= timeout {
748 let mut unfinished: Vec<usize> = self.handles.iter().map(|(idx, _)| *idx).collect();
749 unfinished.sort_unstable();
750 return Err(ExecError::with_context(
751 "stop/join timed out",
752 format!("unfinished_stages={unfinished:?}"),
753 ));
754 }
755
756 std::thread::sleep(std::time::Duration::from_millis(10));
757 }
758
759 match first_err {
760 Some(e) => Err(e),
761 None => Ok(()),
762 }
763 }
764
765 pub fn stop_and_join(mut self) -> Result<(), ExecError> {
767 self.request_stop();
768
769 let mut first_err: Option<ExecError> = None;
770 for (stage_idx, h) in self.handles.drain(..) {
771 let res = match h.join() {
772 Ok(r) => r,
773 Err(_) => Err(ExecError::new("stage panic")),
774 };
775
776 if let Err(e) = res {
777 if first_err.is_none() {
778 first_err = Some(ExecError::with_context(
779 "stage exited",
780 format!("stage_idx={stage_idx}: {e}"),
781 ));
782 }
783 }
784 }
785
786 match first_err {
787 Some(e) => Err(e),
788 None => Ok(()),
789 }
790 }
791}
792
793pub fn spawn_single_ring_shm_runtime(
795 spec: &PipelineSpecV1,
796 lane_path: std::path::PathBuf,
797) -> Result<SingleRingSpRuntime, ExecError> {
798 spawn_single_ring_shm_runtime_with_wait(spec, lane_path, WaitPreset::StdBackoff)
799}
800
801pub fn spawn_single_ring_shm_runtime_with_wait(
803 spec: &PipelineSpecV1,
804 lane_path: std::path::PathBuf,
805 wait: WaitPreset,
806) -> Result<SingleRingSpRuntime, ExecError> {
807 spawn_single_ring_shm_runtime_with_wait_and_resolver(spec, lane_path, wait, None)
808}
809
810pub fn spawn_single_ring_shm_runtime_with_wait_and_resolver(
813 spec: &PipelineSpecV1,
814 lane_path: std::path::PathBuf,
815 wait: WaitPreset,
816 resolver: Option<&dyn StageResolver>,
817) -> Result<SingleRingSpRuntime, ExecError> {
818 spawn_single_ring_shm_runtime_with_wait_and_resolver_and_prefault(
819 spec, lane_path, wait, resolver, false,
820 )
821}
822
823pub fn spawn_single_ring_shm_runtime_with_wait_and_resolver_and_prefault(
826 spec: &PipelineSpecV1,
827 lane_path: std::path::PathBuf,
828 wait: WaitPreset,
829 resolver: Option<&dyn StageResolver>,
830 prefault: bool,
831) -> Result<SingleRingSpRuntime, ExecError> {
832 spawn_single_ring_shm_runtime_with_wait_and_resolver_and_thread_init_and_prefault(
833 spec, lane_path, wait, resolver, None, prefault,
834 )
835}
836
837pub fn spawn_single_ring_shm_runtime_with_wait_and_resolver_and_thread_init(
841 spec: &PipelineSpecV1,
842 lane_path: std::path::PathBuf,
843 wait: WaitPreset,
844 resolver: Option<&dyn StageResolver>,
845 thread_init: Option<ThreadInitHook>,
846) -> Result<SingleRingSpRuntime, ExecError> {
847 spawn_single_ring_shm_runtime_with_wait_and_resolver_and_thread_init_and_prefault(
848 spec,
849 lane_path,
850 wait,
851 resolver,
852 thread_init,
853 false,
854 )
855}
856
857pub fn spawn_single_ring_shm_runtime_with_wait_and_resolver_and_thread_init_and_prefault(
861 spec: &PipelineSpecV1,
862 lane_path: std::path::PathBuf,
863 wait: WaitPreset,
864 resolver: Option<&dyn StageResolver>,
865 thread_init: Option<ThreadInitHook>,
866 prefault: bool,
867) -> Result<SingleRingSpRuntime, ExecError> {
868 validate_v1(spec).map_err(|e| ExecError::with_source("spec validation failed", e))?;
869 let ring = match spec {
870 PipelineSpecV1::SingleRing(ring) => ring,
871 _ => return Err(ExecError::new("expected SingleRing spec")),
872 };
873
874 if ring.shards > 1 {
875 return Err(ExecError::new(
876 "sharded SingleRing requires the sharded executor helper",
877 ));
878 }
879
880 let lane = "ignored";
881 let stages = ring.stages.len();
882 let active_gating = stages;
883 if active_gating == 0 || active_gating > 4 {
884 return Err(ExecError::new("single_ring gating exceeds Layout4"));
885 }
886
887 let opts = byteor_pipeline_backings_shm::AttachOptions {
888 blocking: false,
889 prefault,
890 path: Some(lane_path.clone()),
891 queue: 0,
892 };
893 let shm =
894 byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane).map_err(|e| {
895 ExecError::with_context_source(
896 "attach shm failed",
897 format!("path={} lane={lane}", lane_path.display()),
898 e,
899 )
900 })?;
901
902 let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
903 let iterations = usize::MAX;
904 let auto_pin_cpus = if thread_init.is_none() {
905 current_allowed_cpus()
906 } else {
907 Vec::new()
908 };
909
910 let mut handles = Vec::new();
911 for (stage_idx, st) in ring.stages.iter().cloned().enumerate() {
912 let opts_t = opts.clone();
913 let stop_t = stop.clone();
914 let cursor_t = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
915 let wait_t = wait;
916 let thread_init_t = thread_init.clone();
917 let cpu_t = runtime_stage_cpu(&auto_pin_cpus, stage_idx);
918
919 let barrier = if st.depends_on.is_empty() {
920 StageBarrier::Cursor(cursor_t)
921 } else if st.depends_on.len() == 1 {
922 let dep = st.depends_on[0] as usize;
923 StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
924 } else {
925 let mut deps = Vec::new();
926 for d in &st.depends_on {
927 deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
928 &shm,
929 *d as usize,
930 ));
931 }
932 StageBarrier::Join(JoinBarrier { deps })
933 };
934
935 let mut op = resolve_stage_op(st.op, resolver)?;
936
937 let release = stage_idx + 1 == stages;
938
939 let role_name = format!("byteor-single-ring:stage-{stage_idx}");
940 let handle = std::thread::Builder::new()
941 .name(role_name.clone())
942 .spawn(move || {
943 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
944 let _affinity_guard = ThreadAffinityGuard::pin(cpu_t)?;
945
946 if let Some(h) = &thread_init_t {
947 let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Stage);
948 h(&ctx)?;
949 }
950
951 let mut stage = byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(
952 &opts_t, lane, stage_idx,
953 )
954 .map_err(|_| {
955 ExecError::with_context(
956 "attach stage failed",
957 format!("stage_idx={stage_idx}"),
958 )
959 })?;
960
961 match wait_t {
962 WaitPreset::MaxPerfSpin => {
963 let mut wait = indexbus_core::SpinWait::default();
964 run_stage_loop_stoppable(
965 &mut stage,
966 &barrier,
967 &mut wait,
968 stop_t.as_ref(),
969 &mut op,
970 iterations,
971 release,
972 )?;
973 }
974 WaitPreset::StdBackoff => {
975 let mut wait = indexbus_core::StdBackoff::default();
976 run_stage_loop_stoppable(
977 &mut stage,
978 &barrier,
979 &mut wait,
980 stop_t.as_ref(),
981 &mut op,
982 iterations,
983 release,
984 )?;
985 }
986 WaitPreset::Yield => {
987 let mut wait = indexbus_core::YieldWait;
988 run_stage_loop_stoppable(
989 &mut stage,
990 &barrier,
991 &mut wait,
992 stop_t.as_ref(),
993 &mut op,
994 iterations,
995 release,
996 )?;
997 }
998 }
999
1000 Ok::<(), ExecError>(())
1001 }));
1002
1003 match result {
1004 Ok(Ok(())) => Ok(()),
1005 Ok(Err(e)) => {
1006 stop_t.store(true, std::sync::atomic::Ordering::Relaxed);
1007 Err(ExecError::with_context(
1008 "stage failed",
1009 format!("stage_idx={stage_idx}: {e}"),
1010 ))
1011 }
1012 Err(_) => {
1013 stop_t.store(true, std::sync::atomic::Ordering::Relaxed);
1014 Err(ExecError::with_context(
1015 "stage panic",
1016 format!("stage_idx={stage_idx}"),
1017 ))
1018 }
1019 }
1020 })
1021 .map_err(|e| ExecError::with_source("failed to spawn stage thread", e))?;
1022
1023 handles.push((stage_idx, handle));
1024 }
1025
1026 Ok(SingleRingSpRuntime {
1027 stop,
1028 lane_path,
1029 active_gating,
1030 handles,
1031 })
1032}
1033
1034pub fn run_single_ring_shm(
1038 spec: &PipelineSpecV1,
1039 lane_path: std::path::PathBuf,
1040 input: &[u8],
1041) -> Result<Vec<u8>, ExecError> {
1042 run_single_ring_shm_with_wait(spec, lane_path, input, WaitPreset::MaxPerfSpin)
1043}
1044
1045pub fn run_single_ring_shm_with_wait(
1047 spec: &PipelineSpecV1,
1048 lane_path: std::path::PathBuf,
1049 input: &[u8],
1050 wait: WaitPreset,
1051) -> Result<Vec<u8>, ExecError> {
1052 run_single_ring_shm_with_wait_and_resolver(spec, lane_path, input, wait, None)
1053}
1054
1055pub fn run_single_ring_shm_with_wait_and_resolver(
1058 spec: &PipelineSpecV1,
1059 lane_path: std::path::PathBuf,
1060 input: &[u8],
1061 wait: WaitPreset,
1062 resolver: Option<&dyn StageResolver>,
1063) -> Result<Vec<u8>, ExecError> {
1064 run_single_ring_shm_with_wait_and_resolver_and_prefault(
1065 spec, lane_path, input, wait, resolver, false,
1066 )
1067}
1068
1069pub fn run_single_ring_shm_with_wait_and_resolver_and_prefault(
1072 spec: &PipelineSpecV1,
1073 lane_path: std::path::PathBuf,
1074 input: &[u8],
1075 wait: WaitPreset,
1076 resolver: Option<&dyn StageResolver>,
1077 prefault: bool,
1078) -> Result<Vec<u8>, ExecError> {
1079 run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
1080 spec, lane_path, input, wait, resolver, None, prefault,
1081 )
1082}
1083
1084pub fn run_single_ring_shm_with_wait_and_resolver_and_thread_init(
1089 spec: &PipelineSpecV1,
1090 lane_path: std::path::PathBuf,
1091 input: &[u8],
1092 wait: WaitPreset,
1093 resolver: Option<&dyn StageResolver>,
1094 thread_init: Option<ThreadInitHook>,
1095) -> Result<Vec<u8>, ExecError> {
1096 run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
1097 spec,
1098 lane_path,
1099 input,
1100 wait,
1101 resolver,
1102 thread_init,
1103 false,
1104 )
1105}
1106
1107pub fn run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
1111 spec: &PipelineSpecV1,
1112 lane_path: std::path::PathBuf,
1113 input: &[u8],
1114 wait: WaitPreset,
1115 resolver: Option<&dyn StageResolver>,
1116 thread_init: Option<ThreadInitHook>,
1117 prefault: bool,
1118) -> Result<Vec<u8>, ExecError> {
1119 validate_v1(spec).map_err(|e| ExecError::with_source("spec validation failed", e))?;
1120 let ring = match spec {
1121 PipelineSpecV1::SingleRing(ring) => ring,
1122 _ => return Err(ExecError::new("spec is not SingleRing")),
1123 };
1124
1125 if ring.shards > 1 {
1126 return Err(ExecError::new(
1127 "sharded SingleRing requires the sharded executor helper",
1128 ));
1129 }
1130
1131 let opts = byteor_pipeline_backings_shm::AttachOptions {
1132 blocking: false,
1133 prefault,
1134 path: Some(lane_path),
1135 queue: 0,
1136 };
1137
1138 let lane = "ignored";
1139 let stages = ring.stages.len();
1140 let active_gating = stages;
1141 if active_gating == 0 || active_gating > 4 {
1142 return Err(ExecError::new("single_ring gating exceeds Layout4"));
1143 }
1144
1145 let mut prod = byteor_pipeline_backings_shm::ShmSequencedSlotsProducer::attach(
1146 &opts,
1147 lane,
1148 1,
1149 active_gating,
1150 )
1151 .map_err(|e| {
1152 ExecError::with_context_source(
1153 "attach producer failed",
1154 format!("path={} lane={lane}", opts.path.as_ref().unwrap().display()),
1155 e,
1156 )
1157 })?;
1158
1159 let shm =
1160 byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane).map_err(|e| {
1161 ExecError::with_context_source(
1162 "attach shm failed",
1163 format!("path={} lane={lane}", opts.path.as_ref().unwrap().display()),
1164 e,
1165 )
1166 })?;
1167 let cursor = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
1168
1169 let mut handles = Vec::new();
1170 let iterations = input.len();
1171 let final_stage_idx = stages.saturating_sub(1);
1172 let auto_pin_cpus = if thread_init.is_none() {
1173 current_allowed_cpus()
1174 } else {
1175 Vec::new()
1176 };
1177 for (stage_idx, st) in ring
1178 .stages
1179 .iter()
1180 .cloned()
1181 .enumerate()
1182 .take(final_stage_idx)
1183 {
1184 let opts_t = opts.clone();
1185 let cursor_t = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
1186 let wait_t = wait;
1187 let thread_init_t = thread_init.clone();
1188 let cpu_t = worker_thread_cpu(&auto_pin_cpus, stage_idx);
1189
1190 let barrier = if st.depends_on.is_empty() {
1191 StageBarrier::Cursor(cursor_t)
1192 } else if st.depends_on.len() == 1 {
1193 let dep = st.depends_on[0] as usize;
1194 StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
1195 } else {
1196 let mut deps = Vec::new();
1197 for d in &st.depends_on {
1198 deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
1199 &shm,
1200 *d as usize,
1201 ));
1202 }
1203 StageBarrier::Join(JoinBarrier { deps })
1204 };
1205
1206 let mut op = resolve_stage_op(st.op, resolver)?;
1207
1208 let role_name = format!("byteor-single-ring:stage-{stage_idx}");
1209 let handle = std::thread::Builder::new()
1210 .name(role_name.clone())
1211 .spawn(move || {
1212 let _affinity_guard = ThreadAffinityGuard::pin(cpu_t)?;
1213
1214 if let Some(h) = &thread_init_t {
1215 let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Stage);
1216 h(&ctx)?;
1217 }
1218
1219 let mut stage = byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(
1220 &opts_t, lane, stage_idx,
1221 )
1222 .map_err(|e| {
1223 ExecError::with_context_source(
1224 "attach stage failed",
1225 format!(
1226 "path={} lane={lane} stage_idx={stage_idx}",
1227 opts_t.path.as_ref().unwrap().display()
1228 ),
1229 e,
1230 )
1231 })?;
1232
1233 match wait_t {
1234 WaitPreset::MaxPerfSpin => {
1235 let mut wait = indexbus_core::SpinWait::default();
1236 run_stage_loop_iters(
1237 &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1238 )?;
1239 }
1240 WaitPreset::StdBackoff => {
1241 let mut wait = indexbus_core::StdBackoff::default();
1242 run_stage_loop_iters(
1243 &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1244 )?;
1245 }
1246 WaitPreset::Yield => {
1247 let mut wait = indexbus_core::YieldWait;
1248 run_stage_loop_iters(
1249 &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1250 )?;
1251 }
1252 }
1253
1254 Ok::<(), ExecError>(())
1255 })
1256 .map_err(|e| ExecError::with_source("failed to spawn stage thread", e))?;
1257 handles.push(handle);
1258 }
1259
1260 let _main_affinity_guard = ThreadAffinityGuard::pin(main_thread_cpu(&auto_pin_cpus))?;
1261
1262 let mut publish_wait = match wait {
1263 WaitPreset::MaxPerfSpin => WaitHolder::Spin(indexbus_core::SpinWait::default()),
1264 WaitPreset::StdBackoff => WaitHolder::Backoff(indexbus_core::StdBackoff::default()),
1265 WaitPreset::Yield => WaitHolder::Yield(indexbus_core::YieldWait),
1266 };
1267
1268 let final_stage = ring
1270 .stages
1271 .get(final_stage_idx)
1272 .cloned()
1273 .ok_or_else(|| ExecError::new("single_ring requires at least one stage"))?;
1274
1275 let consumer_barrier = if final_stage.depends_on.is_empty() {
1276 StageBarrier::Cursor(cursor)
1277 } else if final_stage.depends_on.len() == 1 {
1278 let dep = final_stage.depends_on[0] as usize;
1279 StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
1280 } else {
1281 let mut deps = Vec::new();
1282 for d in &final_stage.depends_on {
1283 deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
1284 &shm,
1285 *d as usize,
1286 ));
1287 }
1288 StageBarrier::Join(JoinBarrier { deps })
1289 };
1290
1291 let mut consumer_stage =
1292 byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(&opts, lane, final_stage_idx)
1293 .map_err(|e| {
1294 ExecError::with_context_source(
1295 "attach stage failed",
1296 format!(
1297 "path={} lane={lane} stage_idx={final_stage_idx}",
1298 opts.path.as_ref().unwrap().display()
1299 ),
1300 e,
1301 )
1302 })?;
1303 let mut consumer_op = resolve_stage_op(final_stage.op, resolver)?;
1304 let mut scratch = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1305
1306 let mut out = Vec::with_capacity(input.len());
1307 let mut wait = match wait {
1308 WaitPreset::MaxPerfSpin => WaitHolder::Spin(indexbus_core::SpinWait::default()),
1309 WaitPreset::StdBackoff => WaitHolder::Backoff(indexbus_core::StdBackoff::default()),
1310 WaitPreset::Yield => WaitHolder::Yield(indexbus_core::YieldWait),
1311 };
1312
1313 for &b in input {
1314 loop {
1315 match prod.publish_bytes(&[b]) {
1316 Ok(_) => break,
1317 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Full) => {
1318 publish_wait.wait();
1319 }
1320 Err(e) => {
1321 return Err(ExecError::with_context("publish failed", e.to_string()));
1322 }
1323 }
1324 }
1325
1326 match &mut wait {
1327 WaitHolder::Spin(w) => run_consumer_step_collect_u8(
1328 &mut consumer_stage,
1329 &consumer_barrier,
1330 w,
1331 &mut consumer_op,
1332 &mut scratch,
1333 &mut out,
1334 )?,
1335 WaitHolder::Backoff(w) => run_consumer_step_collect_u8(
1336 &mut consumer_stage,
1337 &consumer_barrier,
1338 w,
1339 &mut consumer_op,
1340 &mut scratch,
1341 &mut out,
1342 )?,
1343 WaitHolder::Yield(w) => run_consumer_step_collect_u8(
1344 &mut consumer_stage,
1345 &consumer_barrier,
1346 w,
1347 &mut consumer_op,
1348 &mut scratch,
1349 &mut out,
1350 )?,
1351 }
1352 }
1353
1354 for h in handles {
1355 h.join().map_err(|_| ExecError::new("stage panic"))??;
1356 }
1357
1358 Ok(out)
1359}
1360
1361pub fn run_single_ring_shm_record_with_wait_and_resolver_and_thread_init(
1366 spec: &PipelineSpecV1,
1367 lane_path: std::path::PathBuf,
1368 input: &[u8],
1369 wait: WaitPreset,
1370 resolver: Option<&dyn StageResolver>,
1371 thread_init: Option<ThreadInitHook>,
1372) -> Result<Vec<u8>, ExecError> {
1373 validate_v1(spec).map_err(|e| ExecError::with_source("spec validation failed", e))?;
1374 let ring = match spec {
1375 PipelineSpecV1::SingleRing(ring) => ring,
1376 _ => return Err(ExecError::new("spec is not SingleRing")),
1377 };
1378
1379 if ring.shards > 1 {
1380 return Err(ExecError::new(
1381 "sharded SingleRing requires the sharded executor helper",
1382 ));
1383 }
1384
1385 if input.len() > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
1386 return Err(ExecError::with_context(
1387 "input too large for slot",
1388 format!(
1389 "in={} max={}",
1390 input.len(),
1391 indexbus_core::INDEXBUS_SLOT_DATA_SIZE
1392 ),
1393 ));
1394 }
1395
1396 let opts = byteor_pipeline_backings_shm::AttachOptions {
1397 blocking: false,
1398 prefault: false,
1399 path: Some(lane_path),
1400 queue: 0,
1401 };
1402
1403 let lane = "ignored";
1404 let stages = ring.stages.len();
1405 let active_gating = stages;
1406 if active_gating == 0 || active_gating > 4 {
1407 return Err(ExecError::new("single_ring gating exceeds Layout4"));
1408 }
1409
1410 let mut prod = byteor_pipeline_backings_shm::ShmSequencedSlotsProducer::attach(
1411 &opts,
1412 lane,
1413 1,
1414 active_gating,
1415 )
1416 .map_err(|e| {
1417 ExecError::with_context_source(
1418 "attach producer failed",
1419 format!("path={} lane={lane}", opts.path.as_ref().unwrap().display()),
1420 e,
1421 )
1422 })?;
1423
1424 let shm =
1425 byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane).map_err(|e| {
1426 ExecError::with_context_source(
1427 "attach shm failed",
1428 format!("path={} lane={lane}", opts.path.as_ref().unwrap().display()),
1429 e,
1430 )
1431 })?;
1432 let cursor = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
1433
1434 let mut handles = Vec::new();
1435 let iterations = 1;
1436 let final_stage_idx = stages.saturating_sub(1);
1437 let auto_pin_cpus = if thread_init.is_none() {
1438 current_allowed_cpus()
1439 } else {
1440 Vec::new()
1441 };
1442 for (stage_idx, st) in ring
1443 .stages
1444 .iter()
1445 .cloned()
1446 .enumerate()
1447 .take(final_stage_idx)
1448 {
1449 let opts_t = opts.clone();
1450 let cursor_t = byteor_pipeline_backings_shm::CursorBarrier::new(&shm);
1451 let wait_t = wait;
1452 let thread_init_t = thread_init.clone();
1453 let cpu_t = worker_thread_cpu(&auto_pin_cpus, stage_idx);
1454
1455 let barrier = if st.depends_on.is_empty() {
1456 StageBarrier::Cursor(cursor_t)
1457 } else if st.depends_on.len() == 1 {
1458 let dep = st.depends_on[0] as usize;
1459 StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
1460 } else {
1461 let mut deps = Vec::new();
1462 for d in &st.depends_on {
1463 deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
1464 &shm,
1465 *d as usize,
1466 ));
1467 }
1468 StageBarrier::Join(JoinBarrier { deps })
1469 };
1470
1471 let mut op = resolve_stage_op(st.op, resolver)?;
1472
1473 let role_name = format!("byteor-single-ring:stage-{stage_idx}");
1474 let handle = std::thread::Builder::new()
1475 .name(role_name.clone())
1476 .spawn(move || {
1477 let _affinity_guard = ThreadAffinityGuard::pin(cpu_t)?;
1478
1479 if let Some(h) = &thread_init_t {
1480 let ctx = ThreadInitContext::new(role_name.clone(), RoleKind::Stage);
1481 h(&ctx)?;
1482 }
1483
1484 let mut stage = byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(
1485 &opts_t, lane, stage_idx,
1486 )
1487 .map_err(|e| {
1488 ExecError::with_context_source(
1489 "attach stage failed",
1490 format!(
1491 "path={} lane={lane} stage_idx={stage_idx}",
1492 opts_t.path.as_ref().unwrap().display()
1493 ),
1494 e,
1495 )
1496 })?;
1497
1498 match wait_t {
1499 WaitPreset::MaxPerfSpin => {
1500 let mut wait = indexbus_core::SpinWait::default();
1501 run_stage_loop_iters(
1502 &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1503 )?;
1504 }
1505 WaitPreset::StdBackoff => {
1506 let mut wait = indexbus_core::StdBackoff::default();
1507 run_stage_loop_iters(
1508 &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1509 )?;
1510 }
1511 WaitPreset::Yield => {
1512 let mut wait = indexbus_core::YieldWait;
1513 run_stage_loop_iters(
1514 &mut stage, &barrier, &mut wait, &mut op, iterations, false,
1515 )?;
1516 }
1517 }
1518
1519 Ok::<(), ExecError>(())
1520 })
1521 .map_err(|e| ExecError::with_source("failed to spawn stage thread", e))?;
1522 handles.push(handle);
1523 }
1524
1525 let _main_affinity_guard = ThreadAffinityGuard::pin(main_thread_cpu(&auto_pin_cpus))?;
1526
1527 let mut publish_wait = match wait {
1528 WaitPreset::MaxPerfSpin => WaitHolder::Spin(indexbus_core::SpinWait::default()),
1529 WaitPreset::StdBackoff => WaitHolder::Backoff(indexbus_core::StdBackoff::default()),
1530 WaitPreset::Yield => WaitHolder::Yield(indexbus_core::YieldWait),
1531 };
1532
1533 let final_stage = ring
1534 .stages
1535 .get(final_stage_idx)
1536 .cloned()
1537 .ok_or_else(|| ExecError::new("single_ring requires at least one stage"))?;
1538
1539 let consumer_barrier = if final_stage.depends_on.is_empty() {
1540 StageBarrier::Cursor(cursor)
1541 } else if final_stage.depends_on.len() == 1 {
1542 let dep = final_stage.depends_on[0] as usize;
1543 StageBarrier::Gating(byteor_pipeline_backings_shm::GatingBarrier::new(&shm, dep))
1544 } else {
1545 let mut deps = Vec::new();
1546 for d in &final_stage.depends_on {
1547 deps.push(byteor_pipeline_backings_shm::GatingBarrier::new(
1548 &shm,
1549 *d as usize,
1550 ));
1551 }
1552 StageBarrier::Join(JoinBarrier { deps })
1553 };
1554
1555 let mut consumer_stage =
1556 byteor_pipeline_backings_shm::ShmSequencedSlotsStage::attach(&opts, lane, final_stage_idx)
1557 .map_err(|e| {
1558 ExecError::with_context_source(
1559 "attach stage failed",
1560 format!(
1561 "path={} lane={lane} stage_idx={final_stage_idx}",
1562 opts.path.as_ref().unwrap().display()
1563 ),
1564 e,
1565 )
1566 })?;
1567 let mut consumer_op = resolve_stage_op(final_stage.op, resolver)?;
1568 let mut scratch = vec![0u8; indexbus_core::INDEXBUS_SLOT_DATA_SIZE];
1569
1570 let mut out = Vec::with_capacity(input.len());
1571 let mut wait = match wait {
1572 WaitPreset::MaxPerfSpin => WaitHolder::Spin(indexbus_core::SpinWait::default()),
1573 WaitPreset::StdBackoff => WaitHolder::Backoff(indexbus_core::StdBackoff::default()),
1574 WaitPreset::Yield => WaitHolder::Yield(indexbus_core::YieldWait),
1575 };
1576
1577 loop {
1578 match prod.publish_bytes(input) {
1579 Ok(_) => break,
1580 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Full) => publish_wait.wait(),
1581 Err(e) => return Err(ExecError::with_context("publish failed", e.to_string())),
1582 }
1583 }
1584
1585 match &mut wait {
1586 WaitHolder::Spin(w) => run_consumer_step_collect_bytes(
1587 &mut consumer_stage,
1588 &consumer_barrier,
1589 w,
1590 &mut consumer_op,
1591 &mut scratch,
1592 &mut out,
1593 )?,
1594 WaitHolder::Backoff(w) => run_consumer_step_collect_bytes(
1595 &mut consumer_stage,
1596 &consumer_barrier,
1597 w,
1598 &mut consumer_op,
1599 &mut scratch,
1600 &mut out,
1601 )?,
1602 WaitHolder::Yield(w) => run_consumer_step_collect_bytes(
1603 &mut consumer_stage,
1604 &consumer_barrier,
1605 w,
1606 &mut consumer_op,
1607 &mut scratch,
1608 &mut out,
1609 )?,
1610 }
1611
1612 for h in handles {
1613 h.join().map_err(|_| ExecError::new("stage panic"))??;
1614 }
1615
1616 Ok(out)
1617}
1618
1619fn run_consumer_step_collect_u8<W: byteor_pipeline_kernel::WaitStrategy>(
1620 stage: &mut byteor_pipeline_backings_shm::ShmSequencedSlotsStage,
1621 barrier: &StageBarrier,
1622 wait: &mut W,
1623 op: &mut StageExecOp,
1624 scratch: &mut [u8],
1625 out: &mut Vec<u8>,
1626) -> Result<(), ExecError> {
1627 match op {
1628 StageExecOp::BuiltIn(op) => match op {
1629 StageOpV1::Identity => {
1630 stage
1631 .inspect_next(barrier, wait, true, |data, len| {
1632 out.push(if len > 0 { data[0] } else { 0 });
1633 Ok(())
1634 })
1635 .map_err(|e| ExecError::with_source("stage failed", e))?;
1636 }
1637 StageOpV1::AddU8 { delta } => {
1638 let delta = *delta;
1639 stage
1640 .process_next(barrier, wait, true, |data, len| {
1641 let n = clamp_len(*len, data.len());
1642 op_add_u8(&mut data[..n], delta);
1643 out.push(if *len > 0 { data[0] } else { 0 });
1644 Ok(())
1645 })
1646 .map_err(|e| ExecError::with_source("stage failed", e))?;
1647 }
1648 StageOpV1::ResolverKey { stage } => {
1649 return Err(ExecError::with_context(
1650 "unresolved stage",
1651 format!("stage={stage}"),
1652 ));
1653 }
1654 },
1655 StageExecOp::Resolved {
1656 key,
1657 stage: resolved,
1658 } => {
1659 let mut stage_err: Option<ExecError> = None;
1660
1661 match resolved {
1662 ResolvedStage::Inspect(f) => {
1663 let res = stage.inspect_next(barrier, wait, true, |data, len| {
1664 let n = clamp_len(len, data.len());
1665 if let Err(e) = f(&data[..n]) {
1666 stage_err = Some(ExecError::with_context(
1667 "resolved stage failed",
1668 format!("stage={key}: {e}"),
1669 ));
1670 return Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped);
1671 }
1672 out.push(if len > 0 { data[0] } else { 0 });
1673 Ok(())
1674 });
1675
1676 match res {
1677 Ok(_) => {}
1678 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
1679 return Err(stage_err.unwrap_or_else(|| {
1680 ExecError::with_context(
1681 "resolved stage stopped",
1682 format!("stage={key}"),
1683 )
1684 }));
1685 }
1686 Err(e) => return Err(ExecError::with_source("stage failed", e)),
1687 }
1688 }
1689 _ => {
1690 let res = stage.process_next(barrier, wait, true, |data, len| {
1691 let n_in = clamp_len(*len, data.len());
1692 let input = &data[..n_in];
1693 let out_buf = &mut scratch[..n_in];
1694
1695 let mut set_err = |e: ExecError| {
1696 stage_err = Some(e);
1697 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped)
1698 };
1699
1700 match resolved {
1701 ResolvedStage::MapOk(f) => {
1702 let n = f(input, out_buf);
1703 if n > input.len() {
1704 return set_err(ExecError::with_context(
1705 "resolved stage output too large",
1706 format!("stage={key} out={n} in={}", input.len()),
1707 ));
1708 }
1709 data[..n].copy_from_slice(&out_buf[..n]);
1710 *len = n as u32;
1711 }
1712 ResolvedStage::Map(f) => match f(input, out_buf) {
1713 Ok(n) => {
1714 if n > input.len() {
1715 return set_err(ExecError::with_context(
1716 "resolved stage output too large",
1717 format!(
1718 "stage={key} out={n} in={} (fallible)",
1719 input.len()
1720 ),
1721 ));
1722 }
1723 data[..n].copy_from_slice(&out_buf[..n]);
1724 *len = n as u32;
1725 }
1726 Err(e) => {
1727 return set_err(ExecError::with_context(
1728 "resolved stage failed",
1729 format!("stage={key}: {e}"),
1730 ));
1731 }
1732 },
1733 ResolvedStage::Filter(f) => match f(input) {
1734 Ok(true) => {}
1735 Ok(false) => {
1736 *len = 0;
1737 }
1738 Err(e) => {
1739 return set_err(ExecError::with_context(
1740 "resolved stage failed",
1741 format!("stage={key}: {e}"),
1742 ));
1743 }
1744 },
1745 ResolvedStage::Inspect(f) => match f(input) {
1746 Ok(()) => {}
1747 Err(e) => {
1748 return set_err(ExecError::with_context(
1749 "resolved stage failed",
1750 format!("stage={key}: {e}"),
1751 ));
1752 }
1753 },
1754 ResolvedStage::StatefulTransform(st) => match st.call(input, out_buf) {
1755 Ok(TransformOutcome::Drop) => {
1756 *len = 0;
1757 }
1758 Ok(TransformOutcome::ForwardInput) => {}
1759 Ok(TransformOutcome::ForwardOutput(n)) => {
1760 if n > input.len() {
1761 return set_err(ExecError::with_context(
1762 "resolved stage output too large",
1763 format!(
1764 "stage={key} out={n} in={} (stateful transform)",
1765 input.len()
1766 ),
1767 ));
1768 }
1769 data[..n].copy_from_slice(&out_buf[..n]);
1770 *len = n as u32;
1771 }
1772 Ok(_) => {
1773 return set_err(ExecError::new(
1774 "unsupported transform outcome (non-exhaustive)",
1775 ));
1776 }
1777 Err(e) => {
1778 return set_err(ExecError::with_context(
1779 "resolved stage failed",
1780 format!("stage={key}: {e}"),
1781 ));
1782 }
1783 },
1784 ResolvedStage::Stateful(st) => match st.call(input, out_buf) {
1785 Ok(n) => {
1786 if n > input.len() {
1787 return set_err(ExecError::with_context(
1788 "resolved stage output too large",
1789 format!(
1790 "stage={key} out={n} in={} (stateful)",
1791 input.len()
1792 ),
1793 ));
1794 }
1795 data[..n].copy_from_slice(&out_buf[..n]);
1796 *len = n as u32;
1797 }
1798 Err(e) => {
1799 return set_err(ExecError::with_context(
1800 "resolved stage failed",
1801 format!("stage={key}: {e}"),
1802 ));
1803 }
1804 },
1805 }
1806
1807 out.push(if *len > 0 { data[0] } else { 0 });
1808 Ok(())
1809 });
1810
1811 match res {
1812 Ok(_) => {}
1813 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
1814 return Err(stage_err.unwrap_or_else(|| {
1815 ExecError::with_context(
1816 "resolved stage stopped",
1817 format!("stage={key}"),
1818 )
1819 }));
1820 }
1821 Err(e) => return Err(ExecError::with_source("stage failed", e)),
1822 }
1823 }
1824 }
1825 }
1826 }
1827
1828 Ok(())
1829}
1830
1831fn run_consumer_step_collect_bytes<W: byteor_pipeline_kernel::WaitStrategy>(
1832 stage: &mut byteor_pipeline_backings_shm::ShmSequencedSlotsStage,
1833 barrier: &StageBarrier,
1834 wait: &mut W,
1835 op: &mut StageExecOp,
1836 scratch: &mut [u8],
1837 out: &mut Vec<u8>,
1838) -> Result<(), ExecError> {
1839 out.clear();
1840
1841 match op {
1842 StageExecOp::BuiltIn(op) => match op {
1843 StageOpV1::Identity => {
1844 stage
1845 .inspect_next(barrier, wait, true, |data, len| {
1846 let n = clamp_len(len, data.len());
1847 out.extend_from_slice(&data[..n]);
1848 Ok(())
1849 })
1850 .map_err(|e| ExecError::with_source("stage failed", e))?;
1851 }
1852 StageOpV1::AddU8 { delta } => {
1853 let delta = *delta;
1854 stage
1855 .process_next(barrier, wait, true, |data, len| {
1856 let n = clamp_len(*len, data.len());
1857 op_add_u8(&mut data[..n], delta);
1858 out.extend_from_slice(&data[..n]);
1859 Ok(())
1860 })
1861 .map_err(|e| ExecError::with_source("stage failed", e))?;
1862 }
1863 StageOpV1::ResolverKey { stage } => {
1864 return Err(ExecError::with_context(
1865 "unresolved stage",
1866 format!("stage={stage}"),
1867 ));
1868 }
1869 },
1870 StageExecOp::Resolved {
1871 key,
1872 stage: resolved,
1873 } => {
1874 let mut stage_err: Option<ExecError> = None;
1875
1876 match resolved {
1877 ResolvedStage::Inspect(f) => {
1878 let res = stage.inspect_next(barrier, wait, true, |data, len| {
1879 let n = clamp_len(len, data.len());
1880 if let Err(e) = f(&data[..n]) {
1881 stage_err = Some(ExecError::with_context(
1882 "resolved stage failed",
1883 format!("stage={key}: {e}"),
1884 ));
1885 return Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped);
1886 }
1887 out.extend_from_slice(&data[..n]);
1888 Ok(())
1889 });
1890
1891 match res {
1892 Ok(_) => {}
1893 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
1894 return Err(stage_err
1895 .unwrap_or_else(|| ExecError::new("resolved stage stopped")));
1896 }
1897 Err(e) => return Err(ExecError::with_source("stage failed", e)),
1898 }
1899 }
1900 _ => {
1901 let res = stage.process_next(barrier, wait, true, |data, len| {
1902 let n_in = clamp_len(*len, data.len());
1903 let input = &data[..n_in];
1904 let out_buf = &mut scratch[..n_in];
1905
1906 let mut set_err = |e: ExecError| {
1907 stage_err = Some(e);
1908 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped)
1909 };
1910
1911 match resolved {
1912 ResolvedStage::MapOk(f) => {
1913 let n = f(input, out_buf);
1914 if n > input.len() {
1915 return set_err(ExecError::with_context(
1916 "resolved stage output too large",
1917 format!("stage={key} out={n} in={}", input.len()),
1918 ));
1919 }
1920 data[..n].copy_from_slice(&out_buf[..n]);
1921 *len = n as u32;
1922 }
1923 ResolvedStage::Map(f) => match f(input, out_buf) {
1924 Ok(n) => {
1925 if n > input.len() {
1926 return set_err(ExecError::with_context(
1927 "resolved stage output too large",
1928 format!(
1929 "stage={key} out={n} in={} (fallible)",
1930 input.len()
1931 ),
1932 ));
1933 }
1934 data[..n].copy_from_slice(&out_buf[..n]);
1935 *len = n as u32;
1936 }
1937 Err(e) => {
1938 return set_err(ExecError::with_context(
1939 "resolved stage failed",
1940 format!("stage={key}: {e}"),
1941 ));
1942 }
1943 },
1944 ResolvedStage::Filter(f) => match f(input) {
1945 Ok(true) => {}
1946 Ok(false) => {
1947 *len = 0;
1948 }
1949 Err(e) => {
1950 return set_err(ExecError::with_context(
1951 "resolved stage failed",
1952 format!("stage={key}: {e}"),
1953 ));
1954 }
1955 },
1956 ResolvedStage::Inspect(f) => match f(input) {
1957 Ok(()) => {}
1958 Err(e) => {
1959 return set_err(ExecError::with_context(
1960 "resolved stage failed",
1961 format!("stage={key}: {e}"),
1962 ));
1963 }
1964 },
1965 ResolvedStage::StatefulTransform(st) => match st.call(input, out_buf) {
1966 Ok(TransformOutcome::Drop) => {
1967 *len = 0;
1968 }
1969 Ok(TransformOutcome::ForwardInput) => {}
1970 Ok(TransformOutcome::ForwardOutput(n)) => {
1971 if n > input.len() {
1972 return set_err(ExecError::with_context(
1973 "resolved stage output too large",
1974 format!(
1975 "stage={key} out={n} in={} (stateful transform)",
1976 input.len()
1977 ),
1978 ));
1979 }
1980 data[..n].copy_from_slice(&out_buf[..n]);
1981 *len = n as u32;
1982 }
1983 Ok(_) => {
1984 return set_err(ExecError::new(
1985 "unsupported transform outcome (non-exhaustive)",
1986 ));
1987 }
1988 Err(e) => {
1989 return set_err(ExecError::with_context(
1990 "resolved stage failed",
1991 format!("stage={key}: {e}"),
1992 ));
1993 }
1994 },
1995 ResolvedStage::Stateful(st) => match st.call(input, out_buf) {
1996 Ok(n) => {
1997 if n > input.len() {
1998 return set_err(ExecError::with_context(
1999 "resolved stage output too large",
2000 format!(
2001 "stage={key} out={n} in={} (stateful)",
2002 input.len()
2003 ),
2004 ));
2005 }
2006 data[..n].copy_from_slice(&out_buf[..n]);
2007 *len = n as u32;
2008 }
2009 Err(e) => {
2010 return set_err(ExecError::with_context(
2011 "resolved stage failed",
2012 format!("stage={key}: {e}"),
2013 ));
2014 }
2015 },
2016 }
2017
2018 let n_out = clamp_len(*len, data.len());
2019 out.extend_from_slice(&data[..n_out]);
2020 Ok(())
2021 });
2022
2023 match res {
2024 Ok(_) => {}
2025 Err(byteor_pipeline_backings_shm::SequencedSlotsError::Stopped) => {
2026 return Err(stage_err
2027 .unwrap_or_else(|| ExecError::new("resolved stage stopped")));
2028 }
2029 Err(e) => return Err(ExecError::with_source("stage failed", e)),
2030 }
2031 }
2032 }
2033 }
2034 }
2035
2036 Ok(())
2037}
2038
/// Derives the lane file path for shard `shard_idx` from the base lane path.
///
/// `dir/stem.ext` becomes `dir/stem-shard{idx}.ext`, and `dir/stem` becomes
/// `dir/stem-shard{idx}`. The stem and extension are kept as raw `OsStr` data, so non-UTF-8
/// names keep their identity instead of collapsing onto the shared default stem. When `base`
/// has no parent, the file is placed in `.`. When it has no stem, `byteor-single-ring` is used.
fn shard_lane_path(base: &std::path::Path, shard_idx: u32) -> std::path::PathBuf {
    let dir = base.parent().unwrap_or_else(|| std::path::Path::new("."));
    let mut name = base
        .file_stem()
        .map(std::ffi::OsStr::to_os_string)
        .unwrap_or_else(|| std::ffi::OsString::from("byteor-single-ring"));
    name.push(format!("-shard{shard_idx}"));
    if let Some(ext) = base.extension() {
        name.push(".");
        name.push(ext);
    }
    dir.join(name)
}
2052
2053pub fn run_single_ring_shm_per_key_sharded_with_wait(
2062 spec: &PipelineSpecV1,
2063 lane_path: std::path::PathBuf,
2064 input: &[u8],
2065 wait: WaitPreset,
2066) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2067 run_single_ring_shm_per_key_sharded_with_wait_and_resolver(spec, lane_path, input, wait, None)
2068}
2069
2070pub fn run_single_ring_shm_per_key_sharded_with_wait_and_resolver(
2075 spec: &PipelineSpecV1,
2076 lane_path: std::path::PathBuf,
2077 input: &[u8],
2078 wait: WaitPreset,
2079 resolver: Option<&dyn StageResolver>,
2080) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2081 run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_prefault(
2082 spec, lane_path, input, wait, resolver, false,
2083 )
2084}
2085
2086pub fn run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_prefault(
2090 spec: &PipelineSpecV1,
2091 lane_path: std::path::PathBuf,
2092 input: &[u8],
2093 wait: WaitPreset,
2094 resolver: Option<&dyn StageResolver>,
2095 prefault: bool,
2096) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2097 run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_thread_init_and_prefault(
2098 spec, lane_path, input, wait, resolver, None, prefault,
2099 )
2100}
2101
2102pub fn run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_thread_init(
2108 spec: &PipelineSpecV1,
2109 lane_path: std::path::PathBuf,
2110 input: &[u8],
2111 wait: WaitPreset,
2112 resolver: Option<&dyn StageResolver>,
2113 thread_init: Option<ThreadInitHook>,
2114) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2115 run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_thread_init_and_prefault(
2116 spec,
2117 lane_path,
2118 input,
2119 wait,
2120 resolver,
2121 thread_init,
2122 false,
2123 )
2124}
2125
2126pub fn run_single_ring_shm_per_key_sharded_with_wait_and_resolver_and_thread_init_and_prefault(
2130 spec: &PipelineSpecV1,
2131 lane_path: std::path::PathBuf,
2132 input: &[u8],
2133 wait: WaitPreset,
2134 resolver: Option<&dyn StageResolver>,
2135 thread_init: Option<ThreadInitHook>,
2136 prefault: bool,
2137) -> Result<Vec<alloc::vec::Vec<u8>>, ExecError> {
2138 validate_v1(spec).map_err(|e| ExecError::with_source("spec validation failed", e))?;
2139 let ring = match spec {
2140 PipelineSpecV1::SingleRing(ring) => ring,
2141 _ => return Err(ExecError::new("spec is not SingleRing")),
2142 };
2143
2144 let shards = ring.shards;
2145 if shards <= 1 {
2146 return Ok(alloc::vec::Vec::from([
2147 run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
2148 spec,
2149 lane_path,
2150 input,
2151 wait,
2152 resolver,
2153 thread_init,
2154 prefault,
2155 )?,
2156 ]));
2157 }
2158
2159 let mut per_shard: alloc::vec::Vec<alloc::vec::Vec<u8>> =
2161 (0..shards).map(|_| alloc::vec::Vec::new()).collect();
2162 for &b in input {
2163 let shard = (b as u32) % shards;
2165 per_shard[shard as usize].push(b);
2166 }
2167
2168 let mut out = alloc::vec::Vec::with_capacity(shards as usize);
2173 std::thread::scope(|scope| {
2174 let mut handles = alloc::vec::Vec::with_capacity(shards as usize);
2175 for shard_idx in 0..shards {
2176 let mut spec_t = spec.clone();
2177 if let PipelineSpecV1::SingleRing(r) = &mut spec_t {
2178 r.shards = 1;
2179 }
2180 let path_t = shard_lane_path(&lane_path, shard_idx);
2181 let input_t = core::mem::take(&mut per_shard[shard_idx as usize]);
2182 let thread_init_t = thread_init.clone();
2183 handles.push(scope.spawn(move || {
2184 run_single_ring_shm_with_wait_and_resolver_and_thread_init_and_prefault(
2185 &spec_t,
2186 path_t,
2187 &input_t,
2188 wait,
2189 resolver,
2190 thread_init_t,
2191 prefault,
2192 )
2193 }));
2194 }
2195
2196 for h in handles {
2197 out.push(h.join().map_err(|_| ExecError::new("stage panic"))??);
2198 }
2199 Ok::<(), ExecError>(())
2200 })?;
2201
2202 Ok(out)
2203}