1use std::sync::atomic::Ordering;
2use std::time::{Duration, Instant};
3
4use indexbus_abi::layouts::{FanoutWakeSection, SharedFanoutLayout, WakeCell};
5use indexbus_core::{
6 fanout_handles, FanoutConsumer, FanoutProducer, FanoutRouter, RouterMode, RouterSource,
7 SharedFanoutLayoutCell,
8};
9
10use crate::internal::atomics::atomic_u32_abi;
11use crate::internal::layout::{fanout_wake_section_ptr, require_blocking_caps, wake_cell_seq_ptr};
12use crate::{Error, Result};
13
/// Fanout producer paired with the shared producer wake cell: `publish`
/// bumps the cell after enqueueing so a blocked router can resume.
pub struct BlockingFanoutProducer<const N: usize> {
    inner: FanoutProducer<N>,
    // Raw pointer into the shared-memory wake section; non-null by
    // construction (see `fanout_handles_blocking`). Valid only while the
    // underlying mapping is alive.
    wake: *mut WakeCell,
}
19
/// Owning blocking router: routes messages and wakes consumers, and can
/// itself wait on producer/consumer wake-cell sequence changes.
pub struct BlockingFanoutRouter<const N: usize> {
    inner: FanoutRouter<N>,
    // Whole wake section (producer cell + N consumer cells) in shared
    // memory; valid only while the underlying mapping is alive.
    wake_section: *mut FanoutWakeSection<N>,
}
25
/// Borrowed variant of [`BlockingFanoutRouter`] built over an existing
/// `FanoutRouter` reference (see `fanout_router_ref_blocking`).
pub struct BlockingFanoutRouterRef<'a, const N: usize> {
    router: &'a FanoutRouter<N>,
    // Wake section derived from the router's layout pointer; valid only
    // while the underlying mapping is alive.
    wake_section: *mut FanoutWakeSection<N>,
}
31
/// Fanout consumer paired with its own wake cell: waits on the cell in
/// `recv_blocking_into`, and bumps it after a successful receive so a
/// router blocked on back-pressure can resume.
pub struct BlockingFanoutConsumer<const N: usize> {
    inner: FanoutConsumer<N>,
    // May be null when the consumer index was out of range at construction;
    // a null cell disables blocking receives (see `recv_blocking_into`).
    wake: *mut WakeCell,
}
37
// SAFETY: these wrappers hold raw pointers into a shared-memory wake
// section instead of references, so Send/Sync are not auto-derived. The
// impls are sound provided (a) the mapped layout outlives every handle and
// (b) the wake `seq` words are only touched through atomics
// (`atomic_u32_abi` / `indexbus_wake`). NOTE(review): this also assumes the
// inner `Fanout{Producer,Router,Consumer}` types tolerate cross-thread use
// of `&self` methods — confirm against indexbus_core's own guarantees.
unsafe impl<const N: usize> Send for BlockingFanoutProducer<N> {}
unsafe impl<const N: usize> Sync for BlockingFanoutProducer<N> {}
unsafe impl<const N: usize> Send for BlockingFanoutRouter<N> {}
unsafe impl<const N: usize> Sync for BlockingFanoutRouter<N> {}
unsafe impl<const N: usize> Send for BlockingFanoutRouterRef<'_, N> {}
unsafe impl<const N: usize> Sync for BlockingFanoutRouterRef<'_, N> {}
unsafe impl<const N: usize> Send for BlockingFanoutConsumer<N> {}
unsafe impl<const N: usize> Sync for BlockingFanoutConsumer<N> {}
46
/// Builds blocking producer/router/consumer handles over a validated
/// shared fanout layout, wiring each handle to its wake cell.
///
/// `consumer` selects which of the `N` consumer slots the returned
/// [`BlockingFanoutConsumer`] binds to; an out-of-range index leaves the
/// consumer with a null wake pointer (blocking receives return `Ok(None)`).
///
/// # Errors
/// Fails if the layout does not validate or the header lacks the
/// blocking capability bit.
pub fn fanout_handles_blocking<const N: usize>(
    shared: &mut SharedFanoutLayout<N>,
    consumer: usize,
) -> Result<(
    BlockingFanoutProducer<N>,
    BlockingFanoutRouter<N>,
    BlockingFanoutConsumer<N>,
)> {
    // Validate the ABI layout and the required capability bits before
    // handing out any raw pointers into the mapping.
    indexbus_core::validate_fanout_layout::<N>(shared)?;
    require_blocking_caps(&shared.header)?;

    let cell = SharedFanoutLayoutCell::from_mut(shared);
    let (tx, router, rx) = fanout_handles(cell, consumer)?;

    // The wake section lives past the core layout; its address is derived
    // from the layout base by `fanout_wake_section_ptr`.
    let wake_section =
        unsafe { &mut *fanout_wake_section_ptr(shared as *mut SharedFanoutLayout<N>) };

    let producer_wake = core::ptr::addr_of_mut!(wake_section.producer_wake);
    // Out-of-range consumer index: no wake cell (null), so this consumer
    // cannot block. NOTE(review): presumably `fanout_handles` already
    // rejects consumer >= N, which would make this branch defensive —
    // confirm against indexbus_core.
    let consumer_wake = if consumer < N {
        core::ptr::addr_of_mut!(wake_section.consumer_wake[consumer])
    } else {
        core::ptr::null_mut()
    };

    Ok((
        BlockingFanoutProducer {
            inner: tx,
            wake: producer_wake,
        },
        BlockingFanoutRouter {
            inner: router,
            wake_section,
        },
        BlockingFanoutConsumer {
            inner: rx,
            wake: consumer_wake,
        },
    ))
}
90
91pub fn fanout_router_ref_blocking<const N: usize>(
95 router: &FanoutRouter<N>,
96) -> Result<BlockingFanoutRouterRef<'_, N>> {
97 let shared = router.as_ptr();
98 indexbus_core::validate_fanout_layout::<N>(unsafe { &*shared })?;
99 require_blocking_caps(&unsafe { &*shared }.header)?;
100
101 let wake_section = unsafe { &mut *fanout_wake_section_ptr(shared) };
102
103 Ok(BlockingFanoutRouterRef {
104 router,
105 wake_section,
106 })
107}
108
109impl<const N: usize> BlockingFanoutProducer<N> {
110 #[inline]
111 pub fn publish(&self, data: &[u8]) -> core::result::Result<(), indexbus_core::Error> {
113 self.inner.publish(data)?;
114
115 unsafe { indexbus_wake::wake_one(wake_cell_seq_ptr(self.wake)) };
117 Ok(())
118 }
119}
120
121impl<const N: usize> BlockingFanoutRouter<N> {
122 #[inline]
123 pub fn route_once_with_stats(
125 &self,
126 source: RouterSource,
127 mode: RouterMode,
128 ) -> indexbus_core::RouteOnceResult {
129 let res = self.inner.route_once_with_stats(source, mode);
130
131 if res.delivered > 0 {
133 for i in 0..N {
134 let cell =
135 unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[i]) };
136 unsafe { indexbus_wake::wake_all(wake_cell_seq_ptr(cell)) };
137 }
138 }
139
140 res
141 }
142
143 #[inline]
144 pub fn as_inner(&self) -> &FanoutRouter<N> {
146 &self.inner
147 }
148
149 #[inline]
150 pub fn producer_seq(&self) -> u32 {
152 let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).producer_wake) };
153 unsafe { atomic_u32_abi(&(*cell).seq) }.load(Ordering::Acquire)
154 }
155
156 pub fn wait_producer_seq_ne(&self, expected: u32, timeout: Option<Duration>) -> Result<bool> {
162 let remaining = timeout;
163 let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).producer_wake) };
164 match unsafe { indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(cell), expected, remaining) } {
165 Ok(()) => Ok(true),
166 Err(indexbus_wake::WaitError::TimedOut) => Ok(false),
167 Err(e) => Err(Error::Wait(e)),
168 }
169 }
170
171 pub fn wait_consumer_seq_ne(
173 &self,
174 consumer: usize,
175 expected: u32,
176 timeout: Option<Duration>,
177 ) -> Result<bool> {
178 if consumer >= N {
179 return Ok(false);
180 }
181
182 let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[consumer]) };
183 match unsafe { indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(cell), expected, timeout) } {
184 Ok(()) => Ok(true),
185 Err(indexbus_wake::WaitError::TimedOut) => Ok(false),
186 Err(e) => Err(Error::Wait(e)),
187 }
188 }
189
190 #[inline]
191 pub fn consumer_seq(&self, consumer: usize) -> Option<u32> {
193 if consumer >= N {
194 return None;
195 }
196 let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[consumer]) };
197 Some(unsafe { atomic_u32_abi(&(*cell).seq) }.load(Ordering::Acquire))
198 }
199}
200
201impl<const N: usize> BlockingFanoutRouterRef<'_, N> {
202 #[inline]
203 pub fn route_once_with_stats(
205 &self,
206 source: RouterSource,
207 mode: RouterMode,
208 ) -> indexbus_core::RouteOnceResult {
209 let res = self.router.route_once_with_stats(source, mode);
210
211 if res.delivered > 0 {
212 for i in 0..N {
213 let cell =
214 unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[i]) };
215 unsafe { indexbus_wake::wake_all(wake_cell_seq_ptr(cell)) };
216 }
217 }
218
219 res
220 }
221
222 #[inline]
223 pub fn producer_seq(&self) -> u32 {
225 let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).producer_wake) };
226 unsafe { atomic_u32_abi(&(*cell).seq) }.load(Ordering::Acquire)
227 }
228
229 #[inline]
230 pub fn consumer_seq(&self, consumer: usize) -> Option<u32> {
232 if consumer >= N {
233 return None;
234 }
235 let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[consumer]) };
236 Some(unsafe { atomic_u32_abi(&(*cell).seq) }.load(Ordering::Acquire))
237 }
238
239 pub fn wait_producer_seq_ne(&self, expected: u32, timeout: Option<Duration>) -> Result<bool> {
241 let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).producer_wake) };
242 match unsafe { indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(cell), expected, timeout) } {
243 Ok(()) => Ok(true),
244 Err(indexbus_wake::WaitError::TimedOut) => Ok(false),
245 Err(e) => Err(Error::Wait(e)),
246 }
247 }
248
249 pub fn wait_consumer_seq_ne(
251 &self,
252 consumer: usize,
253 expected: u32,
254 timeout: Option<Duration>,
255 ) -> Result<bool> {
256 if consumer >= N {
257 return Ok(false);
258 }
259
260 let cell = unsafe { core::ptr::addr_of_mut!((*self.wake_section).consumer_wake[consumer]) };
261 match unsafe { indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(cell), expected, timeout) } {
262 Ok(()) => Ok(true),
263 Err(indexbus_wake::WaitError::TimedOut) => Ok(false),
264 Err(e) => Err(Error::Wait(e)),
265 }
266 }
267}
268
269impl<const N: usize> BlockingFanoutConsumer<N> {
270 #[inline]
271 pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>> {
273 let n = self.inner.try_recv_into(out)?;
274 if n.is_some() && !self.wake.is_null() {
275 unsafe { indexbus_wake::wake_one(wake_cell_seq_ptr(self.wake)) };
277 }
278 Ok(n)
279 }
280
281 pub fn recv_blocking_into(
286 &self,
287 out: &mut [u8],
288 timeout: Option<Duration>,
289 ) -> Result<Option<usize>> {
290 if self.wake.is_null() {
291 return Ok(None);
292 }
293
294 let deadline = timeout.map(|t| Instant::now() + t);
295 loop {
296 match self.try_recv_into(out) {
297 Ok(Some(n)) => return Ok(Some(n)),
298 Ok(None) => {}
299 Err(e) => return Err(e),
300 }
301
302 let expected = unsafe { atomic_u32_abi(&(*self.wake).seq) }.load(Ordering::Acquire);
303
304 if let Ok(Some(n)) = self.try_recv_into(out) {
306 return Ok(Some(n));
307 }
308
309 let remaining = deadline.map(|d| d.saturating_duration_since(Instant::now()));
310 if let Some(r) = remaining {
311 if r.is_zero() {
312 return Ok(None);
313 }
314 }
315
316 match unsafe {
317 indexbus_wake::wait_u32_eq(wake_cell_seq_ptr(self.wake), expected, remaining)
318 } {
319 Ok(()) => {}
320 Err(indexbus_wake::WaitError::TimedOut) => return Ok(None),
321 Err(e) => return Err(Error::Wait(e)),
322 }
323 }
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use std::alloc::{alloc_zeroed, dealloc, Layout};
    use std::sync::mpsc;
    use std::thread;

    // Fixture pattern shared by all tests: allocate one 64-byte-aligned
    // buffer holding the core layout followed by the wake section (at the
    // next 64-byte boundary), memcpy an initialised layout into it, then
    // set the blocking capability bit before constructing handles.

    /// End-to-end: consumer blocks, producer publishes, router broadcasts,
    /// consumer wakes and reads the payload.
    #[test]
    fn fanout_blocking_roundtrip() {
        const N: usize = 4;

        // NOTE(review): assumes SharedFanoutLayout is valid as all-zeroes
        // prior to init_shared_fanout_layout — confirm against indexbus_abi.
        let mut base_layout: Box<SharedFanoutLayout<N>> = Box::new(unsafe { core::mem::zeroed() });
        indexbus_core::init_shared_fanout_layout::<N>(&mut base_layout);

        // Wake section starts at the next 64-byte boundary past the layout.
        let base = core::mem::size_of::<SharedFanoutLayout<N>>();
        let wake_offset = base.div_ceil(64) * 64;
        let total = wake_offset + core::mem::size_of::<FanoutWakeSection<N>>();

        let layout = Layout::from_size_align(total, 64).unwrap();
        let buf_ptr = unsafe { alloc_zeroed(layout) };
        assert!(!buf_ptr.is_null());
        assert_eq!((buf_ptr as usize) % 64, 0);

        // Copy the initialised core layout into the shared buffer; the
        // wake section tail stays zeroed from alloc_zeroed.
        unsafe {
            core::ptr::copy_nonoverlapping(
                (&*base_layout as *const SharedFanoutLayout<N>) as *const u8,
                buf_ptr,
                base,
            );
        }

        let shared_ptr = buf_ptr as *mut SharedFanoutLayout<N>;
        unsafe {
            (*shared_ptr).header.capabilities |= indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING;
            (*shared_ptr).header.layout_bytes = total as u32;
        }

        let shared_ref = unsafe { &mut *shared_ptr };
        let (tx, router, rx) = fanout_handles_blocking::<N>(shared_ref, 0).unwrap();

        // Consumer thread blocks (up to 1s) until the router delivers.
        let t = thread::spawn(move || {
            let mut out = [0u8; 256];
            let n = rx
                .recv_blocking_into(&mut out, Some(Duration::from_secs(1)))
                .unwrap()
                .unwrap();
            assert_eq!(&out[..n], b"hi");
        });

        tx.publish(b"hi").unwrap();

        // Spin until the router picks up the published message.
        let mut routed = false;
        for _ in 0..10_000 {
            let res = router.route_once_with_stats(RouterSource::Spsc, RouterMode::Broadcast);
            if res.routed {
                routed = true;
                break;
            }
            std::thread::yield_now();
        }
        assert!(routed);

        t.join().unwrap();

        unsafe { dealloc(buf_ptr, layout) };
    }

    /// A router blocked on the producer wake seq must time out while idle
    /// and wake promptly once the producer publishes.
    #[test]
    fn router_waits_when_idle_then_wakes_on_publish() {
        const N: usize = 1;

        let mut base_layout: Box<SharedFanoutLayout<N>> = Box::new(unsafe { core::mem::zeroed() });
        indexbus_core::init_shared_fanout_layout::<N>(&mut base_layout);

        let base = core::mem::size_of::<SharedFanoutLayout<N>>();
        let wake_offset = base.div_ceil(64) * 64;
        let total = wake_offset + core::mem::size_of::<FanoutWakeSection<N>>();

        let layout = Layout::from_size_align(total, 64).unwrap();
        let buf_ptr = unsafe { alloc_zeroed(layout) };
        assert!(!buf_ptr.is_null());
        assert_eq!((buf_ptr as usize) % 64, 0);

        unsafe {
            core::ptr::copy_nonoverlapping(
                (&*base_layout as *const SharedFanoutLayout<N>) as *const u8,
                buf_ptr,
                base,
            );
        }

        let shared_ptr = buf_ptr as *mut SharedFanoutLayout<N>;
        unsafe {
            (*shared_ptr).header.capabilities |= indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING;
            (*shared_ptr).header.layout_bytes = total as u32;
        }

        let shared_ref = unsafe { &mut *shared_ptr };
        let (tx, router, _rx) = fanout_handles_blocking::<N>(shared_ref, 0).unwrap();

        // Nothing published yet: the wait must time out (returns false).
        let expected = router.producer_seq();
        assert!(!router
            .wait_producer_seq_ne(expected, Some(Duration::from_millis(10)))
            .unwrap());

        // `ready` gates the publish until the waiter thread has started;
        // `done` carries whether the wait woke (true) or timed out (false).
        let (ready_tx, ready_rx) = mpsc::channel::<()>();
        let (done_tx, done_rx) = mpsc::channel::<bool>();
        let waiter = thread::spawn(move || {
            ready_tx.send(()).unwrap();
            let woke = router
                .wait_producer_seq_ne(expected, Some(Duration::from_secs(1)))
                .unwrap();
            done_tx.send(woke).unwrap();
        });

        ready_rx.recv().unwrap();
        tx.publish(b"ping").unwrap();

        assert!(done_rx.recv().unwrap());
        waiter.join().unwrap();

        unsafe { dealloc(buf_ptr, layout) };
    }

    /// Drive publish+route until Full (destination pressure), then check
    /// that a router blocked on the consumer wake seq wakes when the
    /// consumer receives a message.
    #[test]
    fn router_waits_for_consumer_pressure_and_wakes_on_recv() {
        const N: usize = 1;

        let mut base_layout: Box<SharedFanoutLayout<N>> = Box::new(unsafe { core::mem::zeroed() });
        indexbus_core::init_shared_fanout_layout::<N>(&mut base_layout);

        let base = core::mem::size_of::<SharedFanoutLayout<N>>();
        let wake_offset = base.div_ceil(64) * 64;
        let total = wake_offset + core::mem::size_of::<FanoutWakeSection<N>>();

        let layout = Layout::from_size_align(total, 64).unwrap();
        let buf_ptr = unsafe { alloc_zeroed(layout) };
        assert!(!buf_ptr.is_null());
        assert_eq!((buf_ptr as usize) % 64, 0);

        unsafe {
            core::ptr::copy_nonoverlapping(
                (&*base_layout as *const SharedFanoutLayout<N>) as *const u8,
                buf_ptr,
                base,
            );
        }

        let shared_ptr = buf_ptr as *mut SharedFanoutLayout<N>;
        unsafe {
            (*shared_ptr).header.capabilities |= indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING;
            (*shared_ptr).header.layout_bytes = total as u32;
        }

        let shared_ref = unsafe { &mut *shared_ptr };
        let (tx, router, rx) = fanout_handles_blocking::<N>(shared_ref, 0).unwrap();

        // Fill the pipeline until publish reports Full; the bound is a
        // generous safety cap, not the expected iteration count.
        let mut saw_full = false;
        for _ in 0..1_000_000 {
            match tx.publish(b"x") {
                Ok(()) => {
                    let _ = router.route_once_with_stats(RouterSource::Spsc, RouterMode::WorkQueue);
                }
                Err(indexbus_core::Error::Full) => {
                    saw_full = true;
                    break;
                }
                Err(e) => panic!("unexpected publish error: {e:?}"),
            }
        }
        assert!(
            saw_full,
            "did not reach a full producer pool / destination pressure state"
        );

        let expected = router.consumer_seq(0).unwrap();
        let (ready_tx, ready_rx) = mpsc::channel::<()>();
        let (done_tx, done_rx) = mpsc::channel::<bool>();

        // Waiter blocks on consumer 0's wake seq; a successful try_recv on
        // the consumer side bumps that cell and should release it.
        let waiter = thread::spawn(move || {
            ready_tx.send(()).unwrap();
            let woke = router
                .wait_consumer_seq_ne(0, expected, Some(Duration::from_secs(1)))
                .unwrap();
            done_tx.send(woke).unwrap();
        });

        ready_rx.recv().unwrap();

        let mut out = [0u8; 64];
        assert!(rx.try_recv_into(&mut out).unwrap().is_some());

        assert!(done_rx.recv().unwrap());
        waiter.join().unwrap();

        unsafe { dealloc(buf_ptr, layout) };
    }
}