byteor_pipeline_backings_shm/
subscriber.rs

1//! SequencedSlots subscriber/consumer.
2
3use core::sync::atomic::{AtomicBool, Ordering};
4
5use byteor_pipeline_kernel::WaitStrategy;
6
7use crate::barriers::CursorBarrier;
8use crate::sequenced_slots::{wait_for_or_stop, SequencedSlotsError, ShmSequencedSlots};
9use crate::{AttachOptions, BackingError};
10
11/// Subscriber/consumer that reads from the cursor and advances a specific gating sequence.
12///
13/// This is the multi-consumer (multicast) building block.
///
/// Each instance remembers which gating entry it owns (`consumer`) and the next sequence
/// number it will wait for (`next`); both are set up by `attach`.
14pub struct ShmSequencedSlotsSubscriber {
    /// Attached shared-memory region; every ring/slot/gating access goes through its
    /// `layout_ptr()`.
15    shm: ShmSequencedSlots,
    /// Index of this subscriber's entry in the layout's `gating` array. `attach` only reads
    /// the stored gating value when this is `< 4`.
16    consumer: usize,
    /// Next sequence number to receive. Initialised to the stored gating value plus one and
    /// incremented (saturating) after each successful receive.
17    next: u64,
18}
19
20impl ShmSequencedSlotsSubscriber {
21    /// Attach a subscriber for `consumer`.
22    pub fn attach(opts: &AttachOptions, lane: &str, consumer: usize) -> Result<Self, BackingError> {
23        let shm = ShmSequencedSlots::attach(opts, lane)?;
24        let layout = shm.layout_ptr();
25        let start = if consumer < 4 {
26            unsafe {
27                crate::ss_internal::atomic_u64(&(*layout).gating[consumer].seq)
28                    .load(Ordering::Acquire)
29            }
30        } else {
31            0
32        };
33        Ok(Self {
34            shm,
35            consumer,
36            next: start.saturating_add(1),
37        })
38    }
39
40    /// Receive the next sequenced entry into `out`, advancing gating and releasing the slot.
41    pub fn recv_next_into<W: WaitStrategy>(
42        &mut self,
43        cursor: &CursorBarrier,
44        wait: &mut W,
45        out: &mut [u8],
46    ) -> Result<(u64, usize), SequencedSlotsError> {
47        self.recv_next_into_from(cursor, wait, out)
48    }
49
50    /// Receive the next sequenced entry into `out`, waiting on an arbitrary `SeqBarrier`.
51    ///
52    /// This is the building block for stage-ordered consumption (e.g. consumer waits on the
53    /// last stage's gating sequence, or on a join barrier).
54    pub fn recv_next_into_from<W: WaitStrategy, B: byteor_pipeline_kernel::SeqBarrier>(
55        &mut self,
56        barrier: &B,
57        wait: &mut W,
58        out: &mut [u8],
59    ) -> Result<(u64, usize), SequencedSlotsError> {
60        let seq = self.next;
61        barrier.wait_for(seq, wait);
62
63        let layout = self.shm.layout_ptr();
64        let ring_idx = (seq % crate::ss_internal::ring_capacity()) as usize;
65        let slot_idx = unsafe {
66            crate::ss_internal::atomic_u32(&(*layout).ring[ring_idx]).load(Ordering::Acquire)
67        };
68        if slot_idx == byteor_abi::layouts::SequencedSlotsLayout::<4>::EMPTY_RING {
69            return Err(SequencedSlotsError::Incompatible);
70        }
71
72        let pool = unsafe { core::ptr::addr_of_mut!((*layout).slot_pool) };
73
74        let len = unsafe {
75            let s = &*core::ptr::addr_of!((*pool).slots[slot_idx as usize]);
76            let len = s.len as usize;
77            if len > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
78                return Err(SequencedSlotsError::Incompatible);
79            }
80            if out.len() < len {
81                return Err(SequencedSlotsError::TooLarge {
82                    max: out.len(),
83                    len,
84                });
85            }
86            core::ptr::copy_nonoverlapping(s.data.as_ptr(), out.as_mut_ptr(), len);
87            len
88        };
89
90        unsafe {
91            crate::ss_internal::slot_dec_refcnt_and_free_if_zero(pool, slot_idx);
92            crate::ss_internal::store_gating(layout, self.consumer, seq)?;
93        }
94
95        self.next = self.next.saturating_add(1);
96        Ok((seq, len))
97    }
98
99    /// Stop-aware variant of [`Self::recv_next_into`].
100    pub fn recv_next_into_stoppable<W: WaitStrategy>(
101        &mut self,
102        cursor: &CursorBarrier,
103        wait: &mut W,
104        out: &mut [u8],
105        stop: &AtomicBool,
106    ) -> Result<(u64, usize), SequencedSlotsError> {
107        self.recv_next_into_from_stoppable(cursor, wait, out, stop)
108    }
109
110    /// Stop-aware variant of [`Self::recv_next_into_from`].
111    pub fn recv_next_into_from_stoppable<W: WaitStrategy, B: byteor_pipeline_kernel::SeqBarrier>(
112        &mut self,
113        barrier: &B,
114        wait: &mut W,
115        out: &mut [u8],
116        stop: &AtomicBool,
117    ) -> Result<(u64, usize), SequencedSlotsError> {
118        let seq = self.next;
119        wait_for_or_stop(barrier, seq, wait, stop)?;
120
121        let layout = self.shm.layout_ptr();
122        let ring_idx = (seq % crate::ss_internal::ring_capacity()) as usize;
123        let slot_idx = unsafe {
124            crate::ss_internal::atomic_u32(&(*layout).ring[ring_idx]).load(Ordering::Acquire)
125        };
126        if slot_idx == byteor_abi::layouts::SequencedSlotsLayout::<4>::EMPTY_RING {
127            return Err(SequencedSlotsError::Incompatible);
128        }
129
130        let pool = unsafe { core::ptr::addr_of_mut!((*layout).slot_pool) };
131
132        let len = unsafe {
133            let s = &*core::ptr::addr_of!((*pool).slots[slot_idx as usize]);
134            let len = s.len as usize;
135            if len > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
136                return Err(SequencedSlotsError::Incompatible);
137            }
138            if out.len() < len {
139                return Err(SequencedSlotsError::TooLarge {
140                    max: out.len(),
141                    len,
142                });
143            }
144            core::ptr::copy_nonoverlapping(s.data.as_ptr(), out.as_mut_ptr(), len);
145            len
146        };
147
148        unsafe {
149            crate::ss_internal::slot_dec_refcnt_and_free_if_zero(pool, slot_idx);
150            crate::ss_internal::store_gating(layout, self.consumer, seq)?;
151        }
152
153        self.next = self.next.saturating_add(1);
154        Ok((seq, len))
155    }
156}