1use core::sync::atomic::Ordering;
2
3use core::cell::UnsafeCell;
4
5use indexbus_abi::layouts::SharedFanoutLayout;
6use indexbus_abi::{caps, flags, LayoutHeader};
7
8use crate::internal;
9use crate::{Error, PublishWithError, RecvWithError, INDEXBUS_SLOT_DATA_SIZE};
10
11#[repr(transparent)]
19pub struct SharedFanoutLayoutCell<const N: usize>(UnsafeCell<SharedFanoutLayout<N>>);
20
21impl<const N: usize> SharedFanoutLayoutCell<N> {
22 #[inline]
24 pub fn from_mut(shared: &mut SharedFanoutLayout<N>) -> &SharedFanoutLayoutCell<N> {
25 unsafe { &*(shared as *mut SharedFanoutLayout<N> as *const SharedFanoutLayoutCell<N>) }
26 }
27
28 #[inline]
33 pub unsafe fn from_ptr<'a>(ptr: *mut SharedFanoutLayout<N>) -> &'a SharedFanoutLayoutCell<N> {
34 unsafe { &*(ptr as *const SharedFanoutLayoutCell<N>) }
35 }
36
37 #[inline]
38 fn as_ptr(&self) -> *mut SharedFanoutLayout<N> {
39 self.0.get()
40 }
41}
42
43pub struct FanoutProducer<const N: usize> {
51 shared: *mut SharedFanoutLayout<N>,
52}
53
54pub struct FanoutMpscProducer<const N: usize> {
58 shared: *mut SharedFanoutLayout<N>,
59}
60
61pub struct FanoutRouter<const N: usize> {
65 shared: *mut SharedFanoutLayout<N>,
66}
67
68pub struct FanoutConsumer<const N: usize> {
73 shared: *mut SharedFanoutLayout<N>,
74 consumer: usize,
75}
76
77unsafe impl<const N: usize> Send for FanoutProducer<N> {}
84unsafe impl<const N: usize> Send for FanoutMpscProducer<N> {}
85unsafe impl<const N: usize> Sync for FanoutMpscProducer<N> {}
86unsafe impl<const N: usize> Send for FanoutRouter<N> {}
87unsafe impl<const N: usize> Send for FanoutConsumer<N> {}
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq)]
90pub enum RouterSource {
92 Spsc,
94 Mpsc,
96}
97
98#[derive(Debug, Clone, Copy, PartialEq, Eq)]
99pub enum RouterMode {
101 Broadcast,
103 WorkQueue,
105}
106
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub struct RouteOnceResult {
110 pub routed: bool,
112 pub delivered: u32,
114 pub dropped: u32,
119}
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq)]
130pub struct RouteOnceCreditResult {
131 pub routed: bool,
133
134 pub delivered: u32,
136
137 pub dropped_full: u32,
139
140 pub dropped_no_credit: u32,
142
143 pub delivered_mask: u64,
145
146 pub dropped_full_mask: u64,
148
149 pub dropped_no_credit_mask: u64,
151}
152
153pub fn init_shared_fanout_layout_with_layout_bytes<const N: usize>(
161 shared: &mut SharedFanoutLayout<N>,
162 capabilities: u32,
163 layout_bytes: u32,
164) {
165 let initialized = unsafe { internal::atomics::atomic_u32(&shared.initialized) };
166 initialized.store(1, Ordering::Release);
167
168 shared.header = LayoutHeader::new_v1_with_layout_bytes(
169 capabilities,
170 flags::INDEXBUS_REGION_KIND_FANOUT,
171 layout_bytes,
172 );
173
174 unsafe {
175 internal::events::pool_init_once(core::ptr::addr_of_mut!(shared.slot_pool));
176 internal::events::index_queue_init(core::ptr::addr_of_mut!(shared.producer_queue));
177 internal::events::mpsc_queue_init(core::ptr::addr_of_mut!(shared.producer_queue_mpsc));
178 internal::atomics::atomic_u32(&shared.router_rr).store(0, Ordering::Release);
179 for q in &mut shared.consumer_queues {
180 internal::events::index_queue_init(q as *mut _);
181 }
182 }
183
184 initialized.store(2, Ordering::Release);
185}
186
187pub fn init_shared_fanout_layout<const N: usize>(shared: &mut SharedFanoutLayout<N>) {
193 init_shared_fanout_layout_with_layout_bytes::<N>(
194 shared,
195 caps::INDEXBUS_CAP_SUPPORTS_EVENTS | caps::INDEXBUS_CAP_SUPPORTS_FANOUT,
196 core::mem::size_of::<SharedFanoutLayout<N>>() as u32,
197 )
198}
199
200pub fn fanout_handles<const N: usize>(
207 shared: &SharedFanoutLayoutCell<N>,
208 consumer: usize,
209) -> Result<(FanoutProducer<N>, FanoutRouter<N>, FanoutConsumer<N>), Error> {
210 let ptr = shared.as_ptr();
211 internal::validate::validate_fanout_layout::<N>(unsafe { &*ptr })?;
212 Ok((
213 FanoutProducer { shared: ptr },
214 FanoutRouter { shared: ptr },
215 FanoutConsumer {
216 shared: ptr,
217 consumer,
218 },
219 ))
220}
221
222pub fn fanout_mpsc_producer<const N: usize>(
229 shared: &SharedFanoutLayoutCell<N>,
230) -> Result<FanoutMpscProducer<N>, Error> {
231 let ptr = shared.as_ptr();
232 internal::validate::validate_fanout_layout::<N>(unsafe { &*ptr })?;
233 Ok(FanoutMpscProducer { shared: ptr })
234}
235
236impl<const N: usize> FanoutProducer<N> {
237 #[inline]
238 pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
245 if data.len() > INDEXBUS_SLOT_DATA_SIZE {
246 return Err(Error::TooLarge {
247 max: INDEXBUS_SLOT_DATA_SIZE,
248 len: data.len(),
249 });
250 }
251
252 let slot = unsafe { internal::fanout::pool_alloc(self.shared) }.ok_or(Error::Full)?;
253 unsafe {
254 internal::fanout::slot_write(self.shared, slot, data);
255 internal::fanout::producer_queue_push_spsc(self.shared, slot).map_err(|_| {
256 internal::fanout::pool_free(self.shared, slot);
257 Error::Full
258 })?;
259 }
260 Ok(())
261 }
262
263 #[inline]
273 pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
274 where
275 F: FnOnce(&mut [u8]) -> Result<usize, E>,
276 {
277 let slot = unsafe { internal::fanout::pool_alloc(self.shared) }
278 .ok_or(PublishWithError::Core(Error::Full))?;
279
280 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
281 let s = unsafe { core::ptr::addr_of_mut!((*pool).slots[slot as usize]) };
282
283 unsafe { internal::atomics::atomic_u32(&(*s).refcnt) }.store(0, Ordering::Relaxed);
285
286 let data_ptr = unsafe { core::ptr::addr_of_mut!((*s).data) as *mut u8 };
287 let buf = unsafe { core::slice::from_raw_parts_mut(data_ptr, INDEXBUS_SLOT_DATA_SIZE) };
288
289 let len = match f(buf) {
290 Ok(n) => n,
291 Err(e) => {
292 unsafe { internal::fanout::pool_free(self.shared, slot) };
293 return Err(PublishWithError::Encode(e));
294 }
295 };
296
297 if len > INDEXBUS_SLOT_DATA_SIZE {
298 unsafe { internal::fanout::pool_free(self.shared, slot) };
299 return Err(PublishWithError::Core(Error::TooLarge {
300 max: INDEXBUS_SLOT_DATA_SIZE,
301 len,
302 }));
303 }
304
305 unsafe { core::ptr::addr_of_mut!((*s).len).write(len as u32) };
306 unsafe {
307 internal::fanout::producer_queue_push_spsc(self.shared, slot).map_err(|_| {
308 internal::fanout::pool_free(self.shared, slot);
309 PublishWithError::Core(Error::Full)
310 })?;
311 }
312 Ok(())
313 }
314}
315
316impl<const N: usize> FanoutMpscProducer<N> {
317 #[inline]
318 pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
325 if data.len() > INDEXBUS_SLOT_DATA_SIZE {
326 return Err(Error::TooLarge {
327 max: INDEXBUS_SLOT_DATA_SIZE,
328 len: data.len(),
329 });
330 }
331
332 let slot = unsafe { internal::fanout::pool_alloc(self.shared) }.ok_or(Error::Full)?;
333 unsafe {
334 internal::fanout::slot_write(self.shared, slot, data);
335 internal::fanout::producer_queue_push_mpsc(self.shared, slot).map_err(|_| {
336 internal::fanout::pool_free(self.shared, slot);
337 Error::Full
338 })?;
339 }
340 Ok(())
341 }
342
343 #[inline]
350 pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
351 where
352 F: FnOnce(&mut [u8]) -> Result<usize, E>,
353 {
354 let slot = unsafe { internal::fanout::pool_alloc(self.shared) }
355 .ok_or(PublishWithError::Core(Error::Full))?;
356
357 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
358 let s = unsafe { core::ptr::addr_of_mut!((*pool).slots[slot as usize]) };
359
360 unsafe { internal::atomics::atomic_u32(&(*s).refcnt) }.store(0, Ordering::Relaxed);
362
363 let data_ptr = unsafe { core::ptr::addr_of_mut!((*s).data) as *mut u8 };
364 let buf = unsafe { core::slice::from_raw_parts_mut(data_ptr, INDEXBUS_SLOT_DATA_SIZE) };
365
366 let len = match f(buf) {
367 Ok(n) => n,
368 Err(e) => {
369 unsafe { internal::fanout::pool_free(self.shared, slot) };
370 return Err(PublishWithError::Encode(e));
371 }
372 };
373
374 if len > INDEXBUS_SLOT_DATA_SIZE {
375 unsafe { internal::fanout::pool_free(self.shared, slot) };
376 return Err(PublishWithError::Core(Error::TooLarge {
377 max: INDEXBUS_SLOT_DATA_SIZE,
378 len,
379 }));
380 }
381
382 unsafe { core::ptr::addr_of_mut!((*s).len).write(len as u32) };
383 unsafe {
384 internal::fanout::producer_queue_push_mpsc(self.shared, slot).map_err(|_| {
385 internal::fanout::pool_free(self.shared, slot);
386 PublishWithError::Core(Error::Full)
387 })?;
388 }
389 Ok(())
390 }
391}
392
393impl<const N: usize> FanoutRouter<N> {
394 #[inline]
400 pub fn route_once_with(&self, source: RouterSource, mode: RouterMode) -> bool {
401 unsafe { internal::fanout::route_once_with(self.shared, source, mode) }
402 }
403
404 #[inline]
405 pub fn route_once_with_stats(&self, source: RouterSource, mode: RouterMode) -> RouteOnceResult {
407 unsafe { internal::fanout::route_once_with_stats(self.shared, source, mode) }
408 }
409
410 #[inline]
418 pub fn route_once_with_credit_masks(
419 &self,
420 source: RouterSource,
421 mode: RouterMode,
422 eligible_mask: u64,
423 no_credit_mask: u64,
424 full_mask: u64,
425 ) -> RouteOnceCreditResult {
426 unsafe {
427 internal::fanout::route_once_with_credit_masks(
428 self.shared,
429 source,
430 mode,
431 eligible_mask,
432 no_credit_mask,
433 full_mask,
434 )
435 }
436 }
437
438 #[inline]
439 pub fn as_ptr(&self) -> *mut SharedFanoutLayout<N> {
443 self.shared
444 }
445}
446
447impl<const N: usize> FanoutConsumer<N> {
448 #[inline]
449 pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
456 if self.consumer >= N {
457 return Ok(None);
458 }
459 unsafe { internal::fanout::consumer_try_recv_into(self.shared, self.consumer, out) }
460 }
461
462 #[inline]
469 pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
470 where
471 F: FnOnce(&[u8]) -> Result<R, E>,
472 {
473 if self.consumer >= N {
474 return Ok(None);
475 }
476
477 unsafe { internal::fanout::consumer_try_recv_with(self.shared, self.consumer, f) }
478 }
479}
480
481#[cfg(test)]
482mod tests {
483 extern crate alloc;
484
485 use super::*;
486 use alloc::boxed::Box;
487 use indexbus_abi::layouts::SharedFanoutLayout;
488
489 fn make_layout<const N: usize>() -> Box<SharedFanoutLayout<N>> {
490 let mut shared: Box<SharedFanoutLayout<N>> = Box::new(unsafe { core::mem::zeroed() });
491 init_shared_fanout_layout::<N>(&mut shared);
492 shared
493 }
494
495 #[test]
496 fn mpsc_source_broadcast_delivers_to_all_consumers() {
497 let mut shared = make_layout::<4>();
498 let cell = SharedFanoutLayoutCell::from_mut(&mut shared);
499
500 let (p1, router, cons0) = fanout_handles::<4>(cell, 0).unwrap();
501 let (_p, _r, cons1) = fanout_handles::<4>(cell, 1).unwrap();
502 let (_p, _r, cons2) = fanout_handles::<4>(cell, 2).unwrap();
503 let (_p, _r, cons3) = fanout_handles::<4>(cell, 3).unwrap();
504
505 let mp1 = fanout_mpsc_producer::<4>(cell).unwrap();
506 let mp2 = fanout_mpsc_producer::<4>(cell).unwrap();
507
508 p1.publish(b"s").unwrap();
510 assert!(router.route_once_with(RouterSource::Spsc, RouterMode::Broadcast));
511 let mut out = [0u8; 256];
512 let n0 = cons0.try_recv_into(&mut out).unwrap().unwrap();
513 assert_eq!(&out[..n0], b"s");
514 let n1 = cons1.try_recv_into(&mut out).unwrap().unwrap();
515 assert_eq!(&out[..n1], b"s");
516 let n2 = cons2.try_recv_into(&mut out).unwrap().unwrap();
517 assert_eq!(&out[..n2], b"s");
518 let n3 = cons3.try_recv_into(&mut out).unwrap().unwrap();
519 assert_eq!(&out[..n3], b"s");
520
521 mp1.publish(b"a").unwrap();
522 let r1 = router.route_once_with_stats(RouterSource::Mpsc, RouterMode::Broadcast);
523 assert!(r1.routed);
524 assert_eq!(r1.delivered, 4);
525
526 mp2.publish(b"b").unwrap();
527 let r2 = router.route_once_with_stats(RouterSource::Mpsc, RouterMode::Broadcast);
528 assert!(r2.routed);
529 assert_eq!(r2.delivered, 4);
530
531 let a0 = cons0.try_recv_into(&mut out).unwrap().unwrap();
532 let a1 = cons1.try_recv_into(&mut out).unwrap().unwrap();
533 let a2 = cons2.try_recv_into(&mut out).unwrap().unwrap();
534 let a3 = cons3.try_recv_into(&mut out).unwrap().unwrap();
535 assert_eq!(&out[..a0], b"a");
536 assert_eq!(&out[..a1], b"a");
537 assert_eq!(&out[..a2], b"a");
538 assert_eq!(&out[..a3], b"a");
539
540 let b0 = cons0.try_recv_into(&mut out).unwrap().unwrap();
541 let b1 = cons1.try_recv_into(&mut out).unwrap().unwrap();
542 let b2 = cons2.try_recv_into(&mut out).unwrap().unwrap();
543 let b3 = cons3.try_recv_into(&mut out).unwrap().unwrap();
544 assert_eq!(&out[..b0], b"b");
545 assert_eq!(&out[..b1], b"b");
546 assert_eq!(&out[..b2], b"b");
547 assert_eq!(&out[..b3], b"b");
548 }
549
550 #[test]
551 fn mpsc_source_workqueue_delivers_to_exactly_one_consumer() {
552 let mut shared = make_layout::<4>();
553 let cell = SharedFanoutLayoutCell::from_mut(&mut shared);
554
555 let (_p, router, cons0) = fanout_handles::<4>(cell, 0).unwrap();
556 let (_p, _r, cons1) = fanout_handles::<4>(cell, 1).unwrap();
557 let (_p, _r, cons2) = fanout_handles::<4>(cell, 2).unwrap();
558 let (_p, _r, cons3) = fanout_handles::<4>(cell, 3).unwrap();
559
560 let mp = fanout_mpsc_producer::<4>(cell).unwrap();
561 mp.publish(b"x").unwrap();
562
563 let r = router.route_once_with_stats(RouterSource::Mpsc, RouterMode::WorkQueue);
564 assert!(r.routed);
565 assert_eq!(r.delivered, 1);
566
567 let mut out = [0u8; 256];
568 let mut got = 0;
569 for cons in [&cons0, &cons1, &cons2, &cons3] {
570 if let Some(n) = cons.try_recv_into(&mut out).unwrap() {
571 assert_eq!(&out[..n], b"x");
572 got += 1;
573 }
574 }
575 assert_eq!(got, 1);
576 }
577}