byteor_pipeline_backings_shm/
sequenced_slots.rs1use std::sync::Arc;
4
5use core::sync::atomic::{AtomicBool, Ordering};
6
7use byteor_pipeline_kernel::{LaneTx, LaneTxError};
8
9use crate::{attach_sequenced_slots_region, AttachOptions, BackingError};
10
/// Errors reported by the sequenced-slots producer and wait helper in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SequencedSlotsError {
    /// No capacity right now: either the claim would pass the slowest gating
    /// sequence, or no slot could be allocated from the slot pool.
    Full,
    /// A publish was attempted with a sequence other than the next one in order.
    BadPublish {
        /// Sequence the producer expected to be published next.
        expected: u64,
        /// Sequence the caller actually supplied.
        got: u64,
    },
    /// A gating update was rejected.
    // NOTE(review): not produced anywhere in the visible part of this file;
    // the exact rule (presumably "gating must not move backwards") lives with
    // the consumer-side code — confirm there.
    BadGating {
        /// Gating value before the rejected update.
        prev: u64,
        /// Gating value the update tried to install.
        next: u64,
    },
    /// The payload does not fit in a slot's data area.
    TooLarge {
        /// Largest payload length a slot can hold.
        max: usize,
        /// Length that was requested.
        len: usize,
    },
    /// The region or configuration is not compatible.
    // NOTE(review): not produced in the visible part of this file — confirm
    // the precise condition at the site that returns it.
    Incompatible,

    /// A stop flag was observed while waiting.
    Stopped,
}

impl core::fmt::Display for SequencedSlotsError {
    /// Writes a short lowercase description, including the payload fields
    /// as `name=value` pairs for the variants that carry data.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match *self {
            Self::Full => f.write_str("full"),
            Self::BadPublish { expected, got } => {
                write!(f, "bad publish: expected={} got={}", expected, got)
            }
            Self::BadGating { prev, next } => {
                write!(f, "bad gating: prev={} next={}", prev, next)
            }
            Self::TooLarge { max, len } => {
                write!(f, "too large: max={} len={}", max, len)
            }
            Self::Incompatible => f.write_str("incompatible"),
            Self::Stopped => f.write_str("stopped"),
        }
    }
}

impl std::error::Error for SequencedSlotsError {}
64
65#[inline]
66pub(crate) fn wait_for_or_stop<
67 B: ?Sized + byteor_pipeline_kernel::SeqBarrier,
68 W: byteor_pipeline_kernel::WaitStrategy,
69>(
70 barrier: &B,
71 seq: u64,
72 wait: &mut W,
73 stop: &AtomicBool,
74) -> Result<(), SequencedSlotsError> {
75 loop {
76 if stop.load(Ordering::Relaxed) {
77 return Err(SequencedSlotsError::Stopped);
78 }
79 if barrier.available() >= seq {
80 return Ok(());
81 }
82 wait.wait();
83 }
84}
85
/// Handle to an attached sequenced-slots shared-memory region.
///
/// Holds the region itself (shared through an `Arc`) together with the raw
/// layout pointer that `attach` obtains from it.
pub struct ShmSequencedSlots {
    /// The attached region; kept so the mapping stays owned by this handle.
    pub(crate) region: Arc<byteor_transport_shm::SequencedSlotsRegion<4>>,
    /// Layout pointer taken from `region.as_ptr()` when the handle is created.
    pub(crate) ptr: *mut byteor_abi::layouts::SequencedSlotsLayout<4>,
}
91
/// Point-in-time copy of a ring's cursor and its four gating sequences.
///
/// All derived metrics use saturating arithmetic, so they never underflow or
/// overflow; they clamp at `0` / `u64::MAX` instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SingleRingSnapshot {
    /// Cursor value at the time of the snapshot.
    pub cursor: u64,
    /// Gating sequence values, one per gating index.
    pub gating: [u64; 4],
}

impl SingleRingSnapshot {
    /// Distance between the cursor and the gating sequence at index `stage`.
    ///
    /// An out-of-range `stage` is treated as a gating value of `0`, so the
    /// result is then the cursor itself.
    pub fn lag(&self, stage: usize) -> u64 {
        let seen = match self.gating.get(stage) {
            Some(&value) => value,
            None => 0,
        };
        self.cursor.saturating_sub(seen)
    }

    /// Smallest gating value among the first `active` entries.
    ///
    /// `active` is clamped to the number of gating entries. Returns `0` when
    /// `active` is `0`, and also when the minimum found is `u64::MAX`.
    pub fn min_gating(&self, active: usize) -> u64 {
        let considered = &self.gating[..active.min(self.gating.len())];
        match considered.iter().copied().min() {
            None | Some(u64::MAX) => 0,
            Some(lowest) => lowest,
        }
    }

    /// Distance between the cursor and [`min_gating`](Self::min_gating) of the
    /// first `active` entries.
    pub fn min_lag(&self, active: usize) -> u64 {
        let slowest = self.min_gating(active);
        self.cursor.saturating_sub(slowest)
    }

    /// `cursor + 1 - window_capacity`, clamped at both ends.
    pub fn wrap_point(&self, window_capacity: u64) -> u64 {
        let upcoming = self.cursor.saturating_add(1);
        upcoming.saturating_sub(window_capacity)
    }

    /// How far [`wrap_point`](Self::wrap_point) lies beyond the minimum gating
    /// value of the first `active` entries (`0` when it does not).
    pub fn wrap_pressure(&self, active: usize, window_capacity: u64) -> u64 {
        let slowest = self.min_gating(active);
        self.wrap_point(window_capacity).saturating_sub(slowest)
    }

    /// Same value as [`min_lag`](Self::min_lag).
    pub fn outstanding(&self, active: usize) -> u64 {
        self.min_lag(active)
    }

    /// `window_capacity` minus [`outstanding`](Self::outstanding), clamped at `0`.
    pub fn headroom(&self, active: usize, window_capacity: u64) -> u64 {
        let in_flight = self.outstanding(active);
        window_capacity.saturating_sub(in_flight)
    }
}
168
169unsafe impl Send for ShmSequencedSlots {}
170unsafe impl Sync for ShmSequencedSlots {}
171
172impl ShmSequencedSlots {
173 pub fn attach(opts: &AttachOptions, lane: &str) -> Result<Self, BackingError> {
175 let region = attach_sequenced_slots_region(opts, lane)?;
176 let ptr = region.as_ptr();
177 Ok(Self {
178 region: Arc::new(region),
179 ptr,
180 })
181 }
182
183 #[inline]
184 pub(crate) fn layout_ptr(&self) -> *mut byteor_abi::layouts::SequencedSlotsLayout<4> {
185 self.ptr
186 }
187
188 pub fn snapshot(&self) -> SingleRingSnapshot {
190 let layout = self.layout_ptr();
191 unsafe {
192 let cursor = crate::ss_internal::load_cursor(layout);
193 let mut gating = [0u64; 4];
194 for (i, g) in gating.iter_mut().enumerate() {
195 *g = crate::ss_internal::atomic_u64(&(*layout).gating[i].seq)
196 .load(Ordering::Acquire);
197 }
198 SingleRingSnapshot { cursor, gating }
199 }
200 }
201
202 pub fn window_capacity(&self) -> u64 {
206 crate::ss_internal::effective_window_capacity()
207 }
208}
209
210pub struct ShmSequencedSlotsProducer {
214 shm: ShmSequencedSlots,
215 ring_capacity: u64,
216 window_capacity: u64,
217 refcnt_on_publish: u32,
218 active_gating: usize,
219 next_to_claim: u64,
220 next_to_publish: u64,
221 cached_min_gating: u64,
222}
223
224impl ShmSequencedSlotsProducer {
225 pub fn attach(
231 opts: &AttachOptions,
232 lane: &str,
233 refcnt_on_publish: u32,
234 active_gating: usize,
235 ) -> Result<Self, BackingError> {
236 let shm = ShmSequencedSlots::attach(opts, lane)?;
237 if refcnt_on_publish == 0 {
238 return Err(BackingError::Invalid("refcnt_on_publish must be > 0"));
239 }
240 if active_gating == 0 || active_gating > 4 {
241 return Err(BackingError::Invalid("active_gating must be 1..=4"));
242 }
243
244 let ring_capacity = crate::ss_internal::ring_capacity();
245 let window_capacity = crate::ss_internal::effective_window_capacity();
246 let cursor = unsafe { crate::ss_internal::load_cursor(shm.layout_ptr()) };
247 let cached_min_gating =
248 unsafe { crate::ss_internal::min_gating(shm.layout_ptr(), active_gating) };
249 Ok(Self {
250 shm,
251 ring_capacity,
252 window_capacity,
253 refcnt_on_publish,
254 active_gating,
255 next_to_claim: cursor,
256 next_to_publish: cursor.saturating_add(1),
257 cached_min_gating,
258 })
259 }
260
261 pub fn window_capacity(&self) -> u64 {
265 self.window_capacity
266 }
267
268 pub fn try_claim_next(&mut self) -> Result<u64, SequencedSlotsError> {
270 let next = self.next_to_claim.saturating_add(1);
271 let wrap_point = next.saturating_sub(self.window_capacity);
272 if wrap_point > self.cached_min_gating {
273 let min_gating = unsafe {
274 crate::ss_internal::min_gating(self.shm.layout_ptr(), self.active_gating)
275 };
276 self.cached_min_gating = min_gating;
277 if wrap_point > min_gating {
278 return Err(SequencedSlotsError::Full);
279 }
280 }
281 self.next_to_claim = next;
282 Ok(next)
283 }
284
285 pub fn publish_bytes(&mut self, bytes: &[u8]) -> Result<u64, SequencedSlotsError> {
287 if bytes.len() > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
288 return Err(SequencedSlotsError::TooLarge {
289 max: indexbus_core::INDEXBUS_SLOT_DATA_SIZE,
290 len: bytes.len(),
291 });
292 }
293
294 let seq = self.try_claim_next()?;
295 self.publish_claimed_with(seq, |slot_bytes| {
296 slot_bytes[..bytes.len()].copy_from_slice(bytes);
297 Ok(bytes.len())
298 })
299 }
300
301 pub fn publish_claimed_with<F>(&mut self, seq: u64, f: F) -> Result<u64, SequencedSlotsError>
303 where
304 F: FnOnce(&mut [u8]) -> Result<usize, SequencedSlotsError>,
305 {
306 if seq != self.next_to_publish {
307 return Err(SequencedSlotsError::BadPublish {
308 expected: self.next_to_publish,
309 got: seq,
310 });
311 }
312
313 let layout = self.shm.layout_ptr();
314 let pool = unsafe { core::ptr::addr_of_mut!((*layout).slot_pool) };
315 let slot_idx =
316 unsafe { crate::ss_internal::pool_alloc_ptr(pool) }.ok_or(SequencedSlotsError::Full)?;
317
318 let wrote = unsafe {
319 let s = &mut *core::ptr::addr_of_mut!((*pool).slots[slot_idx as usize]);
320 crate::ss_internal::atomic_u32(&s.refcnt)
321 .store(self.refcnt_on_publish, Ordering::Release);
322 let len = f(&mut s.data)?;
323 if len > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
324 crate::ss_internal::slot_dec_refcnt_and_free_if_zero(pool, slot_idx);
325 return Err(SequencedSlotsError::TooLarge {
326 max: indexbus_core::INDEXBUS_SLOT_DATA_SIZE,
327 len,
328 });
329 }
330 s.len = len as u32;
331 len
332 };
333
334 let _ = wrote;
335 let ring_idx = (seq % self.ring_capacity) as usize;
336 unsafe {
337 crate::ss_internal::atomic_u32(&(*layout).ring[ring_idx])
338 .store(slot_idx, Ordering::Release);
339 crate::ss_internal::publish_cursor(layout, seq);
340 }
341
342 self.next_to_publish = self.next_to_publish.saturating_add(1);
343 Ok(seq)
344 }
345}
346
347impl LaneTx for ShmSequencedSlotsProducer {
348 fn publish(&mut self, msg: &[u8]) -> Result<(), LaneTxError> {
349 self.publish_bytes(msg).map(|_| ()).map_err(|e| match e {
350 SequencedSlotsError::Full => LaneTxError::Full,
351 SequencedSlotsError::TooLarge { max, len } => LaneTxError::TooLarge { max, len },
352 _ => LaneTxError::Failed,
353 })
354 }
355}