byteor_pipeline_backings_mem/
lib.rs1#![deny(missing_docs)]
2#![deny(unreachable_pub, rust_2018_idioms)]
3#![forbid(unsafe_code)]
4
5use std::collections::VecDeque;
11use std::sync::{Arc, Mutex};
12
13use byteor_pipeline_kernel::{
14 LaneRx, LaneRxBorrow, LaneRxError, LaneTx, LaneTxError, LaneTxWith, LaneTxWithError,
15 LaneTxWithResult,
16};
17
18#[derive(Clone, Copy, Debug, PartialEq, Eq)]
20pub struct MemLaneOptions {
21 pub capacity: usize,
23 pub max_message_len: usize,
25}
26
27impl Default for MemLaneOptions {
28 fn default() -> Self {
29 Self {
30 capacity: 1024,
31 max_message_len: indexbus_core::INDEXBUS_SLOT_DATA_SIZE,
32 }
33 }
34}
35
36#[derive(Default)]
37struct QueueState {
38 msgs: VecDeque<Vec<u8>>,
39}
40
41#[derive(Clone)]
43pub struct MemEventsTx {
44 state: Arc<Mutex<QueueState>>,
45 options: MemLaneOptions,
46}
47
48#[derive(Clone)]
50pub struct MemEventsRx {
51 state: Arc<Mutex<QueueState>>,
52}
53
54pub fn events_channel(capacity: usize) -> (MemEventsTx, MemEventsRx) {
56 events_channel_with_options(MemLaneOptions {
57 capacity,
58 ..MemLaneOptions::default()
59 })
60}
61
62pub fn events_channel_with_options(options: MemLaneOptions) -> (MemEventsTx, MemEventsRx) {
64 let state = Arc::new(Mutex::new(QueueState::default()));
65 (
66 MemEventsTx {
67 state: Arc::clone(&state),
68 options,
69 },
70 MemEventsRx { state },
71 )
72}
73
74impl MemEventsRx {
75 pub fn len(&self) -> usize {
77 self.state.lock().expect("mem lane poisoned").msgs.len()
78 }
79
80 pub fn is_empty(&self) -> bool {
82 self.len() == 0
83 }
84}
85
86impl LaneRx for MemEventsRx {
87 fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
88 let msg = self
89 .state
90 .lock()
91 .expect("mem lane poisoned")
92 .msgs
93 .pop_front()?;
94 let n = msg.len().min(buf.len());
95 buf[..n].copy_from_slice(&msg[..n]);
96 Some(n)
97 }
98}
99
100impl LaneRxBorrow for MemEventsRx {
101 fn recv_with<F, R>(&mut self, f: F) -> Result<Option<R>, LaneRxError>
102 where
103 F: FnOnce(&[u8]) -> R,
104 {
105 let Some(msg) = self
106 .state
107 .lock()
108 .expect("mem lane poisoned")
109 .msgs
110 .pop_front()
111 else {
112 return Ok(None);
113 };
114 Ok(Some(f(&msg)))
115 }
116}
117
118impl LaneTx for MemEventsTx {
119 fn publish(&mut self, msg: &[u8]) -> Result<(), LaneTxError> {
120 if msg.len() > self.options.max_message_len {
121 return Err(LaneTxError::TooLarge {
122 max: self.options.max_message_len,
123 len: msg.len(),
124 });
125 }
126
127 let mut state = self.state.lock().expect("mem lane poisoned");
128 if state.msgs.len() >= self.options.capacity {
129 return Err(LaneTxError::Full);
130 }
131 state.msgs.push_back(msg.to_vec());
132 Ok(())
133 }
134}
135
136impl LaneTxWith for MemEventsTx {
137 fn publish_with<F>(&mut self, f: F) -> Result<(), LaneTxError>
138 where
139 F: FnOnce(&mut [u8]) -> usize,
140 {
141 let mut slot = vec![0u8; self.options.max_message_len];
142 let n = f(&mut slot);
143 if n > self.options.max_message_len {
144 return Err(LaneTxError::TooLarge {
145 max: self.options.max_message_len,
146 len: n,
147 });
148 }
149 self.publish(&slot[..n])
150 }
151}
152
153impl LaneTxWithResult for MemEventsTx {
154 fn publish_with_result<F, E>(&mut self, f: F) -> Result<(), LaneTxWithError<E>>
155 where
156 F: FnOnce(&mut [u8]) -> Result<usize, E>,
157 {
158 let mut slot = vec![0u8; self.options.max_message_len];
159 let n = f(&mut slot).map_err(LaneTxWithError::Encode)?;
160 if n > self.options.max_message_len {
161 return Err(LaneTxWithError::Failed);
162 }
163 self.publish(&slot[..n]).map_err(Into::into)
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170
171 #[test]
172 fn mem_events_channel_round_trips_fifo() {
173 let (mut tx, mut rx) = events_channel(4);
174 tx.publish(b"a").unwrap();
175 tx.publish(b"b").unwrap();
176
177 let mut buf = [0u8; 8];
178 let n = rx.recv(&mut buf).unwrap();
179 assert_eq!(&buf[..n], b"a");
180 let n = rx.recv(&mut buf).unwrap();
181 assert_eq!(&buf[..n], b"b");
182 assert!(rx.recv(&mut buf).is_none());
183 }
184
185 #[test]
186 fn mem_events_channel_reports_backpressure() {
187 let (mut tx, rx) = events_channel(1);
188 tx.publish(b"a").unwrap();
189 assert_eq!(tx.publish(b"b"), Err(LaneTxError::Full));
190 assert_eq!(rx.len(), 1);
191 }
192
193 #[test]
194 fn mem_events_channel_supports_borrow_and_slot_writes() {
195 let (mut tx, mut rx) = events_channel_with_options(MemLaneOptions {
196 capacity: 4,
197 max_message_len: 16,
198 });
199
200 tx.publish_with(|slot| {
201 slot[..3].copy_from_slice(b"abc");
202 3
203 })
204 .unwrap();
205
206 let n = rx.recv_with(|msg| msg.len()).unwrap().unwrap();
207 assert_eq!(n, 3);
208 }
209
210 #[test]
211 fn mem_events_channel_supports_fallible_slot_writes() {
212 let (mut tx, mut rx) = events_channel(4);
213 tx.publish_with_result(|slot| {
214 slot[..2].copy_from_slice(b"ok");
215 Ok::<usize, &'static str>(2)
216 })
217 .unwrap();
218
219 let msg = rx.recv_with(|msg| msg.to_vec()).unwrap().unwrap();
220 assert_eq!(msg, b"ok".to_vec());
221
222 let err = tx
223 .publish_with_result(|_slot| Err::<usize, _>("encode failed"))
224 .unwrap_err();
225 assert_eq!(err, LaneTxWithError::Encode("encode failed"));
226 }
227}