indexbus_msg/
header.rs

1use crate::constants::{MAGIC, SLOT_CAPACITY, V1_HEADER_LEN};
2use crate::errors::Error;
3use crate::utils::bytes;
4
/// In-memory form of a v1 envelope header.
///
/// ## Field invariants
///
/// - `header_len` records how many bytes the header prefix occupies.
///   - When decoding v1, any value `>= V1_HEADER_LEN` is accepted; header bytes appended past the
///     fixed prefix may be ignored.
///   - When encoding v1, only `header_len == V1_HEADER_LEN` is currently supported.
/// - `payload_len` records how many payload bytes are declared to follow the header.
/// - The sum `header_len + payload_len` is checked against [`crate::SLOT_CAPACITY`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Header {
    /// Flag bits for the envelope; see [`crate::flags`].
    pub flags: u16,
    /// Identifier of the codec used to interpret the payload bytes.
    pub codec_id: u8,
    /// Length of the header in bytes (the header evolves append-only).
    pub header_len: u8,

    /// Application-defined schema identifier.
    pub schema_id: u64,
    /// Application-defined message type identifier within a schema.
    pub msg_type: u32,
    /// Application-defined message version within a schema/type.
    pub msg_version: u16,
    /// Length of the payload in bytes.
    pub payload_len: u16,
}
32
33#[inline]
34/// Encode a v1 header into `out`.
35///
36/// ## Contract
37///
38/// - Writes exactly `V1_HEADER_LEN` bytes on success and returns `V1_HEADER_LEN`.
39/// - Currently requires `hdr.header_len == V1_HEADER_LEN`.
40/// - Rejects unknown flag bits for deterministic behavior.
41/// - Validates `hdr.header_len + hdr.payload_len <= SLOT_CAPACITY`.
42/// - Does not write any payload bytes; callers are expected to write the payload after the header.
43///
44/// ## Errors
45///
46/// Returns [`Error::BufferTooSmall`] if `out` is shorter than `V1_HEADER_LEN`. Returns
47/// [`Error::BadHeaderLen`], [`Error::BadPayloadLen`], or [`Error::UnknownFlags`] if the header is
48/// structurally invalid.
49pub fn encode_header_into(out: &mut [u8], hdr: Header) -> Result<usize, Error> {
50    if out.len() < V1_HEADER_LEN {
51        return Err(Error::BufferTooSmall {
52            required: V1_HEADER_LEN,
53            provided: out.len(),
54        });
55    }
56
57    // v1 currently only supports the minimum header.
58    if hdr.header_len as usize != V1_HEADER_LEN {
59        return Err(Error::BadHeaderLen {
60            header_len: hdr.header_len as usize,
61        });
62    }
63
64    let total = (hdr.header_len as usize)
65        .checked_add(hdr.payload_len as usize)
66        .ok_or(Error::BadPayloadLen {
67            payload_len: hdr.payload_len as usize,
68        })?;
69
70    if total > SLOT_CAPACITY {
71        return Err(Error::BadPayloadLen {
72            payload_len: hdr.payload_len as usize,
73        });
74    }
75
76    // Reject unknown flags for deterministic behavior.
77    let known = crate::flags::CRC32_PRESENT | crate::flags::COMPRESSED | crate::flags::SIGNED;
78    if (hdr.flags & !known) != 0 {
79        return Err(Error::UnknownFlags { flags: hdr.flags });
80    }
81
82    out[..4].copy_from_slice(&MAGIC);
83    bytes::write_u16_le(&mut out[4..6], hdr.flags);
84    out[6] = hdr.codec_id;
85    out[7] = hdr.header_len;
86    bytes::write_u64_le(&mut out[8..16], hdr.schema_id);
87    bytes::write_u32_le(&mut out[16..20], hdr.msg_type);
88    bytes::write_u16_le(&mut out[20..22], hdr.msg_version);
89    bytes::write_u16_le(&mut out[22..24], hdr.payload_len);
90
91    Ok(V1_HEADER_LEN)
92}
93
94#[inline]
95/// Decode a v1 header from `input`.
96///
97/// Returns the parsed header and a slice of the payload bytes.
98///
99/// ## Contract
100///
101/// - Accepts `header_len >= V1_HEADER_LEN` and ignores appended header bytes it does not
102///   understand.
103/// - Rejects unknown flag bits for deterministic behavior.
104/// - Validates `header_len + payload_len <= SLOT_CAPACITY`.
105/// - Returns a payload slice that borrows from `input`.
106///
107/// ## Errors
108///
109/// Returns [`Error::Truncated`] if `input` does not contain enough bytes for the declared header or
110/// payload. Returns [`Error::BadMagic`], [`Error::BadHeaderLen`], [`Error::BadPayloadLen`], or
111/// [`Error::UnknownFlags`] if the envelope is structurally invalid.
112pub fn decode_header(input: &[u8]) -> Result<(Header, &[u8]), Error> {
113    if input.len() < V1_HEADER_LEN {
114        return Err(Error::Truncated);
115    }
116
117    if input[..4] != MAGIC {
118        return Err(Error::BadMagic);
119    }
120
121    let flags = bytes::read_u16_le(&input[4..6]);
122    let codec_id = input[6];
123    let header_len = input[7] as usize;
124
125    if header_len < V1_HEADER_LEN {
126        return Err(Error::BadHeaderLen { header_len });
127    }
128
129    if header_len > input.len() {
130        return Err(Error::Truncated);
131    }
132
133    // Reject unknown flags for deterministic behavior.
134    let known = crate::flags::CRC32_PRESENT | crate::flags::COMPRESSED | crate::flags::SIGNED;
135    if (flags & !known) != 0 {
136        return Err(Error::UnknownFlags { flags });
137    }
138
139    // We only parse the v1 fixed prefix; appended bytes are ignored in v1.
140    let schema_id = bytes::read_u64_le(&input[8..16]);
141    let msg_type = bytes::read_u32_le(&input[16..20]);
142    let msg_version = bytes::read_u16_le(&input[20..22]);
143    let payload_len = bytes::read_u16_le(&input[22..24]) as usize;
144
145    if header_len + payload_len > SLOT_CAPACITY {
146        return Err(Error::BadPayloadLen { payload_len });
147    }
148
149    if header_len + payload_len > input.len() {
150        return Err(Error::Truncated);
151    }
152
153    let hdr = Header {
154        flags,
155        codec_id,
156        header_len: header_len as u8,
157        schema_id,
158        msg_type,
159        msg_version,
160        payload_len: payload_len as u16,
161    };
162
163    let payload = &input[header_len..(header_len + payload_len)];
164    Ok((hdr, payload))
165}