indexbus_blocking/
events.rs

1use indexbus_core::{
2    split_mpsc, split_spsc, MpscConsumer, MpscProducer, SharedLayoutCell, SpscReceiver, SpscSender,
3};
4use std::sync::atomic::Ordering;
5use std::time::{Duration, Instant};
6
7use indexbus_abi::layouts::SharedLayout;
8
9use crate::internal::atomics::atomic_u32_abi;
10use crate::internal::layout::{require_blocking_caps, shared_wake_section_ptr, wake_cell_seq_ptr};
11use crate::{Error, Result};
12
13/// Blocking wrapper over an `indexbus-core` SPSC sender.
14pub struct BlockingSpscSender {
15    inner: SpscSender,
16    wake: *mut indexbus_abi::layouts::WakeCell,
17}
18
19/// Blocking wrapper over an `indexbus-core` SPSC receiver.
20pub struct BlockingSpscReceiver {
21    inner: SpscReceiver,
22    wake: *mut indexbus_abi::layouts::WakeCell,
23}
24
25/// Blocking wrapper over an `indexbus-core` MPSC producer.
26pub struct BlockingMpscProducer {
27    inner: MpscProducer,
28    wake: *mut indexbus_abi::layouts::WakeCell,
29}
30
31/// Blocking wrapper over an `indexbus-core` MPSC consumer.
32pub struct BlockingMpscConsumer {
33    inner: MpscConsumer,
34    wake: *mut indexbus_abi::layouts::WakeCell,
35}
36
37unsafe impl Send for BlockingSpscSender {}
38unsafe impl Sync for BlockingSpscSender {}
39unsafe impl Send for BlockingSpscReceiver {}
40unsafe impl Sync for BlockingSpscReceiver {}
41unsafe impl Send for BlockingMpscProducer {}
42unsafe impl Sync for BlockingMpscProducer {}
43unsafe impl Send for BlockingMpscConsumer {}
44unsafe impl Sync for BlockingMpscConsumer {}
45
46/// Create wake-backed blocking wrappers for SPSC over a `SharedLayout`.
47///
48/// Requires `INDEXBUS_CAP_SUPPORTS_BLOCKING` and a present appended `SharedWakeSection`.
49pub fn split_spsc_blocking(
50    shared: &mut SharedLayout,
51) -> Result<(BlockingSpscSender, BlockingSpscReceiver)> {
52    indexbus_core::validate_shared_layout(shared)?;
53    require_blocking_caps(&shared.header)?;
54
55    let (tx, rx) = split_spsc(SharedLayoutCell::from_mut(shared))?;
56
57    let wake_section = unsafe { &mut *shared_wake_section_ptr(shared as *mut SharedLayout) };
58    let wake = core::ptr::addr_of_mut!(wake_section.spsc_wake);
59
60    Ok((
61        BlockingSpscSender { inner: tx, wake },
62        BlockingSpscReceiver { inner: rx, wake },
63    ))
64}
65
66/// Create wake-backed blocking wrappers for MPSC over a `SharedLayout`.
67///
68/// Requires `INDEXBUS_CAP_SUPPORTS_BLOCKING` and a present appended `SharedWakeSection`.
69pub fn split_mpsc_blocking(
70    shared: &mut SharedLayout,
71) -> Result<(BlockingMpscProducer, BlockingMpscConsumer)> {
72    indexbus_core::validate_shared_layout(shared)?;
73    require_blocking_caps(&shared.header)?;
74
75    let (tx, rx) = split_mpsc(SharedLayoutCell::from_mut(shared))?;
76
77    let wake_section = unsafe { &mut *shared_wake_section_ptr(shared as *mut SharedLayout) };
78    let wake = core::ptr::addr_of_mut!(wake_section.mpsc_wake);
79
80    Ok((
81        BlockingMpscProducer { inner: tx, wake },
82        BlockingMpscConsumer { inner: rx, wake },
83    ))
84}
85
86impl BlockingSpscSender {
87    #[inline]
88    /// Publish a message and wake a blocked receiver.
89    pub fn publish(&self, data: &[u8]) -> core::result::Result<(), indexbus_core::Error> {
90        self.inner.publish(data)?;
91
92        // publish -> seq update -> OS wake
93        let seq = unsafe { atomic_u32_abi(&(*self.wake).seq) };
94        seq.fetch_add(1, Ordering::Release);
95        unsafe { indexbus_wake::wake_one(wake_cell_seq_ptr(self.wake)) };
96        Ok(())
97    }
98}
99
100impl BlockingMpscProducer {
101    #[inline]
102    /// Publish a message and wake a blocked consumer.
103    pub fn publish(&self, data: &[u8]) -> core::result::Result<(), indexbus_core::Error> {
104        self.inner.publish(data)?;
105
106        // publish -> seq update -> OS wake
107        let seq = unsafe { atomic_u32_abi(&(*self.wake).seq) };
108        seq.fetch_add(1, Ordering::Release);
109        unsafe { indexbus_wake::wake_one(wake_cell_seq_ptr(self.wake)) };
110        Ok(())
111    }
112}
113
114impl BlockingSpscReceiver {
115    /// Block until a message is received (copied into `out`) or timeout.
116    ///
117    /// - `Ok(Some(n))` on success
118    /// - `Ok(None)` on timeout
119    pub fn recv_blocking_into(
120        &self,
121        out: &mut [u8],
122        timeout: Option<Duration>,
123    ) -> Result<Option<usize>> {
124        let deadline = timeout.map(|t| Instant::now() + t);
125        loop {
126            match self.inner.try_recv_into(out) {
127                Ok(Some(n)) => return Ok(Some(n)),
128                Ok(None) => {}
129                Err(e) => return Err(Error::Core(e)),
130            }
131
132            let expected = unsafe { atomic_u32_abi(&(*self.wake).seq) }.load(Ordering::Acquire);
133
134            // Re-check after capturing expected to avoid missing a wake.
135            if let Ok(Some(n)) = self.inner.try_recv_into(out) {
136                return Ok(Some(n));
137            }
138
139            let remaining = deadline.map(|d| d.saturating_duration_since(Instant::now()));
140            if let Some(r) = remaining {
141                if r.is_zero() {
142                    return Ok(None);
143                }
144            }
145
146            unsafe {
147                indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(self.wake), expected, remaining)?;
148            }
149        }
150    }
151}
152
153impl BlockingMpscConsumer {
154    /// Block until a message is received (copied into `out`) or timeout.
155    ///
156    /// - `Ok(Some(n))` on success
157    /// - `Ok(None)` on timeout
158    pub fn recv_blocking_into(
159        &self,
160        out: &mut [u8],
161        timeout: Option<Duration>,
162    ) -> Result<Option<usize>> {
163        let deadline = timeout.map(|t| Instant::now() + t);
164        loop {
165            match self.inner.try_recv_into(out) {
166                Ok(Some(n)) => return Ok(Some(n)),
167                Ok(None) => {}
168                Err(e) => return Err(Error::Core(e)),
169            }
170
171            let expected = unsafe { atomic_u32_abi(&(*self.wake).seq) }.load(Ordering::Acquire);
172
173            if let Ok(Some(n)) = self.inner.try_recv_into(out) {
174                return Ok(Some(n));
175            }
176
177            let remaining = deadline.map(|d| d.saturating_duration_since(Instant::now()));
178            if let Some(r) = remaining {
179                if r.is_zero() {
180                    return Ok(None);
181                }
182            }
183
184            unsafe {
185                indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(self.wake), expected, remaining)?;
186            }
187        }
188    }
189}
190
191#[cfg(test)]
192mod tests {
193    use super::*;
194    use indexbus_abi::layouts::SharedWakeSection;
195    use std::alloc::{alloc_zeroed, dealloc, Layout};
196
197    #[test]
198    fn spsc_blocking_roundtrip() {
199        let mut shared: Box<SharedLayout> = Box::new(unsafe { core::mem::zeroed() });
200        indexbus_core::init_shared_layout(&mut shared);
201
202        // Add blocking capability by patching header/layout_bytes and ensuring wake section exists.
203        // For this unit test, we allocate extra bytes by leaking a Vec and using it as a fake mapping.
204        // (This avoids needing indexbus-transport-shm.)
205        let base = core::mem::size_of::<SharedLayout>();
206        let wake_offset = base.div_ceil(64) * 64;
207        let total = wake_offset + core::mem::size_of::<SharedWakeSection>();
208
209        // Allocate a 64B-aligned backing buffer so we can safely treat it as SharedLayout.
210        let layout = Layout::from_size_align(total, 64).unwrap();
211        let buf_ptr = unsafe { alloc_zeroed(layout) };
212        assert!(!buf_ptr.is_null());
213        assert_eq!((buf_ptr as usize) % 64, 0);
214
215        unsafe {
216            core::ptr::copy_nonoverlapping(
217                (&*shared as *const SharedLayout) as *const u8,
218                buf_ptr,
219                base,
220            );
221        }
222
223        let shared_ptr = buf_ptr as *mut SharedLayout;
224        unsafe {
225            (*shared_ptr).header.capabilities |= indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING;
226            (*shared_ptr).header.layout_bytes = total as u32;
227        }
228
229        // Mark initialized, since we copied a fully initialized layout.
230        // Wake section starts zeroed which is fine.
231
232        let shared_ref = unsafe { &mut *shared_ptr };
233        let (tx, rx) = split_spsc_blocking(shared_ref).unwrap();
234
235        tx.publish(b"hi").unwrap();
236        let mut out = [0u8; 256];
237        let n = rx
238            .recv_blocking_into(&mut out, Some(Duration::from_secs(1)))
239            .unwrap()
240            .unwrap();
241        assert_eq!(&out[..n], b"hi");
242
243        unsafe { dealloc(buf_ptr, layout) };
244    }
245}