byteor_pipeline_backings_mem/
lib.rs

1#![deny(missing_docs)]
2#![deny(unreachable_pub, rust_2018_idioms)]
3#![forbid(unsafe_code)]
4
5//! In-memory lane backings for deterministic worker-side flows.
6//!
7//! These backings avoid file-backed SHM entirely and are intended for Plane B style workflows
8//! such as compile/validate/replay helpers where OS-level shared memory is unnecessary.
9
10use std::collections::VecDeque;
11use std::sync::{Arc, Mutex};
12
13use byteor_pipeline_kernel::{
14    LaneRx, LaneRxBorrow, LaneRxError, LaneTx, LaneTxError, LaneTxWith, LaneTxWithError,
15    LaneTxWithResult,
16};
17
/// Options for an in-memory events lane.
///
/// `Copy`-cheap configuration consumed by [`events_channel_with_options`];
/// both limits are enforced on the TX side at publish time.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct MemLaneOptions {
    /// Maximum queued messages before publishes return `LaneTxError::Full`.
    pub capacity: usize,
    /// Maximum message length accepted by the lane; longer publishes return
    /// `LaneTxError::TooLarge`.
    pub max_message_len: usize,
}
26
27impl Default for MemLaneOptions {
28    fn default() -> Self {
29        Self {
30            capacity: 1024,
31            max_message_len: indexbus_core::INDEXBUS_SLOT_DATA_SIZE,
32        }
33    }
34}
35
/// Shared FIFO backing both halves of a lane.
#[derive(Default)]
struct QueueState {
    // Oldest message at the front: `publish` pushes back, `recv` pops front.
    msgs: VecDeque<Vec<u8>>,
}
40
/// In-memory events TX lane.
///
/// Cloning yields another sender over the same shared queue.
#[derive(Clone)]
pub struct MemEventsTx {
    // Queue shared with every TX clone and the paired `MemEventsRx`.
    state: Arc<Mutex<QueueState>>,
    // Capacity / message-size limits checked on every publish.
    options: MemLaneOptions,
}
47
/// In-memory events RX lane.
///
/// Cloning yields another receiver over the same shared queue; clones
/// compete for messages rather than each seeing every message.
#[derive(Clone)]
pub struct MemEventsRx {
    // Queue shared with the paired `MemEventsTx` and any RX clones.
    state: Arc<Mutex<QueueState>>,
}
53
54/// Create an in-memory events channel with default message sizing.
55pub fn events_channel(capacity: usize) -> (MemEventsTx, MemEventsRx) {
56    events_channel_with_options(MemLaneOptions {
57        capacity,
58        ..MemLaneOptions::default()
59    })
60}
61
62/// Create an in-memory events channel with explicit options.
63pub fn events_channel_with_options(options: MemLaneOptions) -> (MemEventsTx, MemEventsRx) {
64    let state = Arc::new(Mutex::new(QueueState::default()));
65    (
66        MemEventsTx {
67            state: Arc::clone(&state),
68            options,
69        },
70        MemEventsRx { state },
71    )
72}
73
74impl MemEventsRx {
75    /// Return the current queued depth.
76    pub fn len(&self) -> usize {
77        self.state.lock().expect("mem lane poisoned").msgs.len()
78    }
79
80    /// Whether the lane is empty.
81    pub fn is_empty(&self) -> bool {
82        self.len() == 0
83    }
84}
85
86impl LaneRx for MemEventsRx {
87    fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
88        let msg = self
89            .state
90            .lock()
91            .expect("mem lane poisoned")
92            .msgs
93            .pop_front()?;
94        let n = msg.len().min(buf.len());
95        buf[..n].copy_from_slice(&msg[..n]);
96        Some(n)
97    }
98}
99
100impl LaneRxBorrow for MemEventsRx {
101    fn recv_with<F, R>(&mut self, f: F) -> Result<Option<R>, LaneRxError>
102    where
103        F: FnOnce(&[u8]) -> R,
104    {
105        let Some(msg) = self
106            .state
107            .lock()
108            .expect("mem lane poisoned")
109            .msgs
110            .pop_front()
111        else {
112            return Ok(None);
113        };
114        Ok(Some(f(&msg)))
115    }
116}
117
118impl LaneTx for MemEventsTx {
119    fn publish(&mut self, msg: &[u8]) -> Result<(), LaneTxError> {
120        if msg.len() > self.options.max_message_len {
121            return Err(LaneTxError::TooLarge {
122                max: self.options.max_message_len,
123                len: msg.len(),
124            });
125        }
126
127        let mut state = self.state.lock().expect("mem lane poisoned");
128        if state.msgs.len() >= self.options.capacity {
129            return Err(LaneTxError::Full);
130        }
131        state.msgs.push_back(msg.to_vec());
132        Ok(())
133    }
134}
135
136impl LaneTxWith for MemEventsTx {
137    fn publish_with<F>(&mut self, f: F) -> Result<(), LaneTxError>
138    where
139        F: FnOnce(&mut [u8]) -> usize,
140    {
141        let mut slot = vec![0u8; self.options.max_message_len];
142        let n = f(&mut slot);
143        if n > self.options.max_message_len {
144            return Err(LaneTxError::TooLarge {
145                max: self.options.max_message_len,
146                len: n,
147            });
148        }
149        self.publish(&slot[..n])
150    }
151}
152
153impl LaneTxWithResult for MemEventsTx {
154    fn publish_with_result<F, E>(&mut self, f: F) -> Result<(), LaneTxWithError<E>>
155    where
156        F: FnOnce(&mut [u8]) -> Result<usize, E>,
157    {
158        let mut slot = vec![0u8; self.options.max_message_len];
159        let n = f(&mut slot).map_err(LaneTxWithError::Encode)?;
160        if n > self.options.max_message_len {
161            return Err(LaneTxWithError::Failed);
162        }
163        self.publish(&slot[..n]).map_err(Into::into)
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    // Messages come back in publish order and the lane drains to None.
    #[test]
    fn mem_events_channel_round_trips_fifo() {
        let (mut tx, mut rx) = events_channel(4);
        for msg in [&b"a"[..], &b"b"[..]] {
            tx.publish(msg).unwrap();
        }

        let mut scratch = [0u8; 8];
        let first = rx.recv(&mut scratch).unwrap();
        assert_eq!(&scratch[..first], b"a");
        let second = rx.recv(&mut scratch).unwrap();
        assert_eq!(&scratch[..second], b"b");
        assert!(rx.recv(&mut scratch).is_none());
    }

    // A full lane rejects publishes without dropping queued messages.
    #[test]
    fn mem_events_channel_reports_backpressure() {
        let (mut tx, rx) = events_channel(1);
        tx.publish(b"a").unwrap();
        assert_eq!(tx.publish(b"b"), Err(LaneTxError::Full));
        assert_eq!(rx.len(), 1);
    }

    // Slot-style writes on TX pair with borrow-style reads on RX.
    #[test]
    fn mem_events_channel_supports_borrow_and_slot_writes() {
        let options = MemLaneOptions {
            capacity: 4,
            max_message_len: 16,
        };
        let (mut tx, mut rx) = events_channel_with_options(options);

        tx.publish_with(|slot| {
            slot[..3].copy_from_slice(b"abc");
            3
        })
        .unwrap();

        let observed_len = rx.recv_with(|msg| msg.len()).unwrap().unwrap();
        assert_eq!(observed_len, 3);
    }

    // Fallible slot writes deliver on Ok and surface encode errors on Err.
    #[test]
    fn mem_events_channel_supports_fallible_slot_writes() {
        let (mut tx, mut rx) = events_channel(4);
        tx.publish_with_result(|slot| {
            slot[..2].copy_from_slice(b"ok");
            Ok::<usize, &'static str>(2)
        })
        .unwrap();

        let received = rx.recv_with(|msg| msg.to_vec()).unwrap().unwrap();
        assert_eq!(received, b"ok".to_vec());

        let failure = tx
            .publish_with_result(|_slot| Err::<usize, _>("encode failed"))
            .unwrap_err();
        assert_eq!(failure, LaneTxWithError::Encode("encode failed"));
    }
}
227}