1use core::sync::atomic::Ordering;
2
3use indexbus_abi::layouts::{IndexQueue, MpscQueue, SlotPoolLayout};
4
5use crate::internal;
6use crate::{
7 Error, EventsSlot, PublishSlotError, PublishWithError, RecvWithError, INDEXBUS_SLOT_DATA_SIZE,
8};
9
10pub struct ChainSpscSender {
15 pool: *mut SlotPoolLayout,
16 queue: *mut IndexQueue,
17}
18
19pub struct ChainSpscReceiver {
21 pool: *mut SlotPoolLayout,
22 queue: *mut IndexQueue,
23}
24
25pub struct ChainMpscProducer {
30 pool: *mut SlotPoolLayout,
31 queue: *mut MpscQueue,
32}
33
34pub struct ChainMpscConsumer {
36 pool: *mut SlotPoolLayout,
37 queue: *mut MpscQueue,
38}
39
40unsafe impl Send for ChainSpscSender {}
41unsafe impl Send for ChainSpscReceiver {}
42
43unsafe impl Send for ChainMpscProducer {}
44unsafe impl Sync for ChainMpscProducer {}
45unsafe impl Send for ChainMpscConsumer {}
46
47#[inline]
55pub unsafe fn split_chain_spsc(
56 pool: *mut SlotPoolLayout,
57 queue: *mut IndexQueue,
58) -> (ChainSpscSender, ChainSpscReceiver) {
59 (
60 ChainSpscSender { pool, queue },
61 ChainSpscReceiver { pool, queue },
62 )
63}
64
65#[inline]
74pub unsafe fn split_chain_mpsc(
75 pool: *mut SlotPoolLayout,
76 queue: *mut MpscQueue,
77) -> (ChainMpscProducer, ChainMpscConsumer) {
78 (
79 ChainMpscProducer { pool, queue },
80 ChainMpscConsumer { pool, queue },
81 )
82}
83
84impl ChainSpscSender {
85 #[inline]
86 fn alloc_slot(&self) -> Result<u32, Error> {
87 unsafe { internal::events::pool_alloc_ptr(self.pool) }.ok_or(Error::Full)
88 }
89
90 #[inline]
91 pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
93 if data.len() > INDEXBUS_SLOT_DATA_SIZE {
94 return Err(Error::TooLarge {
95 max: INDEXBUS_SLOT_DATA_SIZE,
96 len: data.len(),
97 });
98 }
99
100 let slot = self.alloc_slot()?;
101 unsafe {
102 let s = &mut *core::ptr::addr_of_mut!((*self.pool).slots[slot as usize]);
103 s.len = data.len() as u32;
104 core::ptr::copy_nonoverlapping(data.as_ptr(), s.data.as_mut_ptr(), data.len());
105 internal::events::queue_push_ptr(self.queue, slot).map_err(|_| {
106 internal::events::pool_free_ptr(self.pool, slot);
107 Error::Full
108 })?;
109 }
110
111 Ok(())
112 }
113
114 #[inline]
115 pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
117 where
118 F: FnOnce(&mut [u8]) -> Result<usize, E>,
119 {
120 let slot = self.alloc_slot().map_err(PublishWithError::Core)?;
121
122 let len = unsafe {
123 let s = &mut *core::ptr::addr_of_mut!((*self.pool).slots[slot as usize]);
124 let len = match f(&mut s.data) {
125 Ok(n) => n,
126 Err(e) => {
127 internal::events::pool_free_ptr(self.pool, slot);
128 return Err(PublishWithError::Encode(e));
129 }
130 };
131
132 if len > INDEXBUS_SLOT_DATA_SIZE {
133 internal::events::pool_free_ptr(self.pool, slot);
134 return Err(PublishWithError::Core(Error::TooLarge {
135 max: INDEXBUS_SLOT_DATA_SIZE,
136 len,
137 }));
138 }
139
140 s.len = len as u32;
141 len
142 };
143
144 unsafe {
145 internal::events::queue_push_ptr(self.queue, slot).map_err(|_| {
146 internal::events::pool_free_ptr(self.pool, slot);
147 PublishWithError::Core(Error::Full)
148 })?;
149 }
150
151 let _ = len;
152 Ok(())
153 }
154
155 #[inline]
156 pub fn try_publish_slot(&self, mut slot: EventsSlot) -> Result<(), PublishSlotError> {
158 if slot.pool != self.pool {
159 return Err(PublishSlotError {
160 error: Error::IncompatibleLayout,
161 slot,
162 });
163 }
164
165 let pushed = unsafe { internal::events::queue_push_ptr(self.queue, slot.slot).is_ok() };
166 if !pushed {
167 return Err(PublishSlotError {
168 error: Error::Full,
169 slot,
170 });
171 }
172
173 slot.released = true;
174 Ok(())
175 }
176
177 #[inline]
178 pub fn publish_slot(&self, slot: EventsSlot) -> Result<(), Error> {
180 match self.try_publish_slot(slot) {
181 Ok(()) => Ok(()),
182 Err(e) => Err(e.error),
183 }
184 }
185}
186
187impl ChainMpscProducer {
188 #[inline]
189 fn alloc_slot(&self) -> Result<u32, Error> {
190 unsafe { internal::events::pool_alloc_ptr(self.pool) }.ok_or(Error::Full)
191 }
192
193 #[inline]
194 pub fn publish(&self, data: &[u8]) -> Result<(), Error> {
196 if data.len() > INDEXBUS_SLOT_DATA_SIZE {
197 return Err(Error::TooLarge {
198 max: INDEXBUS_SLOT_DATA_SIZE,
199 len: data.len(),
200 });
201 }
202
203 let slot = self.alloc_slot()?;
204 unsafe {
205 let s = &mut *core::ptr::addr_of_mut!((*self.pool).slots[slot as usize]);
206 s.len = data.len() as u32;
207 core::ptr::copy_nonoverlapping(data.as_ptr(), s.data.as_mut_ptr(), data.len());
208 internal::events::mpsc_queue_push_ptr(self.queue, slot).map_err(|_| {
209 internal::events::pool_free_ptr(self.pool, slot);
210 Error::Full
211 })?;
212 }
213
214 Ok(())
215 }
216
217 #[inline]
218 pub fn publish_with<F, E>(&self, f: F) -> Result<(), PublishWithError<E>>
220 where
221 F: FnOnce(&mut [u8]) -> Result<usize, E>,
222 {
223 let slot = self.alloc_slot().map_err(PublishWithError::Core)?;
224
225 let len = unsafe {
226 let s = &mut *core::ptr::addr_of_mut!((*self.pool).slots[slot as usize]);
227 let len = match f(&mut s.data) {
228 Ok(n) => n,
229 Err(e) => {
230 internal::events::pool_free_ptr(self.pool, slot);
231 return Err(PublishWithError::Encode(e));
232 }
233 };
234
235 if len > INDEXBUS_SLOT_DATA_SIZE {
236 internal::events::pool_free_ptr(self.pool, slot);
237 return Err(PublishWithError::Core(Error::TooLarge {
238 max: INDEXBUS_SLOT_DATA_SIZE,
239 len,
240 }));
241 }
242
243 s.len = len as u32;
244 len
245 };
246
247 unsafe {
248 internal::events::mpsc_queue_push_ptr(self.queue, slot).map_err(|_| {
249 internal::events::pool_free_ptr(self.pool, slot);
250 PublishWithError::Core(Error::Full)
251 })?;
252 }
253
254 let _ = len;
255 Ok(())
256 }
257
258 #[inline]
259 pub fn try_publish_slot(&self, mut slot: EventsSlot) -> Result<(), PublishSlotError> {
261 if slot.pool != self.pool {
262 return Err(PublishSlotError {
263 error: Error::IncompatibleLayout,
264 slot,
265 });
266 }
267
268 let pushed =
269 unsafe { internal::events::mpsc_queue_push_ptr(self.queue, slot.slot).is_ok() };
270 if !pushed {
271 return Err(PublishSlotError {
272 error: Error::Full,
273 slot,
274 });
275 }
276
277 slot.released = true;
278 Ok(())
279 }
280
281 pub fn publish_slot(&self, slot: EventsSlot) -> Result<(), Error> {
283 match self.try_publish_slot(slot) {
284 Ok(()) => Ok(()),
285 Err(e) => Err(e.error),
286 }
287 }
288}
289
290impl ChainSpscReceiver {
291 #[inline]
292 fn pop_slot(&self) -> Option<u32> {
293 unsafe { internal::events::queue_pop_ptr(self.queue) }
294 }
295
296 #[inline]
297 pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
299 let Some(slot) = self.pop_slot() else {
300 return Ok(None);
301 };
302
303 let len = unsafe {
304 let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
305 let len = s.len as usize;
306 if len > INDEXBUS_SLOT_DATA_SIZE {
307 internal::events::pool_free_ptr(self.pool, slot);
308 return Err(Error::IncompatibleLayout);
309 }
310 if out.len() < len {
311 internal::events::pool_free_ptr(self.pool, slot);
312 return Err(Error::BufferTooSmall {
313 required: len,
314 provided: out.len(),
315 });
316 }
317 core::ptr::copy_nonoverlapping(s.data.as_ptr(), out.as_mut_ptr(), len);
318 internal::events::pool_free_ptr(self.pool, slot);
319 len
320 };
321
322 Ok(Some(len))
323 }
324
325 #[inline]
326 pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
328 where
329 F: FnOnce(&[u8]) -> Result<R, E>,
330 {
331 let Some(slot) = self.pop_slot() else {
332 return Ok(None);
333 };
334
335 let result = unsafe {
336 let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
337 let len = s.len as usize;
338 if len > INDEXBUS_SLOT_DATA_SIZE {
339 internal::events::pool_free_ptr(self.pool, slot);
340 return Err(RecvWithError::Core(Error::IncompatibleLayout));
341 }
342 let r = f(&s.data[..len]).map_err(RecvWithError::Decode);
343 internal::events::pool_free_ptr(self.pool, slot);
344 r
345 };
346
347 result.map(Some)
348 }
349
350 #[inline]
351 pub fn try_recv_slot(&self) -> Result<Option<EventsSlot>, Error> {
353 let Some(slot) = self.pop_slot() else {
354 return Ok(None);
355 };
356
357 let len = unsafe {
358 let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
359 s.len as usize
360 };
361
362 if len > INDEXBUS_SLOT_DATA_SIZE {
363 unsafe { internal::events::pool_free_ptr(self.pool, slot) };
364 return Err(Error::IncompatibleLayout);
365 }
366
367 Ok(Some(EventsSlot {
368 pool: self.pool,
369 slot,
370 len: len as u32,
371 released: false,
372 }))
373 }
374
375 #[inline]
376 pub fn recv(&self, out: &mut [u8]) -> Option<usize> {
378 self.try_recv_into(out).ok().flatten()
379 }
380
381 #[inline]
382 pub fn drain_batch_into(&self, out: &mut [u8], batch: usize) -> usize {
386 let mut n = 0;
387 let batch = batch.max(1);
388 for _ in 0..batch {
389 if self.try_recv_into(out).ok().flatten().is_some() {
390 n += 1;
391 } else {
392 break;
393 }
394 }
395 n
396 }
397}
398
399impl ChainMpscConsumer {
400 #[inline]
401 fn pop_slot(&self) -> Option<u32> {
402 unsafe { internal::events::mpsc_queue_pop_ptr(self.queue) }
403 }
404
405 #[inline]
406 pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, Error> {
408 let Some(slot) = self.pop_slot() else {
409 return Ok(None);
410 };
411
412 let len = unsafe {
413 let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
414 let len = s.len as usize;
415 if len > INDEXBUS_SLOT_DATA_SIZE {
416 internal::events::pool_free_ptr(self.pool, slot);
417 return Err(Error::IncompatibleLayout);
418 }
419 if out.len() < len {
420 internal::events::pool_free_ptr(self.pool, slot);
421 return Err(Error::BufferTooSmall {
422 required: len,
423 provided: out.len(),
424 });
425 }
426 core::ptr::copy_nonoverlapping(s.data.as_ptr(), out.as_mut_ptr(), len);
427 internal::events::pool_free_ptr(self.pool, slot);
428 len
429 };
430
431 Ok(Some(len))
432 }
433
434 #[inline]
435 pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, RecvWithError<E>>
437 where
438 F: FnOnce(&[u8]) -> Result<R, E>,
439 {
440 let Some(slot) = self.pop_slot() else {
441 return Ok(None);
442 };
443
444 let result = unsafe {
445 let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
446 let len = s.len as usize;
447 if len > INDEXBUS_SLOT_DATA_SIZE {
448 internal::events::pool_free_ptr(self.pool, slot);
449 return Err(RecvWithError::Core(Error::IncompatibleLayout));
450 }
451 let r = f(&s.data[..len]).map_err(RecvWithError::Decode);
452 internal::events::pool_free_ptr(self.pool, slot);
453 r
454 };
455
456 result.map(Some)
457 }
458
459 #[inline]
460 pub fn try_recv_slot(&self) -> Result<Option<EventsSlot>, Error> {
462 let Some(slot) = self.pop_slot() else {
463 return Ok(None);
464 };
465
466 let len = unsafe {
467 let s = &*core::ptr::addr_of!((*self.pool).slots[slot as usize]);
468 s.len as usize
469 };
470
471 if len > INDEXBUS_SLOT_DATA_SIZE {
472 unsafe { internal::events::pool_free_ptr(self.pool, slot) };
473 return Err(Error::IncompatibleLayout);
474 }
475
476 Ok(Some(EventsSlot {
477 pool: self.pool,
478 slot,
479 len: len as u32,
480 released: false,
481 }))
482 }
483
484 #[inline]
485 pub fn recv(&self, out: &mut [u8]) -> Option<usize> {
487 self.try_recv_into(out).ok().flatten()
488 }
489}
490
// Anonymous constant whose only effect is to reference `Ordering`.
// NOTE(review): presumably kept so the `Ordering` import above counts as used — confirm before
// removing either one.
const _: Ordering = Ordering::Relaxed;