byteor_pipeline_backings_shm/
events_chain.rs

1//! SHM-backed EventsChain lanes.
2
3use std::path::Path;
4use std::sync::Arc;
5
6use byteor_pipeline_kernel::{
7    LaneRx, LaneRxBorrow, LaneRxError, LaneRxSlot, LaneTx, LaneTxError, LaneTxSlot,
8    LaneTxSlotError, LaneTxWith, LaneTxWithError, LaneTxWithResult,
9};
10
11use crate::{AttachOptions, BackingError};
12
13/// Shared `EventsChainLayout4` mapping that can be split into multiple queues.
14///
15/// Slot forwarding APIs (`try_recv_slot` / `publish_slot`) require that all
16/// participating producers/consumers share the same mmap, because the slot
17/// handle contains raw pointers into the mapping.
18pub struct ShmEventsChainRegion {
19    region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
20}
21
22impl ShmEventsChainRegion {
23    /// Open (or create) an `EventsChainLayout4` mapping.
24    pub fn open(opts: &AttachOptions, lane: &str) -> Result<Self, BackingError> {
25        if opts.blocking {
26            return Err(BackingError::Unsupported(
27                "events-chain does not support blocking in v1",
28            ));
29        }
30        if lane.is_empty() {
31            return Err(BackingError::Invalid("lane name is empty"));
32        }
33
34        let o = transport_open_options(opts);
35        let region = if let Some(path) = resolve_path(opts, lane) {
36            byteor_transport_shm::EventsChainRegion::<4>::open_path_with(path, o)?
37        } else {
38            byteor_transport_shm::EventsChainRegion::<4>::open_with(lane, o)?
39        };
40
41        Ok(Self {
42            region: Arc::new(region),
43        })
44    }
45
46    /// Split SPSC queue `idx` into (tx, rx) lanes that share this mapping.
47    pub fn queue_spsc(&self, idx: usize) -> Result<(ShmEventsTx, ShmEventsRx), BackingError> {
48        let (tx, rx) = self.region.queue_spsc(idx)?;
49        Ok((
50            ShmEventsTx {
51                _region: self.region.clone(),
52                tx,
53            },
54            ShmEventsRx {
55                _region: self.region.clone(),
56                rx,
57            },
58        ))
59    }
60
61    /// Split MPSC queue `idx` into (tx, rx) lanes that share this mapping.
62    pub fn queue_mpsc(
63        &self,
64        idx: usize,
65    ) -> Result<(ShmEventsMpscTx, ShmEventsMpscRx), BackingError> {
66        let (tx, rx) = self.region.queue_mpsc(idx)?;
67        Ok((
68            ShmEventsMpscTx {
69                _region: self.region.clone(),
70                tx,
71            },
72            ShmEventsMpscRx {
73                _region: self.region.clone(),
74                rx,
75            },
76        ))
77    }
78}
79
80/// SHM-backed events-chain RX lane over `EventsChainLayout4`.
81pub struct ShmEventsRx {
82    _region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
83    rx: indexbus_core::ChainSpscReceiver,
84}
85
86impl LaneRx for ShmEventsRx {
87    fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
88        self.rx.recv(buf)
89    }
90}
91
92impl LaneRxBorrow for ShmEventsRx {
93    fn recv_with<F, R>(&mut self, f: F) -> core::result::Result<Option<R>, LaneRxError>
94    where
95        F: FnOnce(&[u8]) -> R,
96    {
97        self.rx
98            .try_recv_with(|bytes| Ok::<R, core::convert::Infallible>(f(bytes)))
99            .map_err(|_| LaneRxError::Failed)
100    }
101}
102
103impl LaneRxSlot for ShmEventsRx {
104    type Slot = indexbus_core::EventsSlot;
105
106    fn try_recv_slot(&mut self) -> core::result::Result<Option<Self::Slot>, LaneRxError> {
107        self.rx.try_recv_slot().map_err(|_| LaneRxError::Failed)
108    }
109}
110
111impl ShmEventsRx {
112    /// Receive by borrowing the slot bytes for the duration of `f`.
113    pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, indexbus_core::RecvWithError<E>>
114    where
115        F: FnOnce(&[u8]) -> Result<R, E>,
116    {
117        self.rx.try_recv_with(f)
118    }
119
120    /// Receive a message as an owned slot handle (for slot forwarding).
121    pub fn try_recv_slot(&self) -> Result<Option<indexbus_core::EventsSlot>, indexbus_core::Error> {
122        self.rx.try_recv_slot()
123    }
124}
125
126/// SHM-backed events-chain TX lane over `EventsChainLayout4`.
127pub struct ShmEventsTx {
128    _region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
129    tx: indexbus_core::ChainSpscSender,
130}
131
132impl LaneTx for ShmEventsTx {
133    fn publish(&mut self, bytes: &[u8]) -> Result<(), LaneTxError> {
134        match self.tx.publish(bytes) {
135            Ok(()) => Ok(()),
136            Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
137            Err(indexbus_core::Error::TooLarge { max, len }) => {
138                Err(LaneTxError::TooLarge { max, len })
139            }
140            Err(_) => Err(LaneTxError::Failed),
141        }
142    }
143}
144
145impl LaneTxWith for ShmEventsTx {
146    fn publish_with<F>(&mut self, f: F) -> core::result::Result<(), LaneTxError>
147    where
148        F: FnOnce(&mut [u8]) -> usize,
149    {
150        match self
151            .tx
152            .publish_with(|slot| Ok::<usize, core::convert::Infallible>(f(slot)))
153        {
154            Ok(()) => Ok(()),
155            Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
156                Err(LaneTxError::Full)
157            }
158            Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::TooLarge {
159                max,
160                len,
161            })) => Err(LaneTxError::TooLarge { max, len }),
162            Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxError::Failed),
163            Err(indexbus_core::PublishWithError::Encode(_never)) => unreachable!(),
164        }
165    }
166}
167
168impl LaneTxWithResult for ShmEventsTx {
169    fn publish_with_result<F, E>(&mut self, f: F) -> core::result::Result<(), LaneTxWithError<E>>
170    where
171        F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
172    {
173        match self.tx.publish_with(f) {
174            Ok(()) => Ok(()),
175            Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
176                Err(LaneTxWithError::Full)
177            }
178            Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxWithError::Failed),
179            Err(indexbus_core::PublishWithError::Encode(e)) => Err(LaneTxWithError::Encode(e)),
180        }
181    }
182}
183
184impl LaneTxSlot<indexbus_core::EventsSlot> for ShmEventsTx {
185    fn publish_slot(
186        &mut self,
187        slot: indexbus_core::EventsSlot,
188    ) -> core::result::Result<(), LaneTxSlotError<indexbus_core::EventsSlot>> {
189        match self.tx.try_publish_slot(slot) {
190            Ok(()) => Ok(()),
191            Err(e) => match e.error {
192                indexbus_core::Error::Full => Err(LaneTxSlotError::Full(e.slot)),
193                indexbus_core::Error::IncompatibleLayout => {
194                    Err(LaneTxSlotError::Incompatible(e.slot))
195                }
196                _ => Err(LaneTxSlotError::Failed(e.slot)),
197            },
198        }
199    }
200}
201
202impl ShmEventsTx {
203    /// Publish by writing directly into the backing slot buffer.
204    pub fn publish_with<F, E>(&self, f: F) -> Result<(), indexbus_core::PublishWithError<E>>
205    where
206        F: FnOnce(&mut [u8]) -> Result<usize, E>,
207    {
208        self.tx.publish_with(f)
209    }
210
211    /// Publish a previously-received slot (slot forwarding).
212    pub fn publish_slot(&self, slot: indexbus_core::EventsSlot) -> Result<(), LaneTxError> {
213        match self.tx.publish_slot(slot) {
214            Ok(()) => Ok(()),
215            Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
216            Err(_) => Err(LaneTxError::Failed),
217        }
218    }
219}
220
221/// SHM-backed events-chain RX lane (MPSC) over `EventsChainLayout4`.
222pub struct ShmEventsMpscRx {
223    _region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
224    rx: indexbus_core::ChainMpscConsumer,
225}
226
227impl LaneRx for ShmEventsMpscRx {
228    fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
229        self.rx.recv(buf)
230    }
231}
232
233impl LaneRxBorrow for ShmEventsMpscRx {
234    fn recv_with<F, R>(&mut self, f: F) -> core::result::Result<Option<R>, LaneRxError>
235    where
236        F: FnOnce(&[u8]) -> R,
237    {
238        self.rx
239            .try_recv_with(|bytes| Ok::<R, core::convert::Infallible>(f(bytes)))
240            .map_err(|_| LaneRxError::Failed)
241    }
242}
243
244impl LaneRxSlot for ShmEventsMpscRx {
245    type Slot = indexbus_core::EventsSlot;
246
247    fn try_recv_slot(&mut self) -> core::result::Result<Option<Self::Slot>, LaneRxError> {
248        self.rx.try_recv_slot().map_err(|_| LaneRxError::Failed)
249    }
250}
251
252impl ShmEventsMpscRx {
253    /// Receive by borrowing the slot bytes for the duration of `f`.
254    pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, indexbus_core::RecvWithError<E>>
255    where
256        F: FnOnce(&[u8]) -> Result<R, E>,
257    {
258        self.rx.try_recv_with(f)
259    }
260
261    /// Receive a message as an owned slot handle (for slot forwarding).
262    pub fn try_recv_slot(&self) -> Result<Option<indexbus_core::EventsSlot>, indexbus_core::Error> {
263        self.rx.try_recv_slot()
264    }
265}
266
267/// SHM-backed events-chain TX lane (MPSC) over `EventsChainLayout4`.
268pub struct ShmEventsMpscTx {
269    _region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
270    tx: indexbus_core::ChainMpscProducer,
271}
272
273impl LaneTx for ShmEventsMpscTx {
274    fn publish(&mut self, bytes: &[u8]) -> Result<(), LaneTxError> {
275        match self.tx.publish(bytes) {
276            Ok(()) => Ok(()),
277            Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
278            Err(indexbus_core::Error::TooLarge { max, len }) => {
279                Err(LaneTxError::TooLarge { max, len })
280            }
281            Err(_) => Err(LaneTxError::Failed),
282        }
283    }
284}
285
286impl LaneTxWith for ShmEventsMpscTx {
287    fn publish_with<F>(&mut self, f: F) -> core::result::Result<(), LaneTxError>
288    where
289        F: FnOnce(&mut [u8]) -> usize,
290    {
291        match self
292            .tx
293            .publish_with(|slot| Ok::<usize, core::convert::Infallible>(f(slot)))
294        {
295            Ok(()) => Ok(()),
296            Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
297                Err(LaneTxError::Full)
298            }
299            Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::TooLarge {
300                max,
301                len,
302            })) => Err(LaneTxError::TooLarge { max, len }),
303            Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxError::Failed),
304            Err(indexbus_core::PublishWithError::Encode(_never)) => unreachable!(),
305        }
306    }
307}
308
309impl LaneTxWithResult for ShmEventsMpscTx {
310    fn publish_with_result<F, E>(&mut self, f: F) -> core::result::Result<(), LaneTxWithError<E>>
311    where
312        F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
313    {
314        match self.tx.publish_with(f) {
315            Ok(()) => Ok(()),
316            Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
317                Err(LaneTxWithError::Full)
318            }
319            Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxWithError::Failed),
320            Err(indexbus_core::PublishWithError::Encode(e)) => Err(LaneTxWithError::Encode(e)),
321        }
322    }
323}
324
325impl LaneTxSlot<indexbus_core::EventsSlot> for ShmEventsMpscTx {
326    fn publish_slot(
327        &mut self,
328        slot: indexbus_core::EventsSlot,
329    ) -> core::result::Result<(), LaneTxSlotError<indexbus_core::EventsSlot>> {
330        match self.tx.try_publish_slot(slot) {
331            Ok(()) => Ok(()),
332            Err(e) => match e.error {
333                indexbus_core::Error::Full => Err(LaneTxSlotError::Full(e.slot)),
334                indexbus_core::Error::IncompatibleLayout => {
335                    Err(LaneTxSlotError::Incompatible(e.slot))
336                }
337                _ => Err(LaneTxSlotError::Failed(e.slot)),
338            },
339        }
340    }
341}
342
343impl ShmEventsMpscTx {
344    /// Publish by writing directly into the backing slot buffer.
345    pub fn publish_with<F, E>(&self, f: F) -> Result<(), indexbus_core::PublishWithError<E>>
346    where
347        F: FnOnce(&mut [u8]) -> Result<usize, E>,
348    {
349        self.tx.publish_with(f)
350    }
351
352    /// Publish a previously-received slot (slot forwarding).
353    pub fn publish_slot(&self, slot: indexbus_core::EventsSlot) -> Result<(), LaneTxError> {
354        match self.tx.publish_slot(slot) {
355            Ok(()) => Ok(()),
356            Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
357            Err(_) => Err(LaneTxError::Failed),
358        }
359    }
360}
361
362fn transport_open_options(opts: &AttachOptions) -> byteor_transport_shm::OpenOptions {
363    let mut o = byteor_transport_shm::OpenOptions::new()
364        .blocking(opts.blocking)
365        .prefault(opts.prefault);
366    if let Some(p) = &opts.path {
367        o = o.path(p.clone());
368    }
369    o
370}
371
372fn resolve_path<'a>(opts: &'a AttachOptions, lane: &'a str) -> Option<&'a Path> {
373    let _ = lane;
374    opts.path.as_deref()
375}
376
377/// Attach to an EventsChain RX lane (queue `opts.queue`, default 0).
378pub fn attach_events_rx(opts: &AttachOptions, lane: &str) -> Result<ShmEventsRx, BackingError> {
379    if opts.blocking {
380        return Err(BackingError::Unsupported(
381            "events-chain does not support blocking in v1",
382        ));
383    }
384    if lane.is_empty() {
385        return Err(BackingError::Invalid("lane name is empty"));
386    }
387
388    let region = ShmEventsChainRegion::open(opts, lane)?.region;
389
390    let (_tx, rx) = region.queue_spsc(opts.queue)?;
391    Ok(ShmEventsRx {
392        _region: region,
393        rx,
394    })
395}
396
397/// Attach to an EventsChain TX lane (queue `opts.queue`, default 0).
398pub fn attach_events_tx(opts: &AttachOptions, lane: &str) -> Result<ShmEventsTx, BackingError> {
399    if opts.blocking {
400        return Err(BackingError::Unsupported(
401            "events-chain does not support blocking in v1",
402        ));
403    }
404    if lane.is_empty() {
405        return Err(BackingError::Invalid("lane name is empty"));
406    }
407
408    let region = ShmEventsChainRegion::open(opts, lane)?.region;
409
410    let (tx, _rx) = region.queue_spsc(opts.queue)?;
411    Ok(ShmEventsTx {
412        _region: region,
413        tx,
414    })
415}
416
417/// Attach to an EventsChain RX lane backed by an MPSC queue (queue `opts.queue`, default 0).
418pub fn attach_events_mpsc_rx(
419    opts: &AttachOptions,
420    lane: &str,
421) -> Result<ShmEventsMpscRx, BackingError> {
422    if opts.blocking {
423        return Err(BackingError::Unsupported(
424            "events-chain does not support blocking in v1",
425        ));
426    }
427    if lane.is_empty() {
428        return Err(BackingError::Invalid("lane name is empty"));
429    }
430
431    let region = ShmEventsChainRegion::open(opts, lane)?.region;
432
433    let (_tx, rx) = region.queue_mpsc(opts.queue)?;
434    Ok(ShmEventsMpscRx {
435        _region: region,
436        rx,
437    })
438}
439
440/// Attach to an EventsChain TX lane backed by an MPSC queue (queue `opts.queue`, default 0).
441pub fn attach_events_mpsc_tx(
442    opts: &AttachOptions,
443    lane: &str,
444) -> Result<ShmEventsMpscTx, BackingError> {
445    if opts.blocking {
446        return Err(BackingError::Unsupported(
447            "events-chain does not support blocking in v1",
448        ));
449    }
450    if lane.is_empty() {
451        return Err(BackingError::Invalid("lane name is empty"));
452    }
453
454    let region = ShmEventsChainRegion::open(opts, lane)?.region;
455
456    let (tx, _rx) = region.queue_mpsc(opts.queue)?;
457    Ok(ShmEventsMpscTx {
458        _region: region,
459        tx,
460    })
461}