indexbus_core/
events.rs

1use core::cell::UnsafeCell;
2use core::sync::atomic::Ordering;
3
4use indexbus_abi::layouts::{SharedLayout, SlotPoolLayout};
5use indexbus_abi::{caps, flags, LayoutHeader};
6
7use crate::internal;
8use crate::{Error, PublishWithError, RecvWithError, INDEXBUS_SLOT_DATA_SIZE};
9
10/// `SharedLayout` wrapper that explicitly opts into interior mutability.
11///
12/// The v1 events layout is designed for concurrent mutation through atomics and raw pointers.
13/// Rust requires such mutation through a shared reference to be mediated by [`UnsafeCell`].
14///
15/// This type is `repr(transparent)` over the ABI layout, so it can be used over the same mapped
16/// bytes as `SharedLayout`.
17#[repr(transparent)]
18pub struct SharedLayoutCell(UnsafeCell<SharedLayout>);
19
20impl SharedLayoutCell {
21    /// Borrow a `SharedLayoutCell` view over a uniquely-mutable `SharedLayout`.
22    #[inline]
23    pub fn from_mut(shared: &mut SharedLayout) -> &SharedLayoutCell {
24        // Safety: `SharedLayoutCell` is `repr(transparent)` over `UnsafeCell<SharedLayout>`, and
25        // `UnsafeCell<T>` is `repr(transparent)` over `T`.
26        unsafe { &*(shared as *mut SharedLayout as *const SharedLayoutCell) }
27    }
28
29    /// Borrow a `SharedLayoutCell` view over a raw pointer.
30    ///
31    /// # Safety
32    /// `ptr` must be valid for reads/writes for the lifetime `'a`.
33    #[inline]
34    pub unsafe fn from_ptr<'a>(ptr: *mut SharedLayout) -> &'a SharedLayoutCell {
35        unsafe { &*(ptr as *const SharedLayoutCell) }
36    }
37
38    #[inline]
39    fn as_ptr(&self) -> *mut SharedLayout {
40        self.0.get()
41    }
42}
43
44/// SPSC producer handle for a `SharedLayout` events region.
45///
46/// Semantics (v1):
47/// - at-most-once delivery
48/// - bounded capacity; `publish*` returns `Error::Full` when the region cannot accept more
49/// - a successful `publish*` commits bytes that a receiver will observe on successful receive
50pub struct SpscSender {
51    shared: *mut SharedLayout,
52}
53
54/// SPSC consumer handle for a `SharedLayout` events region.
55///
56/// Semantics (v1):
57/// - nonblocking receive (`Ok(None)` when empty)
58/// - `try_recv_with` borrows slot bytes only for the duration of the callback
59pub struct SpscReceiver {
60    shared: *mut SharedLayout,
61}
62
63/// MPSC producer handle for a `SharedLayout` events region.
64///
65/// Semantics (v1):
66/// - at-most-once delivery
67/// - per-producer ordering preserved; cross-producer ordering not guaranteed
68/// - bounded capacity; `publish*` returns `Error::Full` when saturated
69pub struct MpscProducer {
70    shared: *mut SharedLayout,
71}
72
73/// MPSC consumer handle for a `SharedLayout` events region.
74///
75/// Semantics (v1):
76/// - nonblocking receive (`Ok(None)` when empty)
77/// - `try_recv_with` borrows slot bytes only for the duration of the callback
78pub struct MpscConsumer {
79    shared: *mut SharedLayout,
80}
81
82/// Owned handle to an Events slot (slot index + pool pointer).
83///
84/// This enables *slot passing*: forwarding a message by moving the slot index between queues
85/// without copying payload bytes.
86///
87/// # Safety / lifetime
88///
89/// This type contains a raw pointer to a shared-memory mapping. The caller must ensure the
90/// mapping outlives the slot handle.
91#[derive(Debug)]
92pub struct EventsSlot {
93    pub(crate) pool: *mut SlotPoolLayout,
94    pub(crate) slot: u32,
95    pub(crate) len: u32,
96    pub(crate) released: bool,
97}
98
99unsafe impl Send for EventsSlot {}
100
101impl EventsSlot {
102    #[inline]
103    /// Returns the payload bytes for this slot.
104    pub fn as_bytes(&self) -> &[u8] {
105        let s = unsafe { &*core::ptr::addr_of!((*self.pool).slots[self.slot as usize]) };
106        let len = (self.len as usize).min(INDEXBUS_SLOT_DATA_SIZE);
107        &s.data[..len]
108    }
109
110    #[inline]
111    /// Returns the payload length.
112    pub fn len(&self) -> usize {
113        self.len as usize
114    }
115
116    #[inline]
117    /// Returns true if this slot has no payload bytes.
118    pub fn is_empty(&self) -> bool {
119        self.len == 0
120    }
121}
122
123impl AsRef<[u8]> for EventsSlot {
124    #[inline]
125    fn as_ref(&self) -> &[u8] {
126        self.as_bytes()
127    }
128}
129
130impl Drop for EventsSlot {
131    fn drop(&mut self) {
132        if self.released {
133            return;
134        }
135        unsafe { internal::events::pool_free_ptr(self.pool, self.slot) };
136    }
137}
138
139/// Error returned by `publish_slot` APIs when the slot cannot be forwarded.
140#[derive(Debug)]
141pub struct PublishSlotError {
142    /// Why publishing failed.
143    pub error: Error,
144    /// The original slot, returned to the caller so it can be retried or dropped.
145    pub slot: EventsSlot,
146}
147
148// Thread-safety notes:
149// - Handles are thin wrappers around a raw pointer to a shared-memory mapping. The caller must
150//   ensure the mapped memory outlives all handles.
151// - `SpscSender` / `SpscReceiver` must each be used by a single thread at a time.
152// - `MpscProducer` is safe to share across threads (it uses MPSC algorithms internally), but
153//   `MpscConsumer` must be used by a single thread at a time.
154unsafe impl Send for SpscSender {}
155unsafe impl Send for SpscReceiver {}
156unsafe impl Send for MpscProducer {}
157unsafe impl Sync for MpscProducer {}
158unsafe impl Send for MpscConsumer {}
159
160/// Initialize a shared events layout in-place.
161///
162/// Safe because `&mut` guarantees exclusive access.
163///
164/// `layout_bytes` should reflect the total mapped byte length of the region.
165/// If `capabilities` includes `INDEXBUS_CAP_SUPPORTS_BLOCKING`, the mapping is expected to
166/// include an appended wake section at the next 64B boundary (see `validate_shared_layout`).
167pub fn init_shared_layout_with_layout_bytes(
168    shared: &mut SharedLayout,
169    capabilities: u32,
170    layout_bytes: u32,
171) {
172    let initialized = unsafe { internal::atomics::atomic_u32(&shared.initialized) };
173    initialized.store(1, Ordering::Release);
174
175    // Transport may advertise optional appended sections via caps/layout_bytes.
176    shared.header = LayoutHeader::new_v1_with_layout_bytes(
177        capabilities,
178        flags::INDEXBUS_REGION_KIND_EVENTS,
179        layout_bytes,
180    );
181
182    unsafe {
183        internal::events::pool_init_once(core::ptr::addr_of_mut!(shared.slot_pool));
184        internal::events::index_queue_init(core::ptr::addr_of_mut!(shared.queue));
185        internal::events::mpsc_queue_init(core::ptr::addr_of_mut!(shared.mpsc_queue));
186    }
187
188    initialized.store(2, Ordering::Release);
189}
190
191/// Initialize a shared events layout in-place with default v1 capabilities.
192///
193/// This is a convenience wrapper around `init_shared_layout_with_layout_bytes` using:
194/// - `caps::INDEXBUS_CAP_SUPPORTS_EVENTS`
195/// - `size_of::<SharedLayout>()` as `layout_bytes`
196pub fn init_shared_layout(shared: &mut SharedLayout) {
197    init_shared_layout_with_layout_bytes(
198        shared,
199        caps::INDEXBUS_CAP_SUPPORTS_EVENTS,
200        core::mem::size_of::<SharedLayout>() as u32,
201    )
202}
203
204/// Create SPSC sender/receiver handles over a shared layout.
205///
206/// Returns `Err` if the region is not initialized or is not v1-compatible.
207///
208/// The returned handles borrow the region by raw pointer; the caller must keep the underlying
209/// `SharedLayout` alive (and mapped) for as long as the handles are used.
210pub fn split_spsc(shared: &SharedLayoutCell) -> Result<(SpscSender, SpscReceiver), Error> {
211    // Validation is read-only; take a shared ref to the underlying layout.
212    internal::validate::validate_shared_layout(unsafe { &*shared.as_ptr() })?;
213
214    let ptr = shared.as_ptr();
215    Ok((SpscSender { shared: ptr }, SpscReceiver { shared: ptr }))
216}
217
218/// Create MPSC producer/consumer handles over a shared layout.
219///
220/// Returns `Err` if the region is not initialized or is not v1-compatible.
221///
222/// The returned handles borrow the region by raw pointer; the caller must keep the underlying
223/// `SharedLayout` alive (and mapped) for as long as the handles are used.
224pub fn split_mpsc(shared: &SharedLayoutCell) -> Result<(MpscProducer, MpscConsumer), Error> {
225    internal::validate::validate_shared_layout(unsafe { &*shared.as_ptr() })?;
226
227    let ptr = shared.as_ptr();
228    Ok((MpscProducer { shared: ptr }, MpscConsumer { shared: ptr }))
229}
230
231impl SpscSender {
232    #[inline]
233    /// Publish a byte payload.
234    ///
235    /// Returns:
236    /// - `Ok(())` if the message was committed
237    /// - `Err(Error::TooLarge)` if `data.len() > INDEXBUS_SLOT_DATA_SIZE`
238    /// - `Err(Error::Full)` if bounded capacity is exhausted
239    pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
240        if data.len() > INDEXBUS_SLOT_DATA_SIZE {
241            return Err(Error::TooLarge {
242                max: INDEXBUS_SLOT_DATA_SIZE,
243                len: data.len(),
244            });
245        }
246
247        let slot =
248            unsafe { internal::events::pool_alloc_from_shared(self.shared) }.ok_or(Error::Full)?;
249
250        unsafe {
251            internal::events::slot_write_from_shared(self.shared, slot, data);
252            internal::events::spsc_queue_push_from_shared(self.shared, slot).map_err(|_| {
253                internal::events::pool_free_from_shared(self.shared, slot);
254                Error::Full
255            })?;
256        }
257
258        Ok(())
259    }
260
261    /// Publish by writing directly into the slot buffer.
262    ///
263    /// This avoids intermediate allocations/copies for higher-level adapters.
264    ///
265    /// The closure must return the number of bytes written. That length becomes the published
266    /// message length.
267    ///
268    /// # Errors
269    ///
270    /// - [`PublishWithError::Encode`]: the closure returned an error; no message is published.
271    /// - [`PublishWithError::Core`]: capacity/layout failures (e.g. full), or `len > INDEXBUS_SLOT_DATA_SIZE`.
272    #[inline]
273    pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
274    where
275        F: FnOnce(&mut [u8]) -> Result<usize, E>,
276    {
277        let slot = unsafe { internal::events::pool_alloc_from_shared(self.shared) }
278            .ok_or(PublishWithError::Core(Error::Full))?;
279
280        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
281        let s = unsafe { &mut *core::ptr::addr_of_mut!((*pool).slots[slot as usize]) };
282
283        let len = match f(&mut s.data[..]) {
284            Ok(n) => n,
285            Err(e) => {
286                unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
287                return Err(PublishWithError::Encode(e));
288            }
289        };
290
291        if len > INDEXBUS_SLOT_DATA_SIZE {
292            unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
293            return Err(PublishWithError::Core(Error::TooLarge {
294                max: INDEXBUS_SLOT_DATA_SIZE,
295                len,
296            }));
297        }
298
299        s.len = len as u32;
300        unsafe {
301            internal::events::spsc_queue_push_from_shared(self.shared, slot).map_err(|_| {
302                internal::events::pool_free_from_shared(self.shared, slot);
303                PublishWithError::Core(Error::Full)
304            })?;
305        }
306        Ok(())
307    }
308
309    /// Publish a previously-received slot by forwarding its slot index into this sender's queue.
310    ///
311    /// On failure, returns the original slot back to the caller.
312    #[inline]
313    pub fn try_publish_slot(&self, mut slot: EventsSlot) -> Result<(), PublishSlotError> {
314        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
315        if slot.pool != pool {
316            return Err(PublishSlotError {
317                error: Error::IncompatibleLayout,
318                slot,
319            });
320        }
321
322        let pushed = unsafe {
323            internal::events::spsc_queue_push_from_shared(self.shared, slot.slot).is_ok()
324        };
325        if !pushed {
326            return Err(PublishSlotError {
327                error: Error::Full,
328                slot,
329            });
330        }
331
332        slot.released = true;
333        Ok(())
334    }
335
336    /// Publish a previously-received slot, discarding it on failure.
337    #[inline]
338    pub fn publish_slot(&self, slot: EventsSlot) -> Result<(), Error> {
339        match self.try_publish_slot(slot) {
340            Ok(()) => Ok(()),
341            Err(e) => Err(e.error),
342        }
343    }
344}
345
346impl SpscReceiver {
347    /// - `Ok(Some(len))` on success
348    /// - `Ok(None)` if queue empty
349    /// - `Err(BufferTooSmall)` if `out` is too small
350    #[inline]
351    pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
352        let Some(slot) = (unsafe { internal::events::spsc_queue_pop_from_shared(self.shared) })
353        else {
354            return Ok(None);
355        };
356
357        unsafe {
358            let len =
359                internal::events::slot_read_into_and_free_from_shared(self.shared, slot, out)?;
360            Ok(Some(len))
361        }
362    }
363
364    /// Receive by borrowing the slot bytes for the duration of `f`.
365    ///
366    /// This enables zero-copy parsing/decoding in adapter layers.
367    ///
368    /// The borrowed `&[u8]` is only valid for the duration of the callback.
369    ///
370    /// # Errors
371    ///
372    /// If the callback returns `Err`, the message is still considered consumed; it will not be
373    /// re-delivered by this receiver.
374    #[inline]
375    pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
376    where
377        F: FnOnce(&[u8]) -> Result<R, E>,
378    {
379        let Some(slot) = (unsafe { internal::events::spsc_queue_pop_from_shared(self.shared) })
380        else {
381            return Ok(None);
382        };
383
384        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
385        let s = unsafe { &*core::ptr::addr_of!((*pool).slots[slot as usize]) };
386        let len = s.len as usize;
387
388        if len > INDEXBUS_SLOT_DATA_SIZE {
389            unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
390            return Err(RecvWithError::Core(Error::IncompatibleLayout));
391        }
392
393        let result = f(&s.data[..len]).map_err(RecvWithError::Decode);
394        unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
395        result.map(Some)
396    }
397
398    /// Receive a message as an owned slot handle.
399    ///
400    /// The returned slot is valid until it is dropped (which frees it back to the pool) or
401    /// forwarded via `publish_slot`.
402    #[inline]
403    pub fn try_recv_slot(&self) -> Result<Option<EventsSlot>, Error> {
404        let Some(slot) = (unsafe { internal::events::spsc_queue_pop_from_shared(self.shared) })
405        else {
406            return Ok(None);
407        };
408
409        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
410        let s = unsafe { &*core::ptr::addr_of!((*pool).slots[slot as usize]) };
411        let len = s.len as usize;
412        if len > INDEXBUS_SLOT_DATA_SIZE {
413            unsafe { internal::events::pool_free_ptr(pool, slot) };
414            return Err(Error::IncompatibleLayout);
415        }
416
417        Ok(Some(EventsSlot {
418            pool,
419            slot,
420            len: len as u32,
421            released: false,
422        }))
423    }
424}
425
426impl MpscProducer {
427    #[inline]
428    /// Publish a byte payload.
429    ///
430    /// Returns:
431    /// - `Ok(())` if the message was committed
432    /// - `Err(Error::TooLarge)` if `data.len() > INDEXBUS_SLOT_DATA_SIZE`
433    /// - `Err(Error::Full)` if bounded capacity is exhausted
434    pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
435        if data.len() > INDEXBUS_SLOT_DATA_SIZE {
436            return Err(Error::TooLarge {
437                max: INDEXBUS_SLOT_DATA_SIZE,
438                len: data.len(),
439            });
440        }
441
442        let slot =
443            unsafe { internal::events::pool_alloc_from_shared(self.shared) }.ok_or(Error::Full)?;
444        unsafe {
445            internal::events::slot_write_from_shared(self.shared, slot, data);
446            internal::events::mpsc_queue_push_from_shared(self.shared, slot).map_err(|_| {
447                internal::events::pool_free_from_shared(self.shared, slot);
448                Error::Full
449            })?;
450        }
451
452        Ok(())
453    }
454
455    /// Publish by writing directly into the slot buffer.
456    ///
457    /// The closure must return the number of bytes written. That length becomes the published
458    /// message length.
459    ///
460    /// # Errors
461    ///
462    /// - [`PublishWithError::Encode`]: the closure returned an error; no message is published.
463    /// - [`PublishWithError::Core`]: capacity/layout failures (e.g. full), or `len > INDEXBUS_SLOT_DATA_SIZE`.
464    #[inline]
465    pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
466    where
467        F: FnOnce(&mut [u8]) -> Result<usize, E>,
468    {
469        let slot = unsafe { internal::events::pool_alloc_from_shared(self.shared) }
470            .ok_or(PublishWithError::Core(Error::Full))?;
471
472        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
473        let s = unsafe { &mut *core::ptr::addr_of_mut!((*pool).slots[slot as usize]) };
474
475        let len = match f(&mut s.data[..]) {
476            Ok(n) => n,
477            Err(e) => {
478                unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
479                return Err(PublishWithError::Encode(e));
480            }
481        };
482
483        if len > INDEXBUS_SLOT_DATA_SIZE {
484            unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
485            return Err(PublishWithError::Core(Error::TooLarge {
486                max: INDEXBUS_SLOT_DATA_SIZE,
487                len,
488            }));
489        }
490
491        s.len = len as u32;
492        unsafe {
493            internal::events::mpsc_queue_push_from_shared(self.shared, slot).map_err(|_| {
494                internal::events::pool_free_from_shared(self.shared, slot);
495                PublishWithError::Core(Error::Full)
496            })?;
497        }
498        Ok(())
499    }
500
501    /// Publish a previously-received slot by forwarding its slot index into this producer's queue.
502    ///
503    /// On failure, returns the original slot back to the caller.
504    #[inline]
505    pub fn try_publish_slot(&self, mut slot: EventsSlot) -> Result<(), PublishSlotError> {
506        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
507        if slot.pool != pool {
508            return Err(PublishSlotError {
509                error: Error::IncompatibleLayout,
510                slot,
511            });
512        }
513
514        let pushed = unsafe {
515            internal::events::mpsc_queue_push_from_shared(self.shared, slot.slot).is_ok()
516        };
517        if !pushed {
518            return Err(PublishSlotError {
519                error: Error::Full,
520                slot,
521            });
522        }
523
524        slot.released = true;
525        Ok(())
526    }
527
528    /// Publish a previously-received slot, discarding it on failure.
529    #[inline]
530    pub fn publish_slot(&self, slot: EventsSlot) -> Result<(), Error> {
531        match self.try_publish_slot(slot) {
532            Ok(()) => Ok(()),
533            Err(e) => Err(e.error),
534        }
535    }
536}
537
538impl MpscConsumer {
539    /// - `Ok(Some(len))` on success
540    /// - `Ok(None)` if queue empty
541    /// - `Err(BufferTooSmall)` if `out` is too small
542    #[inline]
543    pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
544        let Some(slot) = (unsafe { internal::events::mpsc_queue_pop_from_shared(self.shared) })
545        else {
546            return Ok(None);
547        };
548
549        unsafe {
550            let len =
551                internal::events::slot_read_into_and_free_from_shared(self.shared, slot, out)?;
552            Ok(Some(len))
553        }
554    }
555
556    /// Receive by borrowing the slot bytes for the duration of `f`.
557    ///
558    /// If the callback returns `Err`, the message is still considered consumed; it will not be
559    /// re-delivered.
560    #[inline]
561    pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
562    where
563        F: FnOnce(&[u8]) -> Result<R, E>,
564    {
565        let Some(slot) = (unsafe { internal::events::mpsc_queue_pop_from_shared(self.shared) })
566        else {
567            return Ok(None);
568        };
569
570        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
571        let s = unsafe { &*core::ptr::addr_of!((*pool).slots[slot as usize]) };
572        let len = s.len as usize;
573
574        if len > INDEXBUS_SLOT_DATA_SIZE {
575            unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
576            return Err(RecvWithError::Core(Error::IncompatibleLayout));
577        }
578
579        let result = f(&s.data[..len]).map_err(RecvWithError::Decode);
580        unsafe { internal::events::pool_free_from_shared(self.shared, slot) };
581        result.map(Some)
582    }
583
584    /// Receive a message as an owned slot handle.
585    ///
586    /// The returned slot is valid until it is dropped (which frees it back to the pool) or
587    /// forwarded via `publish_slot`.
588    #[inline]
589    pub fn try_recv_slot(&self) -> Result<Option<EventsSlot>, Error> {
590        let Some(slot) = (unsafe { internal::events::mpsc_queue_pop_from_shared(self.shared) })
591        else {
592            return Ok(None);
593        };
594
595        let pool = unsafe { core::ptr::addr_of_mut!((*self.shared).slot_pool) };
596        let s = unsafe { &*core::ptr::addr_of!((*pool).slots[slot as usize]) };
597        let len = s.len as usize;
598        if len > INDEXBUS_SLOT_DATA_SIZE {
599            unsafe { internal::events::pool_free_ptr(pool, slot) };
600            return Err(Error::IncompatibleLayout);
601        }
602
603        Ok(Some(EventsSlot {
604            pool,
605            slot,
606            len: len as u32,
607            released: false,
608        }))
609    }
610}
611
#[cfg(test)]
mod tests {
    extern crate alloc;

    use super::*;
    use alloc::boxed::Box;
    use indexbus_abi::layouts::SharedLayout;

    /// Heap-allocate a zeroed layout and run the default v1 initialization on it.
    fn fresh_layout() -> Box<SharedLayout> {
        let mut layout: Box<SharedLayout> = Box::new(unsafe { core::mem::zeroed() });
        init_shared_layout(&mut layout);
        layout
    }

    /// Two producers publish one message each; the single consumer sees both, then empty.
    #[test]
    fn mpsc_two_producers_single_consumer_smoke() {
        let mut layout = fresh_layout();
        let cell = SharedLayoutCell::from_mut(&mut layout);

        // Each split yields a producer; only the first consumer is used for receiving.
        let (first_producer, consumer) = split_mpsc(cell).unwrap();
        let (second_producer, _unused_consumer) = split_mpsc(cell).unwrap();

        first_producer.publish(b"a").unwrap();
        second_producer.publish(b"b").unwrap();

        let mut buf = [0u8; 256];

        let first_len = consumer.try_recv_into(&mut buf).unwrap().unwrap();
        assert_eq!(&buf[..first_len], b"a");

        let second_len = consumer.try_recv_into(&mut buf).unwrap().unwrap();
        assert_eq!(&buf[..second_len], b"b");

        assert_eq!(consumer.try_recv_into(&mut buf).unwrap(), None);
    }

    /// A slot received from the SPSC queue can be forwarded through the MPSC queue unchanged.
    #[test]
    fn slot_passing_spsc_to_mpsc_roundtrip() {
        let mut layout = fresh_layout();
        let cell = SharedLayoutCell::from_mut(&mut layout);

        let (spsc_tx, spsc_rx) = split_spsc(cell).unwrap();
        let (mpsc_tx, mpsc_rx) = split_mpsc(cell).unwrap();

        spsc_tx.publish(b"hello").unwrap();
        let received = spsc_rx.try_recv_slot().unwrap().unwrap();
        assert_eq!(received.as_bytes(), b"hello");

        // Forward the slot index itself; the payload bytes are not copied.
        mpsc_tx.try_publish_slot(received).unwrap();
        let forwarded = mpsc_rx.try_recv_slot().unwrap().unwrap();
        assert_eq!(forwarded.as_bytes(), b"hello");
    }
}