indexbus_core/
events_chain.rs

1use core::sync::atomic::Ordering;
2
3use indexbus_abi::layouts::{IndexQueue, MpscQueue, SlotPoolLayout};
4
5use crate::internal;
6use crate::{
7    Error, EventsSlot, PublishSlotError, PublishWithError, RecvWithError, INDEXBUS_SLOT_DATA_SIZE,
8};
9
10/// SPSC sender handle over an explicit `(pool, queue)` pair.
11///
12/// This is a building block for layouts that embed *multiple* SPSC queues sharing the same slot
13/// pool.
14pub struct ChainSpscSender {
15    pool: *mut SlotPoolLayout,
16    queue: *mut IndexQueue,
17}
18
19/// SPSC receiver handle over an explicit `(pool, queue)` pair.
20pub struct ChainSpscReceiver {
21    pool: *mut SlotPoolLayout,
22    queue: *mut IndexQueue,
23}
24
25/// MPSC producer handle over an explicit `(pool, queue)` pair.
26///
27/// This is a building block for layouts that embed *multiple* MPSC queues sharing the same slot
28/// pool.
29pub struct ChainMpscProducer {
30    pool: *mut SlotPoolLayout,
31    queue: *mut MpscQueue,
32}
33
34/// MPSC consumer handle over an explicit `(pool, queue)` pair.
35pub struct ChainMpscConsumer {
36    pool: *mut SlotPoolLayout,
37    queue: *mut MpscQueue,
38}
39
40unsafe impl Send for ChainSpscSender {}
41unsafe impl Send for ChainSpscReceiver {}
42
43unsafe impl Send for ChainMpscProducer {}
44unsafe impl Sync for ChainMpscProducer {}
45unsafe impl Send for ChainMpscConsumer {}
46
47/// Create SPSC sender/receiver handles over a shared slot pool and a specific queue.
48///
49/// # Safety
50/// The caller must ensure:
51/// - `pool` and `queue` point into a valid, initialized, mapped region.
52/// - The mapping outlives all returned handles.
53/// - Exactly one thread uses the sender at a time, and exactly one thread uses the receiver.
54#[inline]
55pub unsafe fn split_chain_spsc(
56    pool: *mut SlotPoolLayout,
57    queue: *mut IndexQueue,
58) -> (ChainSpscSender, ChainSpscReceiver) {
59    (
60        ChainSpscSender { pool, queue },
61        ChainSpscReceiver { pool, queue },
62    )
63}
64
65/// Create MPSC producer/consumer handles over a shared slot pool and a specific MPSC queue.
66///
67/// # Safety
68/// The caller must ensure:
69/// - `pool` and `queue` point into a valid, initialized, mapped region.
70/// - The mapping outlives all returned handles.
71/// - Exactly one thread uses the consumer at a time.
72/// - Producer handles may be used concurrently.
73#[inline]
74pub unsafe fn split_chain_mpsc(
75    pool: *mut SlotPoolLayout,
76    queue: *mut MpscQueue,
77) -> (ChainMpscProducer, ChainMpscConsumer) {
78    (
79        ChainMpscProducer { pool, queue },
80        ChainMpscConsumer { pool, queue },
81    )
82}
83
84impl ChainSpscSender {
85    #[inline]
86    fn alloc_slot(&self) -> Result<u32, Error> {
87        unsafe { internal::events::pool_alloc_ptr(self.pool) }.ok_or(Error::Full)
88    }
89
90    #[inline]
91    /// Publish a byte payload.
92    pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
93        if data.len() > INDEXBUS_SLOT_DATA_SIZE {
94            return Err(Error::TooLarge {
95                max: INDEXBUS_SLOT_DATA_SIZE,
96                len: data.len(),
97            });
98        }
99
100        let slot = self.alloc_slot()?;
101        unsafe {
102            let s = &mut *core::ptr::addr_of_mut!((*self.pool).slots[slot as usize]);
103            s.len = data.len() as u32;
104            core::ptr::copy_nonoverlapping(data.as_ptr(), s.data.as_mut_ptr(), data.len());
105            internal::events::queue_push_ptr(self.queue, slot).map_err(|_| {
106                internal::events::pool_free_ptr(self.pool, slot);
107                Error::Full
108            })?;
109        }
110
111        Ok(())
112    }
113
114    #[inline]
115    /// Publish by writing directly into the slot buffer.
116    pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
117    where
118        F: FnOnce(&mut [u8]) -> Result<usize, E>,
119    {
120        let slot = self.alloc_slot().map_err(PublishWithError::Core)?;
121
122        let len = unsafe {
123            let s = &mut *core::ptr::addr_of_mut!((*self.pool).slots[slot as usize]);
124            let len = match f(&mut s.data) {
125                Ok(n) => n,
126                Err(e) => {
127                    internal::events::pool_free_ptr(self.pool, slot);
128                    return Err(PublishWithError::Encode(e));
129                }
130            };
131
132            if len > INDEXBUS_SLOT_DATA_SIZE {
133                internal::events::pool_free_ptr(self.pool, slot);
134                return Err(PublishWithError::Core(Error::TooLarge {
135                    max: INDEXBUS_SLOT_DATA_SIZE,
136                    len,
137                }));
138            }
139
140            s.len = len as u32;
141            len
142        };
143
144        unsafe {
145            internal::events::queue_push_ptr(self.queue, slot).map_err(|_| {
146                internal::events::pool_free_ptr(self.pool, slot);
147                PublishWithError::Core(Error::Full)
148            })?;
149        }
150
151        let _ = len;
152        Ok(())
153    }
154
155    #[inline]
156    /// Publish a previously-received slot by forwarding its slot index into this sender's queue.
157    pub fn try_publish_slot(&self, mut slot: EventsSlot) -> Result<(), PublishSlotError> {
158        if slot.pool != self.pool {
159            return Err(PublishSlotError {
160                error: Error::IncompatibleLayout,
161                slot,
162            });
163        }
164
165        let pushed = unsafe { internal::events::queue_push_ptr(self.queue, slot.slot).is_ok() };
166        if !pushed {
167            return Err(PublishSlotError {
168                error: Error::Full,
169                slot,
170            });
171        }
172
173        slot.released = true;
174        Ok(())
175    }
176
177    #[inline]
178    /// Publish a previously-received slot, discarding it on failure.
179    pub fn publish_slot(&self, slot: EventsSlot) -> Result<(), Error> {
180        match self.try_publish_slot(slot) {
181            Ok(()) => Ok(()),
182            Err(e) => Err(e.error),
183        }
184    }
185}
186
187impl ChainMpscProducer {
188    #[inline]
189    fn alloc_slot(&self) -> Result<u32, Error> {
190        unsafe { internal::events::pool_alloc_ptr(self.pool) }.ok_or(Error::Full)
191    }
192
193    #[inline]
194    /// Publish a byte payload.
195    pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
196        if data.len() > INDEXBUS_SLOT_DATA_SIZE {
197            return Err(Error::TooLarge {
198                max: INDEXBUS_SLOT_DATA_SIZE,
199                len: data.len(),
200            });
201        }
202
203        let slot = self.alloc_slot()?;
204        unsafe {
205            let s = &mut *core::ptr::addr_of_mut!((*self.pool).slots[slot as usize]);
206            s.len = data.len() as u32;
207            core::ptr::copy_nonoverlapping(data.as_ptr(), s.data.as_mut_ptr(), data.len());
208            internal::events::mpsc_queue_push_ptr(self.queue, slot).map_err(|_| {
209                internal::events::pool_free_ptr(self.pool, slot);
210                Error::Full
211            })?;
212        }
213
214        Ok(())
215    }
216
217    #[inline]
218    /// Publish by writing directly into the slot buffer.
219    pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
220    where
221        F: FnOnce(&mut [u8]) -> Result<usize, E>,
222    {
223        let slot = self.alloc_slot().map_err(PublishWithError::Core)?;
224
225        let len = unsafe {
226            let s = &mut *core::ptr::addr_of_mut!((*self.pool).slots[slot as usize]);
227            let len = match f(&mut s.data) {
228                Ok(n) => n,
229                Err(e) => {
230                    internal::events::pool_free_ptr(self.pool, slot);
231                    return Err(PublishWithError::Encode(e));
232                }
233            };
234
235            if len > INDEXBUS_SLOT_DATA_SIZE {
236                internal::events::pool_free_ptr(self.pool, slot);
237                return Err(PublishWithError::Core(Error::TooLarge {
238                    max: INDEXBUS_SLOT_DATA_SIZE,
239                    len,
240                }));
241            }
242
243            s.len = len as u32;
244            len
245        };
246
247        unsafe {
248            internal::events::mpsc_queue_push_ptr(self.queue, slot).map_err(|_| {
249                internal::events::pool_free_ptr(self.pool, slot);
250                PublishWithError::Core(Error::Full)
251            })?;
252        }
253
254        let _ = len;
255        Ok(())
256    }
257
258    #[inline]
259    /// Publish a previously-received slot by forwarding its slot index into this producer's queue.
260    pub fn try_publish_slot(&self, mut slot: EventsSlot) -> Result<(), PublishSlotError> {
261        if slot.pool != self.pool {
262            return Err(PublishSlotError {
263                error: Error::IncompatibleLayout,
264                slot,
265            });
266        }
267
268        let pushed =
269            unsafe { internal::events::mpsc_queue_push_ptr(self.queue, slot.slot).is_ok() };
270        if !pushed {
271            return Err(PublishSlotError {
272                error: Error::Full,
273                slot,
274            });
275        }
276
277        slot.released = true;
278        Ok(())
279    }
280
281    /// Publish a previously-received slot, discarding it on failure.
282    pub fn publish_slot(&self, slot: EventsSlot) -> Result<(), Error> {
283        match self.try_publish_slot(slot) {
284            Ok(()) => Ok(()),
285            Err(e) => Err(e.error),
286        }
287    }
288}
289
290impl ChainSpscReceiver {
291    #[inline]
292    fn pop_slot(&self) -> Option<u32> {
293        unsafe { internal::events::queue_pop_ptr(self.queue) }
294    }
295
296    #[inline]
297    /// Receive into a caller-provided buffer.
298    pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
299        let Some(slot) = self.pop_slot() else {
300            return Ok(None);
301        };
302
303        let len = unsafe {
304            let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
305            let len = s.len as usize;
306            if len > INDEXBUS_SLOT_DATA_SIZE {
307                internal::events::pool_free_ptr(self.pool, slot);
308                return Err(Error::IncompatibleLayout);
309            }
310            if out.len() < len {
311                internal::events::pool_free_ptr(self.pool, slot);
312                return Err(Error::BufferTooSmall {
313                    required: len,
314                    provided: out.len(),
315                });
316            }
317            core::ptr::copy_nonoverlapping(s.data.as_ptr(), out.as_mut_ptr(), len);
318            internal::events::pool_free_ptr(self.pool, slot);
319            len
320        };
321
322        Ok(Some(len))
323    }
324
325    #[inline]
326    /// Receive by borrowing the slot bytes for the duration of `f`.
327    pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
328    where
329        F: FnOnce(&[u8]) -> Result<R, E>,
330    {
331        let Some(slot) = self.pop_slot() else {
332            return Ok(None);
333        };
334
335        let result = unsafe {
336            let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
337            let len = s.len as usize;
338            if len > INDEXBUS_SLOT_DATA_SIZE {
339                internal::events::pool_free_ptr(self.pool, slot);
340                return Err(RecvWithError::Core(Error::IncompatibleLayout));
341            }
342            let r = f(&s.data[..len]).map_err(RecvWithError::Decode);
343            internal::events::pool_free_ptr(self.pool, slot);
344            r
345        };
346
347        result.map(Some)
348    }
349
350    #[inline]
351    /// Receive a message as an owned slot handle.
352    pub fn try_recv_slot(&self) -> Result<Option<EventsSlot>, Error> {
353        let Some(slot) = self.pop_slot() else {
354            return Ok(None);
355        };
356
357        let len = unsafe {
358            let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
359            s.len as usize
360        };
361
362        if len > INDEXBUS_SLOT_DATA_SIZE {
363            unsafe { internal::events::pool_free_ptr(self.pool, slot) };
364            return Err(Error::IncompatibleLayout);
365        }
366
367        Ok(Some(EventsSlot {
368            pool: self.pool,
369            slot,
370            len: len as u32,
371            released: false,
372        }))
373    }
374
375    #[inline]
376    /// Convenience: best-effort poll (no error, returns `None` on empty or failure).
377    pub fn recv(&self, out: &mut [u8]) -> Option<usize> {
378        self.try_recv_into(out).ok().flatten()
379    }
380
381    #[inline]
382    /// Poll and yield a bounded batch of messages, resetting the queue head/tail cache-line state.
383    ///
384    /// This is intentionally minimal; higher layers should use `WaitStrategy` for idle behavior.
385    pub fn drain_batch_into(&self, out: &mut [u8], batch: usize) -> usize {
386        let mut n = 0;
387        let batch = batch.max(1);
388        for _ in 0..batch {
389            if self.try_recv_into(out).ok().flatten().is_some() {
390                n += 1;
391            } else {
392                break;
393            }
394        }
395        n
396    }
397}
398
399impl ChainMpscConsumer {
400    #[inline]
401    fn pop_slot(&self) -> Option<u32> {
402        unsafe { internal::events::mpsc_queue_pop_ptr(self.queue) }
403    }
404
405    #[inline]
406    /// Receive into a caller-provided buffer.
407    pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
408        let Some(slot) = self.pop_slot() else {
409            return Ok(None);
410        };
411
412        let len = unsafe {
413            let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
414            let len = s.len as usize;
415            if len > INDEXBUS_SLOT_DATA_SIZE {
416                internal::events::pool_free_ptr(self.pool, slot);
417                return Err(Error::IncompatibleLayout);
418            }
419            if out.len() < len {
420                internal::events::pool_free_ptr(self.pool, slot);
421                return Err(Error::BufferTooSmall {
422                    required: len,
423                    provided: out.len(),
424                });
425            }
426            core::ptr::copy_nonoverlapping(s.data.as_ptr(), out.as_mut_ptr(), len);
427            internal::events::pool_free_ptr(self.pool, slot);
428            len
429        };
430
431        Ok(Some(len))
432    }
433
434    #[inline]
435    /// Receive by borrowing the slot bytes for the duration of `f`.
436    pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
437    where
438        F: FnOnce(&[u8]) -> Result<R, E>,
439    {
440        let Some(slot) = self.pop_slot() else {
441            return Ok(None);
442        };
443
444        let result = unsafe {
445            let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
446            let len = s.len as usize;
447            if len > INDEXBUS_SLOT_DATA_SIZE {
448                internal::events::pool_free_ptr(self.pool, slot);
449                return Err(RecvWithError::Core(Error::IncompatibleLayout));
450            }
451            let r = f(&s.data[..len]).map_err(RecvWithError::Decode);
452            internal::events::pool_free_ptr(self.pool, slot);
453            r
454        };
455
456        result.map(Some)
457    }
458
459    #[inline]
460    /// Receive a message as an owned slot handle.
461    pub fn try_recv_slot(&self) -> Result<Option<EventsSlot>, Error> {
462        let Some(slot) = self.pop_slot() else {
463            return Ok(None);
464        };
465
466        let len = unsafe {
467            let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
468            s.len as usize
469        };
470
471        if len > INDEXBUS_SLOT_DATA_SIZE {
472            unsafe { internal::events::pool_free_ptr(self.pool, slot) };
473            return Err(Error::IncompatibleLayout);
474        }
475
476        Ok(Some(EventsSlot {
477            pool: self.pool,
478            slot,
479            len: len as u32,
480            released: false,
481        }))
482    }
483
484    #[inline]
485    /// Convenience: best-effort poll (no error, returns `None` on empty or failure).
486    pub fn recv(&self, out: &mut [u8]) -> Option<usize> {
487        self.try_recv_into(out).ok().flatten()
488    }
489}
490
491// Keep Ordering referenced so this file doesn't trip unused-import lints when feature sets change.
492const _: Ordering = Ordering::Relaxed;