indexbus_blocking/
fanout.rs

1use std::sync::atomic::Ordering;
2use std::time::{Duration, Instant};
3
4use indexbus_abi::layouts::{FanoutWakeSection, SharedFanoutLayout, WakeCell};
5use indexbus_core::{
6    fanout_handles, FanoutConsumer, FanoutProducer, FanoutRouter, RouterMode, RouterSource,
7    SharedFanoutLayoutCell,
8};
9
10use crate::internal::atomics::atomic_u32_abi;
11use crate::internal::layout::{fanout_wake_section_ptr, require_blocking_caps, wake_cell_seq_ptr};
12use crate::{Error, Result};
13
14/// Blocking wrapper over an `indexbus-core` fanout producer.
15pub struct BlockingFanoutProducer<const N: usize> {
16    inner: FanoutProducer<N>,
17    wake: *mut WakeCell,
18}
19
20/// Blocking wrapper over an `indexbus-core` fanout router.
21pub struct BlockingFanoutRouter<const N: usize> {
22    inner: FanoutRouter<N>,
23    wake_section: *mut FanoutWakeSection<N>,
24}
25
26/// Blocking view over an existing `FanoutRouter` reference.
27pub struct BlockingFanoutRouterRef<'a, const N: usize> {
28    router: &'a FanoutRouter<N>,
29    wake_section: *mut FanoutWakeSection<N>,
30}
31
32/// Blocking wrapper over an `indexbus-core` fanout consumer.
33pub struct BlockingFanoutConsumer<const N: usize> {
34    inner: FanoutConsumer<N>,
35    wake: *mut WakeCell,
36}
37
38unsafe impl<const N: usize> Send for BlockingFanoutProducer<N> {}
39unsafe impl<const N: usize> Sync for BlockingFanoutProducer<N> {}
40unsafe impl<const N: usize> Send for BlockingFanoutRouter<N> {}
41unsafe impl<const N: usize> Sync for BlockingFanoutRouter<N> {}
42unsafe impl<const N: usize> Send for BlockingFanoutRouterRef<'_, N> {}
43unsafe impl<const N: usize> Sync for BlockingFanoutRouterRef<'_, N> {}
44unsafe impl<const N: usize> Send for BlockingFanoutConsumer<N> {}
45unsafe impl<const N: usize> Sync for BlockingFanoutConsumer<N> {}
46
47/// Create wake-backed blocking wrappers for a fanout region.
48///
49/// Requires `INDEXBUS_CAP_SUPPORTS_BLOCKING` and a present appended `FanoutWakeSection<N>`.
50pub fn fanout_handles_blocking<const N: usize>(
51    shared: &mut SharedFanoutLayout<N>,
52    consumer: usize,
53) -> Result<(
54    BlockingFanoutProducer<N>,
55    BlockingFanoutRouter<N>,
56    BlockingFanoutConsumer<N>,
57)> {
58    indexbus_core::validate_fanout_layout::<N>(shared)?;
59    require_blocking_caps(&shared.header)?;
60
61    let cell = SharedFanoutLayoutCell::from_mut(shared);
62    let (tx, router, rx) = fanout_handles(cell, consumer)?;
63
64    let wake_section =
65        unsafe { &mut *fanout_wake_section_ptr(shared as *mut SharedFanoutLayout<N>) };
66
67    let producer_wake = core::ptr::addr_of_mut!(wake_section.producer_wake);
68    let consumer_wake = if consumer < N {
69        core::ptr::addr_of_mut!(wake_section.consumer_wake[consumer])
70    } else {
71        // Matches indexbus-core fanout consumer semantics: out-of-range consumer is inert.
72        core::ptr::null_mut()
73    };
74
75    Ok((
76        BlockingFanoutProducer {
77            inner: tx,
78            wake: producer_wake,
79        },
80        BlockingFanoutRouter {
81            inner: router,
82            wake_section,
83        },
84        BlockingFanoutConsumer {
85            inner: rx,
86            wake: consumer_wake,
87        },
88    ))
89}
90
91/// Create a wake-backed blocking view over an existing `FanoutRouter`.
92///
93/// This is intended for routing loops (`indexbus-route`) that already own a router handle.
94pub fn fanout_router_ref_blocking<const N: usize>(
95    router: &FanoutRouter<N>,
96) -> Result<BlockingFanoutRouterRef<'_, N>> {
97    let shared = router.as_ptr();
98    indexbus_core::validate_fanout_layout::<N>(unsafe { &*shared })?;
99    require_blocking_caps(&unsafe { &*shared }.header)?;
100
101    let wake_section = unsafe { &mut *fanout_wake_section_ptr(shared) };
102
103    Ok(BlockingFanoutRouterRef {
104        router,
105        wake_section,
106    })
107}
108
109impl<const N: usize> BlockingFanoutProducer<N> {
110    #[inline]
111    /// Publish a message and wake any blocked routers/consumers.
112    pub fn publish(&self, data: &[u8]) -> core::result::Result<(), indexbus_core::Error> {
113        self.inner.publish(data)?;
114
115        // publish -> core bumps seq -> OS wake
116        unsafe { indexbus_wake::wake_one(wake_cell_seq_ptr(self.wake)) };
117        Ok(())
118    }
119}
120
121impl<const N: usize> BlockingFanoutRouter<N> {
122    #[inline]
123    /// Route once (delegates to `indexbus-core`) and wake any blocked consumers on delivery.
124    pub fn route_once_with_stats(
125        &self,
126        source: RouterSource,
127        mode: RouterMode,
128    ) -> indexbus_core::RouteOnceResult {
129        let res = self.inner.route_once_with_stats(source, mode);
130
131        // route -> core bumps consumer seq(s) on successful enqueue -> OS wake
132        if res.delivered > 0 {
133            for i in 0..N {
134                let cell =
135                    unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[i]) };
136                unsafe { indexbus_wake::wake_all(wake_cell_seq_ptr(cell)) };
137            }
138        }
139
140        res
141    }
142
143    #[inline]
144    /// Access the underlying nonblocking router.
145    pub fn as_inner(&self) -> &FanoutRouter<N> {
146        &self.inner
147    }
148
149    #[inline]
150    /// Read the current producer wake sequence.
151    pub fn producer_seq(&self) -> u32 {
152        let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).producer_wake) };
153        unsafe { atomic_u32_abi(&(*cell).seq) }.load(Ordering::Acquire)
154    }
155
156    /// Wait until the producer sequence differs from `expected` or timeout.
157    ///
158    /// Returns:
159    /// - `Ok(true)` if woken due to a change
160    /// - `Ok(false)` on timeout
161    pub fn wait_producer_seq_ne(&self, expected: u32, timeout: Option<Duration>) -> Result<bool> {
162        let remaining = timeout;
163        let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).producer_wake) };
164        match unsafe { indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(cell), expected, remaining) } {
165            Ok(()) => Ok(true),
166            Err(indexbus_wake::WaitError::TimedOut) => Ok(false),
167            Err(e) => Err(Error::Wait(e)),
168        }
169    }
170
171    /// Wait until the consumer wake sequence differs from `expected` or timeout.
172    pub fn wait_consumer_seq_ne(
173        &self,
174        consumer: usize,
175        expected: u32,
176        timeout: Option<Duration>,
177    ) -> Result<bool> {
178        if consumer >= N {
179            return Ok(false);
180        }
181
182        let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[consumer]) };
183        match unsafe { indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(cell), expected, timeout) } {
184            Ok(()) => Ok(true),
185            Err(indexbus_wake::WaitError::TimedOut) => Ok(false),
186            Err(e) => Err(Error::Wait(e)),
187        }
188    }
189
190    #[inline]
191    /// Read the current consumer wake sequence for `consumer`.
192    pub fn consumer_seq(&self, consumer: usize) -> Option<u32> {
193        if consumer >= N {
194            return None;
195        }
196        let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[consumer]) };
197        Some(unsafe { atomic_u32_abi(&(*cell).seq) }.load(Ordering::Acquire))
198    }
199}
200
201impl<const N: usize> BlockingFanoutRouterRef<'_, N> {
202    #[inline]
203    /// Route once (delegates to `indexbus-core`) and wake any blocked consumers on delivery.
204    pub fn route_once_with_stats(
205        &self,
206        source: RouterSource,
207        mode: RouterMode,
208    ) -> indexbus_core::RouteOnceResult {
209        let res = self.router.route_once_with_stats(source, mode);
210
211        if res.delivered > 0 {
212            for i in 0..N {
213                let cell =
214                    unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[i]) };
215                unsafe { indexbus_wake::wake_all(wake_cell_seq_ptr(cell)) };
216            }
217        }
218
219        res
220    }
221
222    #[inline]
223    /// Read the current producer wake sequence.
224    pub fn producer_seq(&self) -> u32 {
225        let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).producer_wake) };
226        unsafe { atomic_u32_abi(&(*cell).seq) }.load(Ordering::Acquire)
227    }
228
229    #[inline]
230    /// Read the current consumer wake sequence for `consumer`.
231    pub fn consumer_seq(&self, consumer: usize) -> Option<u32> {
232        if consumer >= N {
233            return None;
234        }
235        let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[consumer]) };
236        Some(unsafe { atomic_u32_abi(&(*cell).seq) }.load(Ordering::Acquire))
237    }
238
239    /// Wait until the producer wake sequence differs from `expected` or timeout.
240    pub fn wait_producer_seq_ne(&self, expected: u32, timeout: Option<Duration>) -> Result<bool> {
241        let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).producer_wake) };
242        match unsafe { indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(cell), expected, timeout) } {
243            Ok(()) => Ok(true),
244            Err(indexbus_wake::WaitError::TimedOut) => Ok(false),
245            Err(e) => Err(Error::Wait(e)),
246        }
247    }
248
249    /// Wait until the consumer wake sequence differs from `expected` or timeout.
250    pub fn wait_consumer_seq_ne(
251        &self,
252        consumer: usize,
253        expected: u32,
254        timeout: Option<Duration>,
255    ) -> Result<bool> {
256        if consumer >= N {
257            return Ok(false);
258        }
259
260        let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[consumer]) };
261        match unsafe { indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(cell), expected, timeout) } {
262            Ok(()) => Ok(true),
263            Err(indexbus_wake::WaitError::TimedOut) => Ok(false),
264            Err(e) => Err(Error::Wait(e)),
265        }
266    }
267}
268
269impl<const N: usize> BlockingFanoutConsumer<N> {
270    #[inline]
271    /// Attempt to receive once, waking blocked routers if space is released.
272    pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>> {
273        let n = self.inner.try_recv_into(out)?;
274        if n.is_some() && !self.wake.is_null() {
275            // recv -> core bumps consumer seq -> OS wake (unblocks routers waiting for space)
276            unsafe { indexbus_wake::wake_one(wake_cell_seq_ptr(self.wake)) };
277        }
278        Ok(n)
279    }
280
281    /// Block until a message is received (copied into `out`) or timeout.
282    ///
283    /// - `Ok(Some(n))` on success
284    /// - `Ok(None)` on timeout
285    pub fn recv_blocking_into(
286        &self,
287        out: &mut [u8],
288        timeout: Option<Duration>,
289    ) -> Result<Option<usize>> {
290        if self.wake.is_null() {
291            return Ok(None);
292        }
293
294        let deadline = timeout.map(|t| Instant::now() + t);
295        loop {
296            match self.try_recv_into(out) {
297                Ok(Some(n)) => return Ok(Some(n)),
298                Ok(None) => {}
299                Err(e) => return Err(e),
300            }
301
302            let expected = unsafe { atomic_u32_abi(&(*self.wake).seq) }.load(Ordering::Acquire);
303
304            // Re-check after capturing expected to avoid missing a wake.
305            if let Ok(Some(n)) = self.try_recv_into(out) {
306                return Ok(Some(n));
307            }
308
309            let remaining = deadline.map(|d| d.saturating_duration_since(Instant::now()));
310            if let Some(r) = remaining {
311                if r.is_zero() {
312                    return Ok(None);
313                }
314            }
315
316            match unsafe {
317                indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(self.wake), expected, remaining)
318            } {
319                Ok(()) => {}
320                Err(indexbus_wake::WaitError::TimedOut) => return Ok(None),
321                Err(e) => return Err(Error::Wait(e)),
322            }
323        }
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use std::alloc::{alloc_zeroed, dealloc, Layout};
    use std::sync::mpsc;
    use std::thread;

    /// Allocate a zeroed, 64-byte aligned buffer that holds an initialised
    /// `SharedFanoutLayout<N>` followed, at the next 64-byte boundary, by room for a
    /// `FanoutWakeSection<N>`. The header is marked as supporting blocking and its
    /// `layout_bytes` is set to the total buffer size.
    ///
    /// Returns the buffer pointer and the `Layout` needed to `dealloc` it. Each test frees
    /// the buffer only on its success path; a failed assertion leaks it.
    fn alloc_blocking_region<const N: usize>() -> (*mut u8, Layout) {
        let mut template: Box<SharedFanoutLayout<N>> = Box::new(unsafe { core::mem::zeroed() });
        indexbus_core::init_shared_fanout_layout::<N>(&mut template);

        let layout_len = core::mem::size_of::<SharedFanoutLayout<N>>();
        let wake_offset = layout_len.div_ceil(64) * 64;
        let total_len = wake_offset + core::mem::size_of::<FanoutWakeSection<N>>();

        let buf_layout = Layout::from_size_align(total_len, 64).unwrap();
        let buf = unsafe { alloc_zeroed(buf_layout) };
        assert!(!buf.is_null());
        assert_eq!((buf as usize) % 64, 0);

        unsafe {
            core::ptr::copy_nonoverlapping(
                (&*template as *const SharedFanoutLayout<N>) as *const u8,
                buf,
                layout_len,
            );

            let shared_ptr = buf as *mut SharedFanoutLayout<N>;
            (*shared_ptr).header.capabilities |= indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING;
            (*shared_ptr).header.layout_bytes = total_len as u32;
        }

        (buf, buf_layout)
    }

    #[test]
    fn fanout_blocking_roundtrip() {
        const N: usize = 4;

        let (buf, buf_layout) = alloc_blocking_region::<N>();
        let shared = unsafe { &mut *(buf as *mut SharedFanoutLayout<N>) };
        let (tx, router, rx) = fanout_handles_blocking::<N>(shared, 0).unwrap();

        let receiver = thread::spawn(move || {
            let mut out = [0u8; 256];
            let len = rx
                .recv_blocking_into(&mut out, Some(Duration::from_secs(1)))
                .unwrap()
                .unwrap();
            assert_eq!(&out[..len], b"hi");
        });

        tx.publish(b"hi").unwrap();

        // Drive the router until it routes our message, yielding between failed attempts.
        let routed = (0..10_000).any(|_| {
            let res = router.route_once_with_stats(RouterSource::Spsc, RouterMode::Broadcast);
            if !res.routed {
                std::thread::yield_now();
            }
            res.routed
        });
        assert!(routed);

        receiver.join().unwrap();

        unsafe { dealloc(buf, buf_layout) };
    }

    #[test]
    fn router_waits_when_idle_then_wakes_on_publish() {
        const N: usize = 1;

        let (buf, buf_layout) = alloc_blocking_region::<N>();
        let shared = unsafe { &mut *(buf as *mut SharedFanoutLayout<N>) };
        let (tx, router, _rx) = fanout_handles_blocking::<N>(shared, 0).unwrap();

        // With nothing published, a short wait must time out.
        let expected = router.producer_seq();
        let woke_while_idle = router
            .wait_producer_seq_ne(expected, Some(Duration::from_millis(10)))
            .unwrap();
        assert!(!woke_while_idle);

        let (ready_tx, ready_rx) = mpsc::channel::<()>();
        let (done_tx, done_rx) = mpsc::channel::<bool>();
        let waiter = thread::spawn(move || {
            ready_tx.send(()).unwrap();
            let woke = router
                .wait_producer_seq_ne(expected, Some(Duration::from_secs(1)))
                .unwrap();
            done_tx.send(woke).unwrap();
        });

        ready_rx.recv().unwrap();
        tx.publish(b"ping").unwrap();

        assert!(done_rx.recv().unwrap());
        waiter.join().unwrap();

        unsafe { dealloc(buf, buf_layout) };
    }

    #[test]
    fn router_waits_for_consumer_pressure_and_wakes_on_recv() {
        const N: usize = 1;

        let (buf, buf_layout) = alloc_blocking_region::<N>();
        let shared = unsafe { &mut *(buf as *mut SharedFanoutLayout<N>) };
        let (tx, router, rx) = fanout_handles_blocking::<N>(shared, 0).unwrap();

        // Apply backpressure by filling the consumer queue until the slot pool is exhausted.
        // This avoids relying on internal queue depths and doesn't require observing drops.
        let saw_full = (0..1_000_000).any(|_| match tx.publish(b"x") {
            Ok(()) => {
                let _ = router.route_once_with_stats(RouterSource::Spsc, RouterMode::WorkQueue);
                false
            }
            Err(indexbus_core::Error::Full) => true,
            Err(e) => panic!("unexpected publish error: {e:?}"),
        });
        assert!(
            saw_full,
            "did not reach a full producer pool / destination pressure state"
        );

        let expected = router.consumer_seq(0).unwrap();
        let (ready_tx, ready_rx) = mpsc::channel::<()>();
        let (done_tx, done_rx) = mpsc::channel::<bool>();

        let waiter = thread::spawn(move || {
            ready_tx.send(()).unwrap();
            let woke = router
                .wait_consumer_seq_ne(0, expected, Some(Duration::from_secs(1)))
                .unwrap();
            done_tx.send(woke).unwrap();
        });

        ready_rx.recv().unwrap();

        // Free one slot: recv should bump the consumer wake seq and issue an OS wake.
        let mut out = [0u8; 64];
        let received = rx.try_recv_into(&mut out).unwrap();
        assert!(received.is_some());

        assert!(done_rx.recv().unwrap());
        waiter.join().unwrap();

        unsafe { dealloc(buf, buf_layout) };
    }
}