1use core::cell::UnsafeCell;
2use core::sync::atomic::Ordering;
3
4use indexbus_abi::layouts::{SharedLayout, SlotPoolLayout};
5use indexbus_abi::{caps, flags, LayoutHeader};
6
7use crate::internal;
8use crate::{Error, PublishWithError, RecvWithError, INDEXBUS_SLOT_DATA_SIZE};
9
/// Interior-mutability wrapper around a [`SharedLayout`].
///
/// The wrapper is `#[repr(transparent)]` over `UnsafeCell<SharedLayout>`, so a pointer to a
/// `SharedLayout` can be reinterpreted as a pointer to a `SharedLayoutCell`. The split
/// functions in this module take `&SharedLayoutCell` and derive the `*mut SharedLayout`
/// stored in each handle from it.
#[repr(transparent)]
pub struct SharedLayoutCell(UnsafeCell<SharedLayout>);
19
20impl SharedLayoutCell {
21 #[inline]
23 pub fn from_mut(shared: &mut SharedLayout) -> &SharedLayoutCell {
24 unsafe { &*(shared as *mut SharedLayout as *const SharedLayoutCell) }
27 }
28
29 #[inline]
34 pub unsafe fn from_ptr<'a>(ptr: *mut SharedLayout) -> &'a SharedLayoutCell {
35 unsafe { &*(ptr as *const SharedLayoutCell) }
36 }
37
38 #[inline]
39 fn as_ptr(&self) -> *mut SharedLayout {
40 self.0.get()
41 }
42}
43
/// Publishing handle for the SPSC queue of a shared events layout.
///
/// Created by [`split_spsc`]. The handle stores only a raw pointer and carries no lifetime,
/// so the pointed-to layout must outlive it (not enforced by the type system).
pub struct SpscSender {
    // Raw pointer to the layout this handle publishes into.
    shared: *mut SharedLayout,
}
53
/// Receiving handle for the SPSC queue of a shared events layout.
///
/// Created by [`split_spsc`]. The handle stores only a raw pointer and carries no lifetime,
/// so the pointed-to layout must outlive it (not enforced by the type system).
pub struct SpscReceiver {
    // Raw pointer to the layout this handle receives from.
    shared: *mut SharedLayout,
}
62
/// Publishing handle for the MPSC queue of a shared events layout.
///
/// Created by [`split_mpsc`]. The handle stores only a raw pointer and carries no lifetime,
/// so the pointed-to layout must outlive it (not enforced by the type system).
pub struct MpscProducer {
    // Raw pointer to the layout this handle publishes into.
    shared: *mut SharedLayout,
}
72
/// Receiving handle for the MPSC queue of a shared events layout.
///
/// Created by [`split_mpsc`]. The handle stores only a raw pointer and carries no lifetime,
/// so the pointed-to layout must outlive it (not enforced by the type system).
pub struct MpscConsumer {
    // Raw pointer to the layout this handle receives from.
    shared: *mut SharedLayout,
}
81
/// Owned handle to a single slot of the shared slot pool.
///
/// Produced by the `try_recv_slot` methods. Dropping the handle returns the slot to the pool
/// unless `released` has been set, which the `try_publish_slot` methods do after handing the
/// slot to a queue.
#[derive(Debug)]
pub struct EventsSlot {
    // Pool the slot belongs to; also compared by address in `try_publish_slot`.
    pub(crate) pool: *mut SlotPoolLayout,
    // Index of the slot inside `pool.slots`.
    pub(crate) slot: u32,
    // Payload length in bytes as recorded when the handle was created.
    pub(crate) len: u32,
    // When true, `Drop` must not free the slot (ownership moved elsewhere).
    pub(crate) released: bool,
}
98
// NOTE(review): `EventsSlot` holds a raw pool pointer, so `Send` must be asserted manually.
// This presumably relies on the slot being exclusively owned by the handle and on the pool
// free operation being safe to call from any thread — confirm against `internal::events`.
unsafe impl Send for EventsSlot {}
100
101impl EventsSlot {
102 #[inline]
103 pub fn as_bytes(&self) -> &[u8] {
105 let s = unsafe { &*core::ptr::addr_of!((*self.pool).slots[self.slot as usize]) };
106 let len = (self.len as usize).min(INDEXBUS_SLOT_DATA_SIZE);
107 &s.data[..len]
108 }
109
110 #[inline]
111 pub fn len(&self) -> usize {
113 self.len as usize
114 }
115
116 #[inline]
117 pub fn is_empty(&self) -> bool {
119 self.len == 0
120 }
121}
122
123impl AsRef<[u8]> for EventsSlot {
124 #[inline]
125 fn as_ref(&self) -> &[u8] {
126 self.as_bytes()
127 }
128}
129
130impl Drop for EventsSlot {
131 fn drop(&mut self) {
132 if self.released {
133 return;
134 }
135 unsafe { internal::events::pool_free_ptr(self.pool, self.slot) };
136 }
137}
138
/// Error returned by the `try_publish_slot` methods.
///
/// Carries the slot back to the caller so that a failed publish does not lose it.
#[derive(Debug)]
pub struct PublishSlotError {
    /// Why the publish failed.
    pub error: Error,
    /// The slot that could not be published; dropping it frees the slot.
    pub slot: EventsSlot,
}
147
// The handles contain a raw pointer and are therefore not `Send`/`Sync` automatically.
// NOTE(review): these impls presumably rely on the queue and pool operations in
// `internal::events` being safe to call from whichever thread owns the handle — confirm.
unsafe impl Send for SpscSender {}
unsafe impl Send for SpscReceiver {}
unsafe impl Send for MpscProducer {}
// Only the MPSC producer is `Sync`, so `&MpscProducer` may be shared between threads.
// NOTE(review): assumes the MPSC push path tolerates concurrent callers — confirm.
unsafe impl Sync for MpscProducer {}
unsafe impl Send for MpscConsumer {}
159
/// Initializes `shared` in place as an events region.
///
/// * `capabilities` – capability bits written into the layout header.
/// * `layout_bytes` – layout size in bytes written into the layout header.
///
/// The `initialized` word is stored as `1` before any other field is touched and as `2`
/// once the header, the slot pool and both queues have been set up.
/// NOTE(review): `1`/`2` presumably mean "initializing"/"ready"; confirm against the
/// validation code.
pub fn init_shared_layout_with_layout_bytes(
    shared: &mut SharedLayout,
    capabilities: u32,
    layout_bytes: u32,
) {
    // View the plain `initialized` field as an atomic so the stores below can use
    // release ordering.
    let initialized = unsafe { internal::atomics::atomic_u32(&shared.initialized) };
    initialized.store(1, Ordering::Release);

    shared.header = LayoutHeader::new_v1_with_layout_bytes(
        capabilities,
        flags::INDEXBUS_REGION_KIND_EVENTS,
        layout_bytes,
    );

    // Set up the slot pool and both queue flavours through raw field pointers.
    unsafe {
        internal::events::pool_init_once(core::ptr::addr_of_mut!(shared.slot_pool));
        internal::events::index_queue_init(core::ptr::addr_of_mut!(shared.queue));
        internal::events::mpsc_queue_init(core::ptr::addr_of_mut!(shared.mpsc_queue));
    }

    // Written last, with release ordering, after every other field has been set up.
    initialized.store(2, Ordering::Release);
}
190
191pub fn init_shared_layout(shared: &mut SharedLayout) {
197 init_shared_layout_with_layout_bytes(
198 shared,
199 caps::INDEXBUS_CAP_SUPPORTS_EVENTS,
200 core::mem::size_of::<SharedLayout>() as u32,
201 )
202}
203
204pub fn split_spsc(shared: &SharedLayoutCell) -> Result<(SpscSender, SpscReceiver), Error> {
211 internal::validate::validate_shared_layout(unsafe { &*shared.as_ptr() })?;
213
214 let ptr = shared.as_ptr();
215 Ok((SpscSender { shared: ptr }, SpscReceiver { shared: ptr }))
216}
217
218pub fn split_mpsc(shared: &SharedLayoutCell) -> Result<(MpscProducer, MpscConsumer), Error> {
225 internal::validate::validate_shared_layout(unsafe { &*shared.as_ptr() })?;
226
227 let ptr = shared.as_ptr();
228 Ok((MpscProducer { shared: ptr }, MpscConsumer { shared: ptr }))
229}
230
231impl SpscSender {
232 #[inline]
233 pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
240 if data.len() > INDEXBUS_SLOT_DATA_SIZE {
241 return Err(Error::TooLarge {
242 max: INDEXBUS_SLOT_DATA_SIZE,
243 len: data.len(),
244 });
245 }
246
247 let slot =
248 unsafe { internal::events::pool_alloc_from_shared(self.shared) }.ok_or(Error::Full)?;
249
250 unsafe {
251 internal::events::slot_write_from_shared(self.shared, slot, data);
252 internal::events::spsc_queue_push_from_shared(self.shared, slot).map_err(|_| {
253 internal::events::pool_free_from_shared(self.shared, slot);
254 Error::Full
255 })?;
256 }
257
258 Ok(())
259 }
260
261 #[inline]
273 pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
274 where
275 F: FnOnce(&mut [u8]) -> Result<usize, E>,
276 {
277 let slot = unsafe { internal::events::pool_alloc_from_shared(self.shared) }
278 .ok_or(PublishWithError::Core(Error::Full))?;
279
280 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
281 let s = unsafe { &mut *core::ptr::addr_of_mut!((*pool).slots[slot as usize]) };
282
283 let len = match f(&mut s.data[..]) {
284 Ok(n) => n,
285 Err(e) => {
286 unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
287 return Err(PublishWithError::Encode(e));
288 }
289 };
290
291 if len > INDEXBUS_SLOT_DATA_SIZE {
292 unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
293 return Err(PublishWithError::Core(Error::TooLarge {
294 max: INDEXBUS_SLOT_DATA_SIZE,
295 len,
296 }));
297 }
298
299 s.len = len as u32;
300 unsafe {
301 internal::events::spsc_queue_push_from_shared(self.shared, slot).map_err(|_| {
302 internal::events::pool_free_from_shared(self.shared, slot);
303 PublishWithError::Core(Error::Full)
304 })?;
305 }
306 Ok(())
307 }
308
309 #[inline]
313 pub fn try_publish_slot(&self, mut slot: EventsSlot) -> Result<(), PublishSlotError> {
314 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
315 if slot.pool != pool {
316 return Err(PublishSlotError {
317 error: Error::IncompatibleLayout,
318 slot,
319 });
320 }
321
322 let pushed = unsafe {
323 internal::events::spsc_queue_push_from_shared(self.shared, slot.slot).is_ok()
324 };
325 if !pushed {
326 return Err(PublishSlotError {
327 error: Error::Full,
328 slot,
329 });
330 }
331
332 slot.released = true;
333 Ok(())
334 }
335
336 #[inline]
338 pub fn publish_slot(&self, slot: EventsSlot) -> Result<(), Error> {
339 match self.try_publish_slot(slot) {
340 Ok(()) => Ok(()),
341 Err(e) => Err(e.error),
342 }
343 }
344}
345
346impl SpscReceiver {
347 #[inline]
351 pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
352 let Some(slot) = (unsafe { internal::events::spsc_queue_pop_from_shared(self.shared) })
353 else {
354 return Ok(None);
355 };
356
357 unsafe {
358 let len =
359 internal::events::slot_read_into_and_free_from_shared(self.shared, slot, out)?;
360 Ok(Some(len))
361 }
362 }
363
364 #[inline]
375 pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
376 where
377 F: FnOnce(&[u8]) -> Result<R, E>,
378 {
379 let Some(slot) = (unsafe { internal::events::spsc_queue_pop_from_shared(self.shared) })
380 else {
381 return Ok(None);
382 };
383
384 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
385 let s = unsafe { &*core::ptr::addr_of!((*pool).slots[slot as usize]) };
386 let len = s.len as usize;
387
388 if len > INDEXBUS_SLOT_DATA_SIZE {
389 unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
390 return Err(RecvWithError::Core(Error::IncompatibleLayout));
391 }
392
393 let result = f(&s.data[..len]).map_err(RecvWithError::Decode);
394 unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
395 result.map(Some)
396 }
397
398 #[inline]
403 pub fn try_recv_slot(&self) -> Result<Option<EventsSlot>, Error> {
404 let Some(slot) = (unsafe { internal::events::spsc_queue_pop_from_shared(self.shared) })
405 else {
406 return Ok(None);
407 };
408
409 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
410 let s = unsafe { &*core::ptr::addr_of!((*pool).slots[slot as usize]) };
411 let len = s.len as usize;
412 if len > INDEXBUS_SLOT_DATA_SIZE {
413 unsafe { internal::events::pool_free_ptr(pool, slot) };
414 return Err(Error::IncompatibleLayout);
415 }
416
417 Ok(Some(EventsSlot {
418 pool,
419 slot,
420 len: len as u32,
421 released: false,
422 }))
423 }
424}
425
426impl MpscProducer {
427 #[inline]
428 pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
435 if data.len() > INDEXBUS_SLOT_DATA_SIZE {
436 return Err(Error::TooLarge {
437 max: INDEXBUS_SLOT_DATA_SIZE,
438 len: data.len(),
439 });
440 }
441
442 let slot =
443 unsafe { internal::events::pool_alloc_from_shared(self.shared) }.ok_or(Error::Full)?;
444 unsafe {
445 internal::events::slot_write_from_shared(self.shared, slot, data);
446 internal::events::mpsc_queue_push_from_shared(self.shared, slot).map_err(|_| {
447 internal::events::pool_free_from_shared(self.shared, slot);
448 Error::Full
449 })?;
450 }
451
452 Ok(())
453 }
454
455 #[inline]
465 pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
466 where
467 F: FnOnce(&mut [u8]) -> Result<usize, E>,
468 {
469 let slot = unsafe { internal::events::pool_alloc_from_shared(self.shared) }
470 .ok_or(PublishWithError::Core(Error::Full))?;
471
472 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
473 let s = unsafe { &mut *core::ptr::addr_of_mut!((*pool).slots[slot as usize]) };
474
475 let len = match f(&mut s.data[..]) {
476 Ok(n) => n,
477 Err(e) => {
478 unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
479 return Err(PublishWithError::Encode(e));
480 }
481 };
482
483 if len > INDEXBUS_SLOT_DATA_SIZE {
484 unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
485 return Err(PublishWithError::Core(Error::TooLarge {
486 max: INDEXBUS_SLOT_DATA_SIZE,
487 len,
488 }));
489 }
490
491 s.len = len as u32;
492 unsafe {
493 internal::events::mpsc_queue_push_from_shared(self.shared, slot).map_err(|_| {
494 internal::events::pool_free_from_shared(self.shared, slot);
495 PublishWithError::Core(Error::Full)
496 })?;
497 }
498 Ok(())
499 }
500
501 #[inline]
505 pub fn try_publish_slot(&self, mut slot: EventsSlot) -> Result<(), PublishSlotError> {
506 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
507 if slot.pool != pool {
508 return Err(PublishSlotError {
509 error: Error::IncompatibleLayout,
510 slot,
511 });
512 }
513
514 let pushed = unsafe {
515 internal::events::mpsc_queue_push_from_shared(self.shared, slot.slot).is_ok()
516 };
517 if !pushed {
518 return Err(PublishSlotError {
519 error: Error::Full,
520 slot,
521 });
522 }
523
524 slot.released = true;
525 Ok(())
526 }
527
528 #[inline]
530 pub fn publish_slot(&self, slot: EventsSlot) -> Result<(), Error> {
531 match self.try_publish_slot(slot) {
532 Ok(()) => Ok(()),
533 Err(e) => Err(e.error),
534 }
535 }
536}
537
538impl MpscConsumer {
539 #[inline]
543 pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
544 let Some(slot) = (unsafe { internal::events::mpsc_queue_pop_from_shared(self.shared) })
545 else {
546 return Ok(None);
547 };
548
549 unsafe {
550 let len =
551 internal::events::slot_read_into_and_free_from_shared(self.shared, slot, out)?;
552 Ok(Some(len))
553 }
554 }
555
556 #[inline]
561 pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
562 where
563 F: FnOnce(&[u8]) -> Result<R, E>,
564 {
565 let Some(slot) = (unsafe { internal::events::mpsc_queue_pop_from_shared(self.shared) })
566 else {
567 return Ok(None);
568 };
569
570 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
571 let s = unsafe { &*core::ptr::addr_of!((*pool).slots[slot as usize]) };
572 let len = s.len as usize;
573
574 if len > INDEXBUS_SLOT_DATA_SIZE {
575 unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
576 return Err(RecvWithError::Core(Error::IncompatibleLayout));
577 }
578
579 let result = f(&s.data[..len]).map_err(RecvWithError::Decode);
580 unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
581 result.map(Some)
582 }
583
584 #[inline]
589 pub fn try_recv_slot(&self) -> Result<Option<EventsSlot>, Error> {
590 let Some(slot) = (unsafe { internal::events::mpsc_queue_pop_from_shared(self.shared) })
591 else {
592 return Ok(None);
593 };
594
595 let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
596 let s = unsafe { &*core::ptr::addr_of!((*pool).slots[slot as usize]) };
597 let len = s.len as usize;
598 if len > INDEXBUS_SLOT_DATA_SIZE {
599 unsafe { internal::events::pool_free_ptr(pool, slot) };
600 return Err(Error::IncompatibleLayout);
601 }
602
603 Ok(Some(EventsSlot {
604 pool,
605 slot,
606 len: len as u32,
607 released: false,
608 }))
609 }
610}
611
#[cfg(test)]
mod tests {
    extern crate alloc;

    use super::*;
    use alloc::boxed::Box;
    use indexbus_abi::layouts::SharedLayout;

    /// Heap-allocates a zeroed layout and runs the default initialization on it.
    fn fresh_layout() -> Box<SharedLayout> {
        let mut layout: Box<SharedLayout> = Box::new(unsafe { core::mem::zeroed() });
        init_shared_layout(&mut layout);
        layout
    }

    /// Two producers publish one message each; the consumer sees them in publish order.
    #[test]
    fn mpsc_two_producers_single_consumer_smoke() {
        let mut layout = fresh_layout();
        let cell = SharedLayoutCell::from_mut(&mut layout);

        let (first_producer, consumer) = split_mpsc(cell).unwrap();
        let (second_producer, _unused_consumer) = split_mpsc(cell).unwrap();

        first_producer.publish(b"a").unwrap();
        second_producer.publish(b"b").unwrap();

        let mut buf = [0u8; 256];

        let first_len = consumer.try_recv_into(&mut buf).unwrap().unwrap();
        assert_eq!(&buf[..first_len], b"a");

        let second_len = consumer.try_recv_into(&mut buf).unwrap().unwrap();
        assert_eq!(&buf[..second_len], b"b");

        // The queue must be drained now.
        assert_eq!(consumer.try_recv_into(&mut buf).unwrap(), None);
    }

    /// A slot received from the SPSC queue can be republished on the MPSC queue unchanged.
    #[test]
    fn slot_passing_spsc_to_mpsc_roundtrip() {
        let mut layout = fresh_layout();
        let cell = SharedLayoutCell::from_mut(&mut layout);

        let (spsc_tx, spsc_rx) = split_spsc(cell).unwrap();
        let (mpsc_tx, mpsc_rx) = split_mpsc(cell).unwrap();

        spsc_tx.publish(b"hello").unwrap();
        let received = spsc_rx.try_recv_slot().unwrap().unwrap();
        assert_eq!(received.as_bytes(), b"hello");

        mpsc_tx.try_publish_slot(received).unwrap();
        let forwarded = mpsc_rx.try_recv_slot().unwrap().unwrap();
        assert_eq!(forwarded.as_bytes(), b"hello");
    }
}