indexbus_blocking/
events.rs1use indexbus_core::{
2 split_mpsc, split_spsc, MpscConsumer, MpscProducer, SharedLayoutCell, SpscReceiver, SpscSender,
3};
4use std::sync::atomic::Ordering;
5use std::time::{Duration, Instant};
6
7use indexbus_abi::layouts::SharedLayout;
8
9use crate::internal::atomics::atomic_u32_abi;
10use crate::internal::layout::{require_blocking_caps, shared_wake_section_ptr, wake_cell_seq_ptr};
11use crate::{Error, Result};
12
13pub struct BlockingSpscSender {
15 inner: SpscSender,
16 wake: *mut indexbus_abi::layouts::WakeCell,
17}
18
19pub struct BlockingSpscReceiver {
21 inner: SpscReceiver,
22 wake: *mut indexbus_abi::layouts::WakeCell,
23}
24
25pub struct BlockingMpscProducer {
27 inner: MpscProducer,
28 wake: *mut indexbus_abi::layouts::WakeCell,
29}
30
31pub struct BlockingMpscConsumer {
33 inner: MpscConsumer,
34 wake: *mut indexbus_abi::layouts::WakeCell,
35}
36
37unsafe impl Send for BlockingSpscSender {}
38unsafe impl Sync for BlockingSpscSender {}
39unsafe impl Send for BlockingSpscReceiver {}
40unsafe impl Sync for BlockingSpscReceiver {}
41unsafe impl Send for BlockingMpscProducer {}
42unsafe impl Sync for BlockingMpscProducer {}
43unsafe impl Send for BlockingMpscConsumer {}
44unsafe impl Sync for BlockingMpscConsumer {}
45
46pub fn split_spsc_blocking(
50 shared: &mut SharedLayout,
51) -> Result<(BlockingSpscSender, BlockingSpscReceiver)> {
52 indexbus_core::validate_shared_layout(shared)?;
53 require_blocking_caps(&shared.header)?;
54
55 let (tx, rx) = split_spsc(SharedLayoutCell::from_mut(shared))?;
56
57 let wake_section = unsafe { &mut *shared_wake_section_ptr(shared as *mut SharedLayout) };
58 let wake = core::ptr::addr_of_mut!(wake_section.spsc_wake);
59
60 Ok((
61 BlockingSpscSender { inner: tx, wake },
62 BlockingSpscReceiver { inner: rx, wake },
63 ))
64}
65
66pub fn split_mpsc_blocking(
70 shared: &mut SharedLayout,
71) -> Result<(BlockingMpscProducer, BlockingMpscConsumer)> {
72 indexbus_core::validate_shared_layout(shared)?;
73 require_blocking_caps(&shared.header)?;
74
75 let (tx, rx) = split_mpsc(SharedLayoutCell::from_mut(shared))?;
76
77 let wake_section = unsafe { &mut *shared_wake_section_ptr(shared as *mut SharedLayout) };
78 let wake = core::ptr::addr_of_mut!(wake_section.mpsc_wake);
79
80 Ok((
81 BlockingMpscProducer { inner: tx, wake },
82 BlockingMpscConsumer { inner: rx, wake },
83 ))
84}
85
86impl BlockingSpscSender {
87 #[inline]
88 pub fn publish(&self, data: &[u8]) -> core::result::Result<(), indexbus_core::Error> {
90 self.inner.publish(data)?;
91
92 let seq = unsafe { atomic_u32_abi(&(*self.wake).seq) };
94 seq.fetch_add(1, Ordering::Release);
95 unsafe { indexbus_wake::wake_one(wake_cell_seq_ptr(self.wake)) };
96 Ok(())
97 }
98}
99
100impl BlockingMpscProducer {
101 #[inline]
102 pub fn publish(&self, data: &[u8]) -> core::result::Result<(), indexbus_core::Error> {
104 self.inner.publish(data)?;
105
106 let seq = unsafe { atomic_u32_abi(&(*self.wake).seq) };
108 seq.fetch_add(1, Ordering::Release);
109 unsafe { indexbus_wake::wake_one(wake_cell_seq_ptr(self.wake)) };
110 Ok(())
111 }
112}
113
114impl BlockingSpscReceiver {
115 pub fn recv_blocking_into(
120 &self,
121 out: &mut [u8],
122 timeout: Option<Duration>,
123 ) -> Result<Option<usize>> {
124 let deadline = timeout.map(|t| Instant::now() + t);
125 loop {
126 match self.inner.try_recv_into(out) {
127 Ok(Some(n)) => return Ok(Some(n)),
128 Ok(None) => {}
129 Err(e) => return Err(Error::Core(e)),
130 }
131
132 let expected = unsafe { atomic_u32_abi(&(*self.wake).seq) }.load(Ordering::Acquire);
133
134 if let Ok(Some(n)) = self.inner.try_recv_into(out) {
136 return Ok(Some(n));
137 }
138
139 let remaining = deadline.map(|d| d.saturating_duration_since(Instant::now()));
140 if let Some(r) = remaining {
141 if r.is_zero() {
142 return Ok(None);
143 }
144 }
145
146 unsafe {
147 indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(self.wake), expected, remaining)?;
148 }
149 }
150 }
151}
152
153impl BlockingMpscConsumer {
154 pub fn recv_blocking_into(
159 &self,
160 out: &mut [u8],
161 timeout: Option<Duration>,
162 ) -> Result<Option<usize>> {
163 let deadline = timeout.map(|t| Instant::now() + t);
164 loop {
165 match self.inner.try_recv_into(out) {
166 Ok(Some(n)) => return Ok(Some(n)),
167 Ok(None) => {}
168 Err(e) => return Err(Error::Core(e)),
169 }
170
171 let expected = unsafe { atomic_u32_abi(&(*self.wake).seq) }.load(Ordering::Acquire);
172
173 if let Ok(Some(n)) = self.inner.try_recv_into(out) {
174 return Ok(Some(n));
175 }
176
177 let remaining = deadline.map(|d| d.saturating_duration_since(Instant::now()));
178 if let Some(r) = remaining {
179 if r.is_zero() {
180 return Ok(None);
181 }
182 }
183
184 unsafe {
185 indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(self.wake), expected, remaining)?;
186 }
187 }
188 }
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194 use indexbus_abi::layouts::SharedWakeSection;
195 use std::alloc::{alloc_zeroed, dealloc, Layout};
196
197 #[test]
198 fn spsc_blocking_roundtrip() {
199 let mut shared: Box<SharedLayout> = Box::new(unsafe { core::mem::zeroed() });
200 indexbus_core::init_shared_layout(&mut shared);
201
202 let base = core::mem::size_of::<SharedLayout>();
206 let wake_offset = base.div_ceil(64) * 64;
207 let total = wake_offset + core::mem::size_of::<SharedWakeSection>();
208
209 let layout = Layout::from_size_align(total, 64).unwrap();
211 let buf_ptr = unsafe { alloc_zeroed(layout) };
212 assert!(!buf_ptr.is_null());
213 assert_eq!((buf_ptr as usize) % 64, 0);
214
215 unsafe {
216 core::ptr::copy_nonoverlapping(
217 (&*shared as *const SharedLayout) as *const u8,
218 buf_ptr,
219 base,
220 );
221 }
222
223 let shared_ptr = buf_ptr as *mut SharedLayout;
224 unsafe {
225 (*shared_ptr).header.capabilities |= indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING;
226 (*shared_ptr).header.layout_bytes = total as u32;
227 }
228
229 let shared_ref = unsafe { &mut *shared_ptr };
233 let (tx, rx) = split_spsc_blocking(shared_ref).unwrap();
234
235 tx.publish(b"hi").unwrap();
236 let mut out = [0u8; 256];
237 let n = rx
238 .recv_blocking_into(&mut out, Some(Duration::from_secs(1)))
239 .unwrap()
240 .unwrap();
241 assert_eq!(&out[..n], b"hi");
242
243 unsafe { dealloc(buf_ptr, layout) };
244 }
245}