// byteor_pipeline_backings_shm/subscriber.rs
use core::sync::atomic::{AtomicBool, Ordering};

use byteor_pipeline_kernel::WaitStrategy;

use crate::barriers::CursorBarrier;
use crate::sequenced_slots::{wait_for_or_stop, SequencedSlotsError, ShmSequencedSlots};
use crate::{AttachOptions, BackingError};

/// Consumer-side handle for a shared-memory sequenced-slots lane.
///
/// Tracks the attached mapping, which consumer gating slot this subscriber
/// publishes progress to, and the next sequence number it expects to read.
pub struct ShmSequencedSlotsSubscriber {
    // Attached shared-memory mapping for the lane.
    shm: ShmSequencedSlots,
    // Index of this consumer's gating entry in the layout (used in `attach`
    // only when < 4 — presumably the gating array's fixed capacity; confirm
    // against the layout definition).
    consumer: usize,
    // Next sequence number to receive; advanced after each successful recv.
    next: u64,
}
19
20impl ShmSequencedSlotsSubscriber {
21 pub fn attach(opts: &AttachOptions, lane: &str, consumer: usize) -> Result<Self, BackingError> {
23 let shm = ShmSequencedSlots::attach(opts, lane)?;
24 let layout = shm.layout_ptr();
25 let start = if consumer < 4 {
26 unsafe {
27 crate::ss_internal::atomic_u64(&(*layout).gating[consumer].seq)
28 .load(Ordering::Acquire)
29 }
30 } else {
31 0
32 };
33 Ok(Self {
34 shm,
35 consumer,
36 next: start.saturating_add(1),
37 })
38 }
39
40 pub fn recv_next_into<W: WaitStrategy>(
42 &mut self,
43 cursor: &CursorBarrier,
44 wait: &mut W,
45 out: &mut [u8],
46 ) -> Result<(u64, usize), SequencedSlotsError> {
47 self.recv_next_into_from(cursor, wait, out)
48 }
49
50 pub fn recv_next_into_from<W: WaitStrategy, B: byteor_pipeline_kernel::SeqBarrier>(
55 &mut self,
56 barrier: &B,
57 wait: &mut W,
58 out: &mut [u8],
59 ) -> Result<(u64, usize), SequencedSlotsError> {
60 let seq = self.next;
61 barrier.wait_for(seq, wait);
62
63 let layout = self.shm.layout_ptr();
64 let ring_idx = (seq % crate::ss_internal::ring_capacity()) as usize;
65 let slot_idx = unsafe {
66 crate::ss_internal::atomic_u32(&(*layout).ring[ring_idx]).load(Ordering::Acquire)
67 };
68 if slot_idx == byteor_abi::layouts::SequencedSlotsLayout::<4>::EMPTY_RING {
69 return Err(SequencedSlotsError::Incompatible);
70 }
71
72 let pool = unsafe { core::ptr::addr_of_mut!((*layout).slot_pool) };
73
74 let len = unsafe {
75 let s = &*core::ptr::addr_of!((*pool).slots[slot_idx as usize]);
76 let len = s.len as usize;
77 if len > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
78 return Err(SequencedSlotsError::Incompatible);
79 }
80 if out.len() < len {
81 return Err(SequencedSlotsError::TooLarge {
82 max: out.len(),
83 len,
84 });
85 }
86 core::ptr::copy_nonoverlapping(s.data.as_ptr(), out.as_mut_ptr(), len);
87 len
88 };
89
90 unsafe {
91 crate::ss_internal::slot_dec_refcnt_and_free_if_zero(pool, slot_idx);
92 crate::ss_internal::store_gating(layout, self.consumer, seq)?;
93 }
94
95 self.next = self.next.saturating_add(1);
96 Ok((seq, len))
97 }
98
99 pub fn recv_next_into_stoppable<W: WaitStrategy>(
101 &mut self,
102 cursor: &CursorBarrier,
103 wait: &mut W,
104 out: &mut [u8],
105 stop: &AtomicBool,
106 ) -> Result<(u64, usize), SequencedSlotsError> {
107 self.recv_next_into_from_stoppable(cursor, wait, out, stop)
108 }
109
110 pub fn recv_next_into_from_stoppable<W: WaitStrategy, B: byteor_pipeline_kernel::SeqBarrier>(
112 &mut self,
113 barrier: &B,
114 wait: &mut W,
115 out: &mut [u8],
116 stop: &AtomicBool,
117 ) -> Result<(u64, usize), SequencedSlotsError> {
118 let seq = self.next;
119 wait_for_or_stop(barrier, seq, wait, stop)?;
120
121 let layout = self.shm.layout_ptr();
122 let ring_idx = (seq % crate::ss_internal::ring_capacity()) as usize;
123 let slot_idx = unsafe {
124 crate::ss_internal::atomic_u32(&(*layout).ring[ring_idx]).load(Ordering::Acquire)
125 };
126 if slot_idx == byteor_abi::layouts::SequencedSlotsLayout::<4>::EMPTY_RING {
127 return Err(SequencedSlotsError::Incompatible);
128 }
129
130 let pool = unsafe { core::ptr::addr_of_mut!((*layout).slot_pool) };
131
132 let len = unsafe {
133 let s = &*core::ptr::addr_of!((*pool).slots[slot_idx as usize]);
134 let len = s.len as usize;
135 if len > indexbus_core::INDEXBUS_SLOT_DATA_SIZE {
136 return Err(SequencedSlotsError::Incompatible);
137 }
138 if out.len() < len {
139 return Err(SequencedSlotsError::TooLarge {
140 max: out.len(),
141 len,
142 });
143 }
144 core::ptr::copy_nonoverlapping(s.data.as_ptr(), out.as_mut_ptr(), len);
145 len
146 };
147
148 unsafe {
149 crate::ss_internal::slot_dec_refcnt_and_free_if_zero(pool, slot_idx);
150 crate::ss_internal::store_gating(layout, self.consumer, seq)?;
151 }
152
153 self.next = self.next.saturating_add(1);
154 Ok((seq, len))
155 }
156}