indexbus_typed/
typed.rs

1use indexbus_abi::INDEXBUS_SLOT_DATA_SIZE;
2use indexbus_codec::{CodecId, DecodeBorrowed, DecodeFrom, EncodeInto};
3use indexbus_core::{
4    FanoutConsumer, FanoutMpscProducer, FanoutProducer, MpscConsumer, MpscProducer, SpscReceiver,
5    SpscSender,
6};
7use indexbus_msg::{decode_header, encode_header_into, Header, V1_HEADER_LEN};
8
9use crate::{Error, IndexbusMsgSpec, MsgMeta};
10
11fn validate_meta(meta: MsgMeta, hdr: Header) -> Result<(), Error> {
12    if hdr.schema_id != meta.schema_id {
13        return Err(Error::SchemaMismatch {
14            expected: meta.schema_id,
15            found: hdr.schema_id,
16        });
17    }
18    if hdr.msg_type != meta.msg_type {
19        return Err(Error::TypeMismatch {
20            expected: meta.msg_type,
21            found: hdr.msg_type,
22        });
23    }
24    if hdr.msg_version != meta.msg_version {
25        return Err(Error::VersionMismatch {
26            expected: meta.msg_version,
27            found: hdr.msg_version,
28        });
29    }
30    Ok(())
31}
32
33/// Typed SPSC sender that publishes v1-framed messages using codec `C`.
34///
35/// ## Contract
36///
37/// - Publishes a v1 header prefix (`indexbus_msg`) plus codec-defined payload bytes.
38/// - If encoding fails, no message is published.
39pub struct TypedSpscSender<C> {
40    inner: SpscSender,
41    codec: C,
42}
43
44/// Typed SPSC receiver that decodes v1-framed messages using codec `C`.
45///
46/// ## Partial effects
47///
48/// Receive helpers follow `indexbus-core` semantics: if a message is dequeued and validation or
49/// decoding fails, the message is still considered consumed.
50pub struct TypedSpscReceiver<C> {
51    inner: SpscReceiver,
52    codec: C,
53}
54
55/// Typed MPSC producer that publishes v1-framed messages using codec `C`.
56pub struct TypedMpscProducer<C> {
57    inner: MpscProducer,
58    codec: C,
59}
60
61/// Typed MPSC consumer that decodes v1-framed messages using codec `C`.
62///
63/// See [`TypedSpscReceiver`] for receive-side partial-effect semantics.
64pub struct TypedMpscConsumer<C> {
65    inner: MpscConsumer,
66    codec: C,
67}
68
69/// Typed fanout SPSC producer that publishes into the producer→router queue.
70pub struct TypedFanoutProducer<const N: usize, C> {
71    inner: FanoutProducer<N>,
72    codec: C,
73}
74
75/// Typed fanout MPSC producer that publishes into the producer→router MPSC queue.
76pub struct TypedFanoutMpscProducer<const N: usize, C> {
77    inner: FanoutMpscProducer<N>,
78    codec: C,
79}
80
81/// Typed fanout consumer that decodes v1-framed messages using codec `C`.
82///
83/// See [`TypedSpscReceiver`] for receive-side partial-effect semantics.
84pub struct TypedFanoutConsumer<const N: usize, C> {
85    inner: FanoutConsumer<N>,
86    codec: C,
87}
88
89impl<C> TypedSpscSender<C> {
90    #[inline]
91    /// Wrap an `indexbus-core` SPSC sender with a codec.
92    pub fn new(inner: SpscSender, codec: C) -> Self {
93        Self { inner, codec }
94    }
95}
96
97impl<C> TypedSpscReceiver<C> {
98    #[inline]
99    /// Wrap an `indexbus-core` SPSC receiver with a codec.
100    pub fn new(inner: SpscReceiver, codec: C) -> Self {
101        Self { inner, codec }
102    }
103}
104
105impl<C> TypedMpscProducer<C> {
106    #[inline]
107    /// Wrap an `indexbus-core` MPSC producer with a codec.
108    pub fn new(inner: MpscProducer, codec: C) -> Self {
109        Self { inner, codec }
110    }
111}
112
113impl<C> TypedMpscConsumer<C> {
114    #[inline]
115    /// Wrap an `indexbus-core` MPSC consumer with a codec.
116    pub fn new(inner: MpscConsumer, codec: C) -> Self {
117        Self { inner, codec }
118    }
119}
120
121impl<const N: usize, C> TypedFanoutProducer<N, C> {
122    #[inline]
123    /// Wrap an `indexbus-core` fanout producer with a codec.
124    pub fn new(inner: FanoutProducer<N>, codec: C) -> Self {
125        Self { inner, codec }
126    }
127}
128
129impl<const N: usize, C> TypedFanoutMpscProducer<N, C> {
130    #[inline]
131    /// Wrap an `indexbus-core` fanout MPSC producer with a codec.
132    pub fn new(inner: FanoutMpscProducer<N>, codec: C) -> Self {
133        Self { inner, codec }
134    }
135}
136
137impl<const N: usize, C> TypedFanoutConsumer<N, C> {
138    #[inline]
139    /// Wrap an `indexbus-core` fanout consumer with a codec.
140    pub fn new(inner: FanoutConsumer<N>, codec: C) -> Self {
141        Self { inner, codec }
142    }
143}
144
145impl<C> TypedSpscSender<C> {
146    #[inline]
147    /// Publish `value` with the provided message identity.
148    ///
149    /// This encodes a v1 header into the slot prefix, encodes `value` into the payload region,
150    /// and publishes the resulting slot bytes.
151    ///
152    /// # Errors
153    ///
154    /// - [`Error::TooLarge`]: the encoded payload does not fit in a single slot.
155    /// - [`Error::Codec`]/[`Error::Envelope`]: encoding failed.
156    pub fn publish<T>(&self, meta: MsgMeta, value: &T) -> Result<(), Error>
157    where
158        C: CodecId + EncodeInto<T>,
159    {
160        self.inner
161            .publish_with(|slot_buf| -> Result<usize, Error> {
162                debug_assert_eq!(slot_buf.len(), INDEXBUS_SLOT_DATA_SIZE);
163
164                let max_payload = INDEXBUS_SLOT_DATA_SIZE - V1_HEADER_LEN;
165                let payload_start = V1_HEADER_LEN;
166
167                let payload_len = match self
168                    .codec
169                    .encode_into(value, &mut slot_buf[payload_start..])
170                {
171                    Ok(n) => n,
172                    Err(indexbus_codec::CodecError::BufferTooSmall { required, .. }) => {
173                        return Err(Error::TooLarge {
174                            max: max_payload,
175                            len: required,
176                        });
177                    }
178                    Err(e) => return Err(Error::Codec(e)),
179                };
180
181                if payload_len > max_payload {
182                    return Err(Error::TooLarge {
183                        max: max_payload,
184                        len: payload_len,
185                    });
186                }
187
188                let hdr = Header {
189                    flags: meta.flags,
190                    codec_id: C::CODEC_ID,
191                    header_len: V1_HEADER_LEN as u8,
192                    schema_id: meta.schema_id,
193                    msg_type: meta.msg_type,
194                    msg_version: meta.msg_version,
195                    payload_len: payload_len as u16,
196                };
197
198                encode_header_into(&mut slot_buf[..V1_HEADER_LEN], hdr).map_err(Error::from)?;
199                Ok(V1_HEADER_LEN + payload_len)
200            })
201            .map_err(Error::from)
202    }
203
204    /// Publish a typed message using compile-time identity (`T::META`).
205    ///
206    /// This is sugar over [`Self::publish`]. Use `publish(meta, value)` if you need
207    /// dynamic identity or non-default flags.
208    #[inline]
209    pub fn publish_msg<T>(&self, value: &T) -> Result<(), Error>
210    where
211        T: IndexbusMsgSpec,
212        C: CodecId + EncodeInto<T>,
213    {
214        self.publish(<T as IndexbusMsgSpec>::META, value)
215    }
216}
217
218impl<C> TypedMpscProducer<C> {
219    #[inline]
220    /// Publish `value` with the provided message identity.
221    ///
222    /// See [`TypedSpscSender::publish`] for error semantics.
223    pub fn publish<T>(&self, meta: MsgMeta, value: &T) -> Result<(), Error>
224    where
225        C: CodecId + EncodeInto<T>,
226    {
227        self.inner
228            .publish_with(|slot_buf| -> Result<usize, Error> {
229                let max_payload = INDEXBUS_SLOT_DATA_SIZE - V1_HEADER_LEN;
230                let payload_start = V1_HEADER_LEN;
231
232                let payload_len = match self
233                    .codec
234                    .encode_into(value, &mut slot_buf[payload_start..])
235                {
236                    Ok(n) => n,
237                    Err(indexbus_codec::CodecError::BufferTooSmall { required, .. }) => {
238                        return Err(Error::TooLarge {
239                            max: max_payload,
240                            len: required,
241                        });
242                    }
243                    Err(e) => return Err(Error::Codec(e)),
244                };
245
246                if payload_len > max_payload {
247                    return Err(Error::TooLarge {
248                        max: max_payload,
249                        len: payload_len,
250                    });
251                }
252
253                let hdr = Header {
254                    flags: meta.flags,
255                    codec_id: C::CODEC_ID,
256                    header_len: V1_HEADER_LEN as u8,
257                    schema_id: meta.schema_id,
258                    msg_type: meta.msg_type,
259                    msg_version: meta.msg_version,
260                    payload_len: payload_len as u16,
261                };
262
263                encode_header_into(&mut slot_buf[..V1_HEADER_LEN], hdr).map_err(Error::from)?;
264                Ok(V1_HEADER_LEN + payload_len)
265            })
266            .map_err(Error::from)
267    }
268
269    /// Publish a typed message using compile-time identity (`T::META`).
270    #[inline]
271    pub fn publish_msg<T>(&self, value: &T) -> Result<(), Error>
272    where
273        T: IndexbusMsgSpec,
274        C: CodecId + EncodeInto<T>,
275    {
276        self.publish(<T as IndexbusMsgSpec>::META, value)
277    }
278}
279
280impl<const N: usize, C> TypedFanoutProducer<N, C> {
281    #[inline]
282    /// Publish `value` with the provided message identity.
283    pub fn publish<T>(&self, meta: MsgMeta, value: &T) -> Result<(), Error>
284    where
285        C: CodecId + EncodeInto<T>,
286    {
287        self.inner
288            .publish_with(|slot_buf| -> Result<usize, Error> {
289                let max_payload = INDEXBUS_SLOT_DATA_SIZE - V1_HEADER_LEN;
290                let payload_start = V1_HEADER_LEN;
291
292                let payload_len = match self
293                    .codec
294                    .encode_into(value, &mut slot_buf[payload_start..])
295                {
296                    Ok(n) => n,
297                    Err(indexbus_codec::CodecError::BufferTooSmall { required, .. }) => {
298                        return Err(Error::TooLarge {
299                            max: max_payload,
300                            len: required,
301                        });
302                    }
303                    Err(e) => return Err(Error::Codec(e)),
304                };
305
306                if payload_len > max_payload {
307                    return Err(Error::TooLarge {
308                        max: max_payload,
309                        len: payload_len,
310                    });
311                }
312
313                let hdr = Header {
314                    flags: meta.flags,
315                    codec_id: C::CODEC_ID,
316                    header_len: V1_HEADER_LEN as u8,
317                    schema_id: meta.schema_id,
318                    msg_type: meta.msg_type,
319                    msg_version: meta.msg_version,
320                    payload_len: payload_len as u16,
321                };
322
323                encode_header_into(&mut slot_buf[..V1_HEADER_LEN], hdr).map_err(Error::from)?;
324                Ok(V1_HEADER_LEN + payload_len)
325            })
326            .map_err(Error::from)
327    }
328
329    /// Publish a typed message using compile-time identity (`T::META`).
330    #[inline]
331    pub fn publish_msg<T>(&self, value: &T) -> Result<(), Error>
332    where
333        T: IndexbusMsgSpec,
334        C: CodecId + EncodeInto<T>,
335    {
336        self.publish(<T as IndexbusMsgSpec>::META, value)
337    }
338}
339
340impl<const N: usize, C> TypedFanoutMpscProducer<N, C> {
341    #[inline]
342    /// Publish `value` with the provided message identity.
343    pub fn publish<T>(&self, meta: MsgMeta, value: &T) -> Result<(), Error>
344    where
345        C: CodecId + EncodeInto<T>,
346    {
347        self.inner
348            .publish_with(|slot_buf| -> Result<usize, Error> {
349                let max_payload = INDEXBUS_SLOT_DATA_SIZE - V1_HEADER_LEN;
350                let payload_start = V1_HEADER_LEN;
351
352                let payload_len = match self
353                    .codec
354                    .encode_into(value, &mut slot_buf[payload_start..])
355                {
356                    Ok(n) => n,
357                    Err(indexbus_codec::CodecError::BufferTooSmall { required, .. }) => {
358                        return Err(Error::TooLarge {
359                            max: max_payload,
360                            len: required,
361                        });
362                    }
363                    Err(e) => return Err(Error::Codec(e)),
364                };
365
366                if payload_len > max_payload {
367                    return Err(Error::TooLarge {
368                        max: max_payload,
369                        len: payload_len,
370                    });
371                }
372
373                let hdr = Header {
374                    flags: meta.flags,
375                    codec_id: C::CODEC_ID,
376                    header_len: V1_HEADER_LEN as u8,
377                    schema_id: meta.schema_id,
378                    msg_type: meta.msg_type,
379                    msg_version: meta.msg_version,
380                    payload_len: payload_len as u16,
381                };
382
383                encode_header_into(&mut slot_buf[..V1_HEADER_LEN], hdr).map_err(Error::from)?;
384                Ok(V1_HEADER_LEN + payload_len)
385            })
386            .map_err(Error::from)
387    }
388
389    /// Publish a typed message using compile-time identity (`T::META`).
390    #[inline]
391    pub fn publish_msg<T>(&self, value: &T) -> Result<(), Error>
392    where
393        T: IndexbusMsgSpec,
394        C: CodecId + EncodeInto<T>,
395    {
396        self.publish(<T as IndexbusMsgSpec>::META, value)
397    }
398}
399
400impl<C> TypedSpscReceiver<C> {
401    /// Receive a typed borrowed view and process it within `f`.
402    ///
403    /// The borrowed view is only valid for the duration of the call.
404    ///
405    /// ## Partial effects
406    ///
407    /// If a message is dequeued, any error returned by `f` (or by envelope/meta/codec validation)
408    /// still consumes that message.
409    #[inline]
410    pub fn try_recv_borrowed_with<T, F, R>(
411        &self,
412        meta: MsgMeta,
413        mut f: F,
414    ) -> Result<Option<R>, Error>
415    where
416        C: CodecId + DecodeBorrowed<T>,
417        F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
418    {
419        self.inner
420            .try_recv_with(|bytes| -> Result<R, Error> {
421                let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
422                if hdr.codec_id != C::CODEC_ID {
423                    return Err(Error::UnsupportedCodec {
424                        codec_id: hdr.codec_id,
425                    });
426                }
427                validate_meta(meta, hdr)?;
428                let view = self.codec.decode_borrowed(payload).map_err(Error::from)?;
429                f(hdr, view)
430            })
431            .map_err(Error::from)
432    }
433
434    /// Receive a typed borrowed view using compile-time identity (`T::META`).
435    #[inline]
436    pub fn try_recv_borrowed_msg_with<T, F, R>(&self, f: F) -> Result<Option<R>, Error>
437    where
438        T: IndexbusMsgSpec,
439        C: CodecId + DecodeBorrowed<T>,
440        F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
441    {
442        self.try_recv_borrowed_with(<T as IndexbusMsgSpec>::META, f)
443    }
444}
445
446impl<C> TypedMpscConsumer<C> {
447    /// Receive a typed borrowed view and process it within `f`.
448    ///
449    /// See [`TypedSpscReceiver::try_recv_borrowed_with`] for partial-effect semantics.
450    #[inline]
451    pub fn try_recv_borrowed_with<T, F, R>(
452        &self,
453        meta: MsgMeta,
454        mut f: F,
455    ) -> Result<Option<R>, Error>
456    where
457        C: CodecId + DecodeBorrowed<T>,
458        F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
459    {
460        self.inner
461            .try_recv_with(|bytes| -> Result<R, Error> {
462                let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
463                if hdr.codec_id != C::CODEC_ID {
464                    return Err(Error::UnsupportedCodec {
465                        codec_id: hdr.codec_id,
466                    });
467                }
468                validate_meta(meta, hdr)?;
469                let view = self.codec.decode_borrowed(payload).map_err(Error::from)?;
470                f(hdr, view)
471            })
472            .map_err(Error::from)
473    }
474
475    /// Receive a typed borrowed view using compile-time identity (`T::META`).
476    #[inline]
477    pub fn try_recv_borrowed_msg_with<T, F, R>(&self, f: F) -> Result<Option<R>, Error>
478    where
479        T: IndexbusMsgSpec,
480        C: CodecId + DecodeBorrowed<T>,
481        F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
482    {
483        self.try_recv_borrowed_with(<T as IndexbusMsgSpec>::META, f)
484    }
485}
486
487impl<const N: usize, C> TypedFanoutConsumer<N, C> {
488    /// Receive a typed borrowed view and process it within `f`.
489    ///
490    /// See [`TypedSpscReceiver::try_recv_borrowed_with`] for partial-effect semantics.
491    #[inline]
492    pub fn try_recv_borrowed_with<T, F, R>(
493        &self,
494        meta: MsgMeta,
495        mut f: F,
496    ) -> Result<Option<R>, Error>
497    where
498        C: CodecId + DecodeBorrowed<T>,
499        F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
500    {
501        self.inner
502            .try_recv_with(|bytes| -> Result<R, Error> {
503                let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
504                if hdr.codec_id != C::CODEC_ID {
505                    return Err(Error::UnsupportedCodec {
506                        codec_id: hdr.codec_id,
507                    });
508                }
509                validate_meta(meta, hdr)?;
510                let view = self.codec.decode_borrowed(payload).map_err(Error::from)?;
511                f(hdr, view)
512            })
513            .map_err(Error::from)
514    }
515
516    /// Receive a typed borrowed view using compile-time identity (`T::META`).
517    #[inline]
518    pub fn try_recv_borrowed_msg_with<T, F, R>(&self, f: F) -> Result<Option<R>, Error>
519    where
520        T: IndexbusMsgSpec,
521        C: CodecId + DecodeBorrowed<T>,
522        F: for<'a> FnMut(Header, <C as DecodeBorrowed<T>>::View<'a>) -> Result<R, Error>,
523    {
524        self.try_recv_borrowed_with(<T as IndexbusMsgSpec>::META, f)
525    }
526}
527
528impl<C> TypedSpscReceiver<C> {
529    #[inline]
530    /// Receive and decode a typed owned value.
531    ///
532    /// Returns `Ok(None)` when the queue is empty. On success returns the decoded v1 header and
533    /// decoded value.
534    ///
535    /// ## Partial effects
536    ///
537    /// If a message is dequeued, any error (envelope/meta/codec) still consumes that message.
538    pub fn try_recv_owned<T>(&self, meta: MsgMeta) -> Result<Option<(Header, T)>, Error>
539    where
540        C: CodecId + DecodeFrom<T>,
541    {
542        self.inner
543            .try_recv_with(|bytes| -> Result<(Header, T), Error> {
544                let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
545                if hdr.codec_id != C::CODEC_ID {
546                    return Err(Error::UnsupportedCodec {
547                        codec_id: hdr.codec_id,
548                    });
549                }
550                validate_meta(meta, hdr)?;
551                let value = self.codec.decode_from(payload).map_err(Error::from)?;
552                Ok((hdr, value))
553            })
554            .map_err(Error::from)
555    }
556
557    /// Receive a typed owned value using compile-time identity (`T::META`).
558    #[inline]
559    pub fn try_recv_owned_msg<T>(&self) -> Result<Option<(Header, T)>, Error>
560    where
561        T: IndexbusMsgSpec,
562        C: CodecId + DecodeFrom<T>,
563    {
564        self.try_recv_owned(<T as IndexbusMsgSpec>::META)
565    }
566}
567
568impl<C> TypedMpscConsumer<C> {
569    #[inline]
570    /// Receive and decode a typed owned value.
571    ///
572    /// Returns `Ok(None)` when the queue is empty. On success returns the decoded v1 header and
573    /// decoded value.
574    ///
575    /// See [`TypedSpscReceiver::try_recv_owned`] for partial-effect semantics.
576    pub fn try_recv_owned<T>(&self, meta: MsgMeta) -> Result<Option<(Header, T)>, Error>
577    where
578        C: CodecId + DecodeFrom<T>,
579    {
580        self.inner
581            .try_recv_with(|bytes| -> Result<(Header, T), Error> {
582                let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
583                if hdr.codec_id != C::CODEC_ID {
584                    return Err(Error::UnsupportedCodec {
585                        codec_id: hdr.codec_id,
586                    });
587                }
588                validate_meta(meta, hdr)?;
589                let value = self.codec.decode_from(payload).map_err(Error::from)?;
590                Ok((hdr, value))
591            })
592            .map_err(Error::from)
593    }
594
595    /// Receive a typed owned value using compile-time identity (`T::META`).
596    #[inline]
597    pub fn try_recv_owned_msg<T>(&self) -> Result<Option<(Header, T)>, Error>
598    where
599        T: IndexbusMsgSpec,
600        C: CodecId + DecodeFrom<T>,
601    {
602        self.try_recv_owned(<T as IndexbusMsgSpec>::META)
603    }
604}
605
606impl<const N: usize, C> TypedFanoutConsumer<N, C> {
607    #[inline]
608    /// Receive and decode a typed owned value.
609    ///
610    /// Returns `Ok(None)` when empty (or when the consumer index is out of range).
611    ///
612    /// See [`TypedSpscReceiver::try_recv_owned`] for partial-effect semantics.
613    pub fn try_recv_owned<T>(&self, meta: MsgMeta) -> Result<Option<(Header, T)>, Error>
614    where
615        C: CodecId + DecodeFrom<T>,
616    {
617        self.inner
618            .try_recv_with(|bytes| -> Result<(Header, T), Error> {
619                let (hdr, payload) = decode_header(bytes).map_err(Error::from)?;
620                if hdr.codec_id != C::CODEC_ID {
621                    return Err(Error::UnsupportedCodec {
622                        codec_id: hdr.codec_id,
623                    });
624                }
625                validate_meta(meta, hdr)?;
626                let value = self.codec.decode_from(payload).map_err(Error::from)?;
627                Ok((hdr, value))
628            })
629            .map_err(Error::from)
630    }
631
632    /// Receive a typed owned value using compile-time identity (`T::META`).
633    #[inline]
634    pub fn try_recv_owned_msg<T>(&self) -> Result<Option<(Header, T)>, Error>
635    where
636        T: IndexbusMsgSpec,
637        C: CodecId + DecodeFrom<T>,
638    {
639        self.try_recv_owned(<T as IndexbusMsgSpec>::META)
640    }
641}