byteor_pipeline_backings_shm/
sequenced_slots.rs

1//! SequencedSlots (SingleRing substrate) over SHM.
2
3use std::sync::Arc;
4
5use core::sync::atomic::{AtomicBool, Ordering};
6
7use byteor_pipeline_kernel::{LaneTx, LaneTxError};
8
9use crate::{attach_sequenced_slots_region, AttachOptions, BackingError};
10
/// Errors returned by sequenced-slots operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SequencedSlotsError {
    /// Ring is full (producer would wrap past the minimum gating sequence).
    Full,
    /// Attempted to publish out of order.
    BadPublish {
        /// Expected next publish sequence.
        expected: u64,
        /// Sequence that was attempted to be published.
        got: u64,
    },
    /// Consumer/stage attempted to move its gating sequence backwards.
    BadGating {
        /// Previous gating sequence.
        prev: u64,
        /// Proposed next gating sequence.
        next: u64,
    },
    /// Slot payload is too large for the fixed slot size.
    TooLarge {
        /// Maximum supported/available bytes.
        max: usize,
        /// Requested/observed bytes.
        len: usize,
    },
    /// Layout is incompatible or corrupt.
    Incompatible,
    /// Operation was interrupted by a cooperative stop request.
    Stopped,
}

impl core::fmt::Display for SequencedSlotsError {
    /// Render a short, stable, lowercase description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        // All variants are `Copy`, so match by value and use `Self::` shorthand.
        match *self {
            Self::Full => f.write_str("full"),
            Self::BadPublish { expected, got } => {
                write!(f, "bad publish: expected={expected} got={got}")
            }
            Self::BadGating { prev, next } => write!(f, "bad gating: prev={prev} next={next}"),
            Self::TooLarge { max, len } => write!(f, "too large: max={max} len={len}"),
            Self::Incompatible => f.write_str("incompatible"),
            Self::Stopped => f.write_str("stopped"),
        }
    }
}

impl std::error::Error for SequencedSlotsError {}
64
65#[inline]
66pub(crate) fn wait_for_or_stop<
67    B: ?Sized + byteor_pipeline_kernel::SeqBarrier,
68    W: byteor_pipeline_kernel::WaitStrategy,
69>(
70    barrier: &B,
71    seq: u64,
72    wait: &mut W,
73    stop: &AtomicBool,
74) -> Result<(), SequencedSlotsError> {
75    loop {
76        if stop.load(Ordering::Relaxed) {
77            return Err(SequencedSlotsError::Stopped);
78        }
79        if barrier.available() >= seq {
80            return Ok(());
81        }
82        wait.wait();
83    }
84}
85
/// A shared handle to a mapped `SequencedSlotsLayout4` region.
pub struct ShmSequencedSlots {
    /// Keeps the shared-memory mapping alive; `ptr` below points into this region,
    /// so the pointer must never be used after the last `Arc` clone is dropped.
    pub(crate) region: Arc<byteor_transport_shm::SequencedSlotsRegion<4>>,
    /// Raw pointer to the layout header inside `region`, cached at attach time.
    // NOTE(review): validity of `ptr` is tied to `region`'s lifetime — confirm no code
    // path hands out `ptr` beyond the handle's lifetime.
    pub(crate) ptr: *mut byteor_abi::layouts::SequencedSlotsLayout<4>,
}
91
/// Minimal observability snapshot for SingleRing / sequenced-slots.
///
/// This is intentionally read-only and cheap to gather. It can be sampled without stopping
/// the pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SingleRingSnapshot {
    /// Current producer cursor.
    pub cursor: u64,
    /// Per-stage/consumer gating sequences (Layout4).
    pub gating: [u64; 4],
}

impl SingleRingSnapshot {
    /// Compute `cursor - gating[stage]` (saturating).
    ///
    /// An out-of-range `stage` is treated as gating sequence `0`.
    pub fn lag(&self, stage: usize) -> u64 {
        let gate = self.gating.get(stage).copied().unwrap_or(0);
        self.cursor.saturating_sub(gate)
    }

    /// Return the minimum gating sequence across the first `active` gating cells.
    ///
    /// If `active == 0`, returns `0`.
    pub fn min_gating(&self, active: usize) -> u64 {
        let count = active.min(self.gating.len());
        // An empty window and an all-`u64::MAX` window both collapse to 0,
        // matching the defensive behavior of the original accumulator loop.
        match self.gating[..count].iter().copied().min() {
            None | Some(u64::MAX) => 0,
            Some(lowest) => lowest,
        }
    }

    /// Compute `cursor - min_gating(active)` (saturating).
    pub fn min_lag(&self, active: usize) -> u64 {
        let floor = self.min_gating(active);
        self.cursor.saturating_sub(floor)
    }

    /// Compute the producer wrap point for the next claim/publish.
    ///
    /// This corresponds to the producer's internal check:
    /// `wrap_point = (next_seq) - window_capacity`.
    ///
    /// When `wrap_point > min_gating(active)`, the producer is full.
    pub fn wrap_point(&self, window_capacity: u64) -> u64 {
        let next_seq = self.cursor.saturating_add(1);
        next_seq.saturating_sub(window_capacity)
    }

    /// Compute producer wrap pressure (saturating).
    ///
    /// Returns `wrap_point(window_capacity) - min_gating(active)` (saturating).
    ///
    /// - `0` means the producer can still claim/publish without wrapping past min-gating.
    /// - `>0` means the next claim would be blocked (`Full`).
    pub fn wrap_pressure(&self, active: usize, window_capacity: u64) -> u64 {
        let point = self.wrap_point(window_capacity);
        point.saturating_sub(self.min_gating(active))
    }

    /// Compute the number of in-flight sequences against the slowest (min) gating cell.
    pub fn outstanding(&self, active: usize) -> u64 {
        self.min_lag(active)
    }

    /// Compute remaining headroom before backpressure triggers (saturating).
    pub fn headroom(&self, active: usize, window_capacity: u64) -> u64 {
        window_capacity.saturating_sub(self.outstanding(active))
    }
}
168
// SAFETY(review): `ShmSequencedSlots` holds an `Arc` to the mapped region plus a raw
// pointer into that same mapping. Cross-thread use is presumed sound on the grounds
// that shared-memory accesses go through the atomic wrappers in `ss_internal` —
// TODO confirm no non-atomic aliasing access to the mapped layout exists.
unsafe impl Send for ShmSequencedSlots {}
unsafe impl Sync for ShmSequencedSlots {}
171
impl ShmSequencedSlots {
    /// Attach to a shared region (reusing the transport's validation).
    ///
    /// The returned handle owns an `Arc` to the mapped region and caches the layout
    /// pointer obtained from the region at attach time.
    pub fn attach(opts: &AttachOptions, lane: &str) -> Result<Self, BackingError> {
        let region = attach_sequenced_slots_region(opts, lane)?;
        let ptr = region.as_ptr();
        Ok(Self {
            region: Arc::new(region),
            ptr,
        })
    }

    /// Raw pointer to the mapped `SequencedSlotsLayout<4>` header.
    ///
    /// Only valid while `self.region` keeps the mapping alive.
    #[inline]
    pub(crate) fn layout_ptr(&self) -> *mut byteor_abi::layouts::SequencedSlotsLayout<4> {
        self.ptr
    }

    /// Snapshot cursor + gating sequences.
    ///
    /// The cursor and each of the four gating cells are loaded individually (cursor
    /// first, then gating cells with `Acquire`), so the snapshot is not a single
    /// atomic picture of the ring — values may advance between loads. That is
    /// acceptable for the observability use this type targets.
    pub fn snapshot(&self) -> SingleRingSnapshot {
        let layout = self.layout_ptr();
        // SAFETY: `layout` points into the mapping kept alive by `self.region`, and the
        // shared-memory reads below go through the atomic wrappers in `ss_internal`.
        unsafe {
            let cursor = crate::ss_internal::load_cursor(layout);
            let mut gating = [0u64; 4];
            for (i, g) in gating.iter_mut().enumerate() {
                *g = crate::ss_internal::atomic_u64(&(*layout).gating[i].seq)
                    .load(Ordering::Acquire);
            }
            SingleRingSnapshot { cursor, gating }
        }
    }

    /// Effective maximum number of outstanding sequences before producer backpressure triggers.
    ///
    /// This is currently `min(INDEXBUS_QUEUE_CAPACITY, INDEXBUS_SLOTS_CAPACITY)`.
    pub fn window_capacity(&self) -> u64 {
        crate::ss_internal::effective_window_capacity()
    }
}
209
/// Single-producer handle over `SequencedSlotsLayout4`.
///
/// v1: single-writer claim/publish (no MPMC claiming).
pub struct ShmSequencedSlotsProducer {
    /// Shared mapping this producer writes into.
    shm: ShmSequencedSlots,
    /// Number of ring entries; used as the modulus mapping sequences to ring indices.
    ring_capacity: u64,
    /// Maximum outstanding sequences before a claim reports `Full`.
    window_capacity: u64,
    /// Refcount stored into each slot at publish time (see `attach` docs).
    refcnt_on_publish: u32,
    /// Number of gating cells (1..=4) consulted for backpressure.
    active_gating: usize,
    /// Highest sequence claimed so far (single-writer local bookkeeping).
    next_to_claim: u64,
    /// Sequence number the next publish must present; advances by 1 per publish.
    next_to_publish: u64,
    /// Last observed minimum gating sequence; refreshed lazily only when a claim
    /// would wrap past the cached value, to avoid re-reading shared memory per claim.
    cached_min_gating: u64,
}
223
224impl ShmSequencedSlotsProducer {
225    /// Attach a producer.
226    ///
227    /// `refcnt_on_publish` controls the slot refcount initialized for each published entry.
228    /// - Use `1` for single-ring pipelines where only the final stage releases.
229    /// - Use `k` for multicast scenarios with `k` independent consumers.
230    pub fn attach(
231        opts: &AttachOptions,
232        lane: &str,
233        refcnt_on_publish: u32,
234        active_gating: usize,
235    ) -> Result<Self, BackingError> {
236        let shm = ShmSequencedSlots::attach(opts, lane)?;
237        if refcnt_on_publish == 0 {
238            return Err(BackingError::Invalid("refcnt_on_publish must be > 0"));
239        }
240        if active_gating == 0 || active_gating > 4 {
241            return Err(BackingError::Invalid("active_gating must be 1..=4"));
242        }
243
244        let ring_capacity = crate::ss_internal::ring_capacity();
245        let window_capacity = crate::ss_internal::effective_window_capacity();
246        let cursor = unsafe { crate::ss_internal::load_cursor(shm.layout_ptr()) };
247        let cached_min_gating =
248            unsafe { crate::ss_internal::min_gating(shm.layout_ptr(), active_gating) };
249        Ok(Self {
250            shm,
251            ring_capacity,
252            window_capacity,
253            refcnt_on_publish,
254            active_gating,
255            next_to_claim: cursor,
256            next_to_publish: cursor.saturating_add(1),
257            cached_min_gating,
258        })
259    }
260
261    /// Maximum number of outstanding unpublished/ungated sequences permitted by this backing.
262    ///
263    /// This is currently `min(INDEXBUS_QUEUE_CAPACITY, INDEXBUS_SLOTS_CAPACITY)`.
264    pub fn window_capacity(&self) -> u64 {
265        self.window_capacity
266    }
267
268    /// Try to reserve the next sequence.
269    pub fn try_claim_next(&mut self) -> Result<u64, SequencedSlotsError> {
270        let next = self.next_to_claim.saturating_add(1);
271        let wrap_point = next.saturating_sub(self.window_capacity);
272        if wrap_point > self.cached_min_gating {
273            let min_gating = unsafe {
274                crate::ss_internal::min_gating(self.shm.layout_ptr(), self.active_gating)
275            };
276            self.cached_min_gating = min_gating;
277            if wrap_point > min_gating {
278                return Err(SequencedSlotsError::Full);
279            }
280        }
281        self.next_to_claim = next;
282        Ok(next)
283    }
284
285    /// Publish bytes as the next sequenced entry.
286    pub fn publish_bytes(&mut self, bytes: &[u8]) -> Result<u64, SequencedSlotsError> {
287        if bytes.len() > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
288            return Err(SequencedSlotsError::TooLarge {
289                max: indexbus_core::INDEXBUS_SLOT_DATA_SIZE,
290                len: bytes.len(),
291            });
292        }
293
294        let seq = self.try_claim_next()?;
295        self.publish_claimed_with(seq, |slot_bytes| {
296            slot_bytes[..bytes.len()].copy_from_slice(bytes);
297            Ok(bytes.len())
298        })
299    }
300
301    /// Publish a previously-claimed sequence by writing into the slot buffer.
302    pub fn publish_claimed_with<F>(&mut self, seq: u64, f: F) -> Result<u64, SequencedSlotsError>
303    where
304        F: FnOnce(&mut [u8]) -> Result<usize, SequencedSlotsError>,
305    {
306        if seq != self.next_to_publish {
307            return Err(SequencedSlotsError::BadPublish {
308                expected: self.next_to_publish,
309                got: seq,
310            });
311        }
312
313        let layout = self.shm.layout_ptr();
314        let pool = unsafe { core::ptr::addr_of_mut!((*layout).slot_pool) };
315        let slot_idx =
316            unsafe { crate::ss_internal::pool_alloc_ptr(pool) }.ok_or(SequencedSlotsError::Full)?;
317
318        let wrote = unsafe {
319            let s = &mut *core::ptr::addr_of_mut!((*pool).slots[slot_idx as usize]);
320            crate::ss_internal::atomic_u32(&s.refcnt)
321                .store(self.refcnt_on_publish, Ordering::Release);
322            let len = f(&mut s.data)?;
323            if len > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
324                crate::ss_internal::slot_dec_refcnt_and_free_if_zero(pool, slot_idx);
325                return Err(SequencedSlotsError::TooLarge {
326                    max: indexbus_core::INDEXBUS_SLOT_DATA_SIZE,
327                    len,
328                });
329            }
330            s.len = len as u32;
331            len
332        };
333
334        let _ = wrote;
335        let ring_idx = (seq % self.ring_capacity) as usize;
336        unsafe {
337            crate::ss_internal::atomic_u32(&(*layout).ring[ring_idx])
338                .store(slot_idx, Ordering::Release);
339            crate::ss_internal::publish_cursor(layout, seq);
340        }
341
342        self.next_to_publish = self.next_to_publish.saturating_add(1);
343        Ok(seq)
344    }
345}
346
347impl LaneTx for ShmSequencedSlotsProducer {
348    fn publish(&mut self, msg: &[u8]) -> Result<(), LaneTxError> {
349        self.publish_bytes(msg).map(|_| ()).map_err(|e| match e {
350            SequencedSlotsError::Full => LaneTxError::Full,
351            SequencedSlotsError::TooLarge { max, len } => LaneTxError::TooLarge { max, len },
352            _ => LaneTxError::Failed,
353        })
354    }
355}