1use indexbus_abi::INDEXBUS_SLOT_DATA_SIZE;
2use indexbus_codec::{CodecId, DecodeBorrowed, DecodeFrom, EncodeInto};
3use indexbus_core::{
4 FanoutConsumer, FanoutMpscProducer, FanoutProducer, MpscConsumer, MpscProducer, SpscReceiver,
5 SpscSender,
6};
7use indexbus_msg::{decode_header, encode_header_into, Header, V1_HEADER_LEN};
8
9use crate::{Error, IndexbusMsgSpec, MsgMeta};
10
11fn validate_meta(meta: MsgMeta, hdr: Header) -> Result<(), Error> {
12 if hdr.schema_id != meta.schema_id {
13 return Err(Error::SchemaMismatch {
14 expected: meta.schema_id,
15 found: hdr.schema_id,
16 });
17 }
18 if hdr.msg_type != meta.msg_type {
19 return Err(Error::TypeMismatch {
20 expected: meta.msg_type,
21 found: hdr.msg_type,
22 });
23 }
24 if hdr.msg_version != meta.msg_version {
25 return Err(Error::VersionMismatch {
26 expected: meta.msg_version,
27 found: hdr.msg_version,
28 });
29 }
30 Ok(())
31}
32
/// Typed publishing wrapper that pairs an [`SpscSender`] with a codec `C`.
///
/// Values are encoded with the codec and framed with a message header before
/// being handed to the underlying sender.
pub struct TypedSpscSender<C> {
    /// Underlying untyped sender that provides the slot buffer.
    inner: SpscSender,
    /// Codec used to encode message payloads.
    codec: C,
}
43
/// Typed receiving wrapper that pairs an [`SpscReceiver`] with a codec `C`.
///
/// Received frames have their header decoded and validated before the payload
/// is decoded with the codec.
pub struct TypedSpscReceiver<C> {
    /// Underlying untyped receiver that yields raw frame bytes.
    inner: SpscReceiver,
    /// Codec used to decode message payloads.
    codec: C,
}
54
/// Typed publishing wrapper that pairs an [`MpscProducer`] with a codec `C`.
pub struct TypedMpscProducer<C> {
    /// Underlying untyped producer that provides the slot buffer.
    inner: MpscProducer,
    /// Codec used to encode message payloads.
    codec: C,
}
60
/// Typed receiving wrapper that pairs an [`MpscConsumer`] with a codec `C`.
pub struct TypedMpscConsumer<C> {
    /// Underlying untyped consumer that yields raw frame bytes.
    inner: MpscConsumer,
    /// Codec used to decode message payloads.
    codec: C,
}
68
/// Typed publishing wrapper that pairs a [`FanoutProducer<N>`] with a codec `C`.
///
/// The const parameter `N` is forwarded unchanged to the wrapped producer type.
pub struct TypedFanoutProducer<const N: usize, C> {
    /// Underlying untyped fanout producer that provides the slot buffer.
    inner: FanoutProducer<N>,
    /// Codec used to encode message payloads.
    codec: C,
}
74
/// Typed publishing wrapper that pairs a [`FanoutMpscProducer<N>`] with a codec `C`.
///
/// The const parameter `N` is forwarded unchanged to the wrapped producer type.
pub struct TypedFanoutMpscProducer<const N: usize, C> {
    /// Underlying untyped fanout MPSC producer that provides the slot buffer.
    inner: FanoutMpscProducer<N>,
    /// Codec used to encode message payloads.
    codec: C,
}
80
/// Typed receiving wrapper that pairs a [`FanoutConsumer<N>`] with a codec `C`.
///
/// The const parameter `N` is forwarded unchanged to the wrapped consumer type.
pub struct TypedFanoutConsumer<const N: usize, C> {
    /// Underlying untyped fanout consumer that yields raw frame bytes.
    inner: FanoutConsumer<N>,
    /// Codec used to decode message payloads.
    codec: C,
}
88
impl<C> TypedSpscSender<C> {
    /// Wraps `inner` so that values can be published through `codec`.
    ///
    /// No bounds are placed on `C` here; the codec traits are only required by
    /// the publishing methods.
    #[inline]
    pub fn new(inner: SpscSender, codec: C) -> Self {
        Self { inner, codec }
    }
}
96
impl<C> TypedSpscReceiver<C> {
    /// Wraps `inner` so that received frames can be decoded through `codec`.
    ///
    /// No bounds are placed on `C` here; the codec traits are only required by
    /// the receiving methods.
    #[inline]
    pub fn new(inner: SpscReceiver, codec: C) -> Self {
        Self { inner, codec }
    }
}
104
impl<C> TypedMpscProducer<C> {
    /// Wraps `inner` so that values can be published through `codec`.
    ///
    /// No bounds are placed on `C` here; the codec traits are only required by
    /// the publishing methods.
    #[inline]
    pub fn new(inner: MpscProducer, codec: C) -> Self {
        Self { inner, codec }
    }
}
112
impl<C> TypedMpscConsumer<C> {
    /// Wraps `inner` so that received frames can be decoded through `codec`.
    ///
    /// No bounds are placed on `C` here; the codec traits are only required by
    /// the receiving methods.
    #[inline]
    pub fn new(inner: MpscConsumer, codec: C) -> Self {
        Self { inner, codec }
    }
}
120
impl<const N: usize, C> TypedFanoutProducer<N, C> {
    /// Wraps `inner` so that values can be published through `codec`.
    ///
    /// No bounds are placed on `C` here; the codec traits are only required by
    /// the publishing methods.
    #[inline]
    pub fn new(inner: FanoutProducer<N>, codec: C) -> Self {
        Self { inner, codec }
    }
}
128
impl<const N: usize, C> TypedFanoutMpscProducer<N, C> {
    /// Wraps `inner` so that values can be published through `codec`.
    ///
    /// No bounds are placed on `C` here; the codec traits are only required by
    /// the publishing methods.
    #[inline]
    pub fn new(inner: FanoutMpscProducer<N>, codec: C) -> Self {
        Self { inner, codec }
    }
}
136
impl<const N: usize, C> TypedFanoutConsumer<N, C> {
    /// Wraps `inner` so that received frames can be decoded through `codec`.
    ///
    /// No bounds are placed on `C` here; the codec traits are only required by
    /// the receiving methods.
    #[inline]
    pub fn new(inner: FanoutConsumer<N>, codec: C) -> Self {
        Self { inner, codec }
    }
}
144
145impl<C> TypedSpscSender<C> {
146 #[inline]
147 pub fn publish<T>(&self, meta: MsgMeta, value: &T) -> Result<(), Error>
157 where
158 C: CodecId + EncodeInto<T>,
159 {
160 self.inner
161 .publish_with(|slot_buf| -> Result<usize, Error> {
162 debug_assert_eq!(slot_buf.len(), INDEXBUS_SLOT_DATA_SIZE);
163
164 let max_payload = INDEXBUS_SLOT_DATA_SIZE - V1_HEADER_LEN;
165 let payload_start = V1_HEADER_LEN;
166
167 let payload_len = match self
168 .codec
169 .encode_into(value, &mut slot_buf[payload_start..])
170 {
171 Ok(n) => n,
172 Err(indexbus_codec::CodecError::BufferTooSmall { required, .. }) => {
173 return Err(Error::TooLarge {
174 max: max_payload,
175 len: required,
176 });
177 }
178 Err(e) => return Err(Error::Codec(e)),
179 };
180
181 if payload_len > max_payload {
182 return Err(Error::TooLarge {
183 max: max_payload,
184 len: payload_len,
185 });
186 }
187
188 let hdr = Header {
189 flags: meta.flags,
190 codec_id: C::CODEC_ID,
191 header_len: V1_HEADER_LEN as u8,
192 schema_id: meta.schema_id,
193 msg_type: meta.msg_type,
194 msg_version: meta.msg_version,
195 payload_len: payload_len as u16,
196 };
197
198 encode_header_into(&mut slot_buf[..V1_HEADER_LEN], hdr).map_err(Error::from)?;
199 Ok(V1_HEADER_LEN + payload_len)
200 })
201 .map_err(Error::from)
202 }
203
204 #[inline]
209 pub fn publish_msg<T>(&self, value: &T) -> Result<(), Error>
210 where
211 T: IndexbusMsgSpec,
212 C: CodecId + EncodeInto<T>,
213 {
214 self.publish(<T as IndexbusMsgSpec>::META, value)
215 }
216}
217
218impl<C> TypedMpscProducer<C> {
219 #[inline]
220 pub fn publish<T>(&self, meta: MsgMeta, value: &T) -> Result<(), Error>
224 where
225 C: CodecId + EncodeInto<T>,
226 {
227 self.inner
228 .publish_with(|slot_buf| -> Result<usize, Error> {
229 let max_payload = INDEXBUS_SLOT_DATA_SIZE - V1_HEADER_LEN;
230 let payload_start = V1_HEADER_LEN;
231
232 let payload_len = match self
233 .codec
234 .encode_into(value, &mut slot_buf[payload_start..])
235 {
236 Ok(n) => n,
237 Err(indexbus_codec::CodecError::BufferTooSmall { required, .. }) => {
238 return Err(Error::TooLarge {
239 max: max_payload,
240 len: required,
241 });
242 }
243 Err(e) => return Err(Error::Codec(e)),
244 };
245
246 if payload_len > max_payload {
247 return Err(Error::TooLarge {
248 max: max_payload,
249 len: payload_len,
250 });
251 }
252
253 let hdr = Header {
254 flags: meta.flags,
255 codec_id: C::CODEC_ID,
256 header_len: V1_HEADER_LEN as u8,
257 schema_id: meta.schema_id,
258 msg_type: meta.msg_type,
259 msg_version: meta.msg_version,
260 payload_len: payload_len as u16,
261 };
262
263 encode_header_into(&mut slot_buf[..V1_HEADER_LEN], hdr).map_err(Error::from)?;
264 Ok(V1_HEADER_LEN + payload_len)
265 })
266 .map_err(Error::from)
267 }
268
269 #[inline]
271 pub fn publish_msg<T>(&self, value: &T) -> Result<(), Error>
272 where
273 T: IndexbusMsgSpec,
274 C: CodecId + EncodeInto<T>,
275 {
276 self.publish(<T as IndexbusMsgSpec>::META, value)
277 }
278}
279
280impl<const N: usize, C> TypedFanoutProducer<N, C> {
281 #[inline]
282 pub fn publish<T>(&self, meta: MsgMeta, value: &T) -> Result<(), Error>
284 where
285 C: CodecId + EncodeInto<T>,
286 {
287 self.inner
288 .publish_with(|slot_buf| -> Result<usize, Error> {
289 let max_payload = INDEXBUS_SLOT_DATA_SIZE - V1_HEADER_LEN;
290 let payload_start = V1_HEADER_LEN;
291
292 let payload_len = match self
293 .codec
294 .encode_into(value, &mut slot_buf[payload_start..])
295 {
296 Ok(n) => n,
297 Err(indexbus_codec::CodecError::BufferTooSmall { required, .. }) => {
298 return Err(Error::TooLarge {
299 max: max_payload,
300 len: required,
301 });
302 }
303 Err(e) => return Err(Error::Codec(e)),
304 };
305
306 if payload_len > max_payload {
307 return Err(Error::TooLarge {
308 max: max_payload,
309 len: payload_len,
310 });
311 }
312
313 let hdr = Header {
314 flags: meta.flags,
315 codec_id: C::CODEC_ID,
316 header_len: V1_HEADER_LEN as u8,
317 schema_id: meta.schema_id,
318 msg_type: meta.msg_type,
319 msg_version: meta.msg_version,
320 payload_len: payload_len as u16,
321 };
322
323 encode_header_into(&mut slot_buf[..V1_HEADER_LEN], hdr).map_err(Error::from)?;
324 Ok(V1_HEADER_LEN + payload_len)
325 })
326 .map_err(Error::from)
327 }
328
329 #[inline]
331 pub fn publish_msg<T>(&self, value: &T) -> Result<(), Error>
332 where
333 T: IndexbusMsgSpec,
334 C: CodecId + EncodeInto<T>,
335 {
336 self.publish(<T as IndexbusMsgSpec>::META, value)
337 }
338}
339
340impl<const N: usize, C> TypedFanoutMpscProducer<N, C> {
341 #[inline]
342 pub fn publish<T>(&self, meta: MsgMeta, value: &T) -> Result<(), Error>
344 where
345 C: CodecId + EncodeInto<T>,
346 {
347 self.inner
348 .publish_with(|slot_buf| -> Result<usize, Error> {
349 let max_payload = INDEXBUS_SLOT_DATA_SIZE - V1_HEADER_LEN;
350 let payload_start = V1_HEADER_LEN;
351
352 let payload_len = match self
353 .codec
354 .encode_into(value, &mut slot_buf[payload_start..])
355 {
356 Ok(n) => n,
357 Err(indexbus_codec::CodecError::BufferTooSmall { required, .. }) => {
358 return Err(Error::TooLarge {
359 max: max_payload,
360 len: required,
361 });
362 }
363 Err(e) => return Err(Error::Codec(e)),
364 };
365
366 if payload_len > max_payload {
367 return Err(Error::TooLarge {
368 max: max_payload,
369 len: payload_len,
370 });
371 }
372
373 let hdr = Header {
374 flags: meta.flags,
375 codec_id: C::CODEC_ID,
376 header_len: V1_HEADER_LEN as u8,
377 schema_id: meta.schema_id,
378 msg_type: meta.msg_type,
379 msg_version: meta.msg_version,
380 payload_len: payload_len as u16,
381 };
382
383 encode_header_into(&mut slot_buf[..V1_HEADER_LEN], hdr).map_err(Error::from)?;
384 Ok(V1_HEADER_LEN + payload_len)
385 })
386 .map_err(Error::from)
387 }
388
389 #[inline]
391 pub fn publish_msg<T>(&self, value: &T) -> Result<(), Error>
392 where
393 T: IndexbusMsgSpec,
394 C: CodecId + EncodeInto<T>,
395 {
396 self.publish(<T as IndexbusMsgSpec>::META, value)
397 }
398}
399
400impl<C> TypedSpscReceiver<C> {
401 #[inline]
410 pub fn try_recv_borrowed_with<T, F, R>(
411 &self,
412 meta: MsgMeta,
413 mut f: F,
414 ) -> Result<Option<R>, Error>
415 where
416 C: CodecId + DecodeBorrowed<T>,
417 F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
418 {
419 self.inner
420 .try_recv_with(|bytes| -> Result<R, Error> {
421 let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
422 if hdr.codec_id != C::CODEC_ID {
423 return Err(Error::UnsupportedCodec {
424 codec_id: hdr.codec_id,
425 });
426 }
427 validate_meta(meta, hdr)?;
428 let view = self.codec.decode_borrowed(payload).map_err(Error::from)?;
429 f(hdr, view)
430 })
431 .map_err(Error::from)
432 }
433
434 #[inline]
436 pub fn try_recv_borrowed_msg_with<T, F, R>(&self, f: F) -> Result<Option<R>, Error>
437 where
438 T: IndexbusMsgSpec,
439 C: CodecId + DecodeBorrowed<T>,
440 F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
441 {
442 self.try_recv_borrowed_with(<T as IndexbusMsgSpec>::META, f)
443 }
444}
445
446impl<C> TypedMpscConsumer<C> {
447 #[inline]
451 pub fn try_recv_borrowed_with<T, F, R>(
452 &self,
453 meta: MsgMeta,
454 mut f: F,
455 ) -> Result<Option<R>, Error>
456 where
457 C: CodecId + DecodeBorrowed<T>,
458 F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
459 {
460 self.inner
461 .try_recv_with(|bytes| -> Result<R, Error> {
462 let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
463 if hdr.codec_id != C::CODEC_ID {
464 return Err(Error::UnsupportedCodec {
465 codec_id: hdr.codec_id,
466 });
467 }
468 validate_meta(meta, hdr)?;
469 let view = self.codec.decode_borrowed(payload).map_err(Error::from)?;
470 f(hdr, view)
471 })
472 .map_err(Error::from)
473 }
474
475 #[inline]
477 pub fn try_recv_borrowed_msg_with<T, F, R>(&self, f: F) -> Result<Option<R>, Error>
478 where
479 T: IndexbusMsgSpec,
480 C: CodecId + DecodeBorrowed<T>,
481 F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
482 {
483 self.try_recv_borrowed_with(<T as IndexbusMsgSpec>::META, f)
484 }
485}
486
487impl<const N: usize, C> TypedFanoutConsumer<N, C> {
488 #[inline]
492 pub fn try_recv_borrowed_with<T, F, R>(
493 &self,
494 meta: MsgMeta,
495 mut f: F,
496 ) -> Result<Option<R>, Error>
497 where
498 C: CodecId + DecodeBorrowed<T>,
499 F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
500 {
501 self.inner
502 .try_recv_with(|bytes| -> Result<R, Error> {
503 let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
504 if hdr.codec_id != C::CODEC_ID {
505 return Err(Error::UnsupportedCodec {
506 codec_id: hdr.codec_id,
507 });
508 }
509 validate_meta(meta, hdr)?;
510 let view = self.codec.decode_borrowed(payload).map_err(Error::from)?;
511 f(hdr, view)
512 })
513 .map_err(Error::from)
514 }
515
516 #[inline]
518 pub fn try_recv_borrowed_msg_with<T, F, R>(&self, f: F) -> Result<Option<R>, Error>
519 where
520 T: IndexbusMsgSpec,
521 C: CodecId + DecodeBorrowed<T>,
522 F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
523 {
524 self.try_recv_borrowed_with(<T as IndexbusMsgSpec>::META, f)
525 }
526}
527
528impl<C> TypedSpscReceiver<C> {
529 #[inline]
530 pub fn try_recv_owned<T>(&self, meta: MsgMeta) -> Result<Option<(Header, T)>, Error>
539 where
540 C: CodecId + DecodeFrom<T>,
541 {
542 self.inner
543 .try_recv_with(|bytes| -> Result<(Header, T), Error> {
544 let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
545 if hdr.codec_id != C::CODEC_ID {
546 return Err(Error::UnsupportedCodec {
547 codec_id: hdr.codec_id,
548 });
549 }
550 validate_meta(meta, hdr)?;
551 let value = self.codec.decode_from(payload).map_err(Error::from)?;
552 Ok((hdr, value))
553 })
554 .map_err(Error::from)
555 }
556
557 #[inline]
559 pub fn try_recv_owned_msg<T>(&self) -> Result<Option<(Header, T)>, Error>
560 where
561 T: IndexbusMsgSpec,
562 C: CodecId + DecodeFrom<T>,
563 {
564 self.try_recv_owned(<T as IndexbusMsgSpec>::META)
565 }
566}
567
568impl<C> TypedMpscConsumer<C> {
569 #[inline]
570 pub fn try_recv_owned<T>(&self, meta: MsgMeta) -> Result<Option<(Header, T)>, Error>
577 where
578 C: CodecId + DecodeFrom<T>,
579 {
580 self.inner
581 .try_recv_with(|bytes| -> Result<(Header, T), Error> {
582 let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
583 if hdr.codec_id != C::CODEC_ID {
584 return Err(Error::UnsupportedCodec {
585 codec_id: hdr.codec_id,
586 });
587 }
588 validate_meta(meta, hdr)?;
589 let value = self.codec.decode_from(payload).map_err(Error::from)?;
590 Ok((hdr, value))
591 })
592 .map_err(Error::from)
593 }
594
595 #[inline]
597 pub fn try_recv_owned_msg<T>(&self) -> Result<Option<(Header, T)>, Error>
598 where
599 T: IndexbusMsgSpec,
600 C: CodecId + DecodeFrom<T>,
601 {
602 self.try_recv_owned(<T as IndexbusMsgSpec>::META)
603 }
604}
605
606impl<const N: usize, C> TypedFanoutConsumer<N, C> {
607 #[inline]
608 pub fn try_recv_owned<T>(&self, meta: MsgMeta) -> Result<Option<(Header, T)>, Error>
614 where
615 C: CodecId + DecodeFrom<T>,
616 {
617 self.inner
618 .try_recv_with(|bytes| -> Result<(Header, T), Error> {
619 let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
620 if hdr.codec_id != C::CODEC_ID {
621 return Err(Error::UnsupportedCodec {
622 codec_id: hdr.codec_id,
623 });
624 }
625 validate_meta(meta, hdr)?;
626 let value = self.codec.decode_from(payload).map_err(Error::from)?;
627 Ok((hdr, value))
628 })
629 .map_err(Error::from)
630 }
631
632 #[inline]
634 pub fn try_recv_owned_msg<T>(&self) -> Result<Option<(Header, T)>, Error>
635 where
636 T: IndexbusMsgSpec,
637 C: CodecId + DecodeFrom<T>,
638 {
639 self.try_recv_owned(<T as IndexbusMsgSpec>::META)
640 }
641}