byteor_pipeline_backings_shm/
fanout.rs

1//! SHM-backed fanout lanes.
2
3use std::path::Path;
4
5use byteor_pipeline_kernel::{
6    LaneRx, LaneRxBorrow, LaneRxError, LaneTx, LaneTxError, LaneTxWith, LaneTxWithError,
7    LaneTxWithResult,
8};
9use indexbus_core::{RouteOnceResult, RouterMode, RouterSource};
10
11use crate::{AttachOptions, BackingError};
12
13/// SHM-backed fanout TX lane.
14pub struct ShmFanoutTx {
15    _region: indexbus_transport_shm::FanoutRegion<4>,
16    tx: indexbus_core::FanoutProducer<4>,
17}
18
19impl LaneTx for ShmFanoutTx {
20    fn publish(&mut self, bytes: &[u8]) -> Result<(), LaneTxError> {
21        match self.tx.publish(bytes) {
22            Ok(()) => Ok(()),
23            Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
24            Err(indexbus_core::Error::TooLarge { max, len }) => {
25                Err(LaneTxError::TooLarge { max, len })
26            }
27            Err(_) => Err(LaneTxError::Failed),
28        }
29    }
30}
31
32impl LaneTxWith for ShmFanoutTx {
33    fn publish_with<F>(&mut self, f: F) -> core::result::Result<(), LaneTxError>
34    where
35        F: FnOnce(&mut [u8]) -> usize,
36    {
37        match self
38            .tx
39            .publish_with(|slot| Ok::<usize, core::convert::Infallible>(f(slot)))
40        {
41            Ok(()) => Ok(()),
42            Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
43                Err(LaneTxError::Full)
44            }
45            Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::TooLarge {
46                max,
47                len,
48            })) => Err(LaneTxError::TooLarge { max, len }),
49            Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxError::Failed),
50            Err(indexbus_core::PublishWithError::Encode(_never)) => unreachable!(),
51        }
52    }
53}
54
55impl LaneTxWithResult for ShmFanoutTx {
56    fn publish_with_result<F, E>(&mut self, f: F) -> core::result::Result<(), LaneTxWithError<E>>
57    where
58        F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
59    {
60        match self.tx.publish_with(f) {
61            Ok(()) => Ok(()),
62            Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
63                Err(LaneTxWithError::Full)
64            }
65            Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxWithError::Failed),
66            Err(indexbus_core::PublishWithError::Encode(e)) => Err(LaneTxWithError::Encode(e)),
67        }
68    }
69}
70
71impl ShmFanoutTx {
72    /// Publish by writing directly into the backing slot buffer.
73    pub fn publish_with<F, E>(&self, f: F) -> Result<(), indexbus_core::PublishWithError<E>>
74    where
75        F: FnOnce(&mut [u8]) -> Result<usize, E>,
76    {
77        self.tx.publish_with(f)
78    }
79}
80
81/// SHM-backed fanout router.
82///
83/// This is intentionally separate from producers/consumers to enforce the v1 role separation:
84/// the router role is the only component that touches fanout routing.
85pub struct ShmFanoutRouter {
86    _region: indexbus_transport_shm::FanoutRegion<4>,
87    router: indexbus_core::FanoutRouter<4>,
88}
89
90impl ShmFanoutRouter {
91    /// Route one event from the SPSC producer queue.
92    #[inline]
93    pub fn route_once(&self, mode: RouterMode) -> bool {
94        self.router.route_once_with(RouterSource::Spsc, mode)
95    }
96
97    /// Route one event from the SPSC producer queue and return delivery statistics.
98    #[inline]
99    pub fn route_once_with_stats(&self, mode: RouterMode) -> RouteOnceResult {
100        self.router.route_once_with_stats(RouterSource::Spsc, mode)
101    }
102
103    #[inline]
104    /// Convenience: broadcast one event.
105    pub fn route_once_broadcast(&self) -> bool {
106        self.route_once(RouterMode::Broadcast)
107    }
108
109    #[inline]
110    /// Convenience: work-queue one event.
111    pub fn route_once_workqueue(&self) -> bool {
112        self.route_once(RouterMode::WorkQueue)
113    }
114}
115
116/// SHM-backed fanout RX lane.
117pub struct ShmFanoutRx {
118    _region: indexbus_transport_shm::FanoutRegion<4>,
119    rx: indexbus_core::FanoutConsumer<4>,
120}
121
122impl LaneRx for ShmFanoutRx {
123    fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
124        self.rx.try_recv_into(buf).ok().flatten()
125    }
126}
127
128impl LaneRxBorrow for ShmFanoutRx {
129    fn recv_with<F, R>(&mut self, f: F) -> core::result::Result<Option<R>, LaneRxError>
130    where
131        F: FnOnce(&[u8]) -> R,
132    {
133        self.rx
134            .try_recv_with(|bytes| Ok::<R, core::convert::Infallible>(f(bytes)))
135            .map_err(|_| LaneRxError::Failed)
136    }
137}
138
139impl ShmFanoutRx {
140    /// Attempt to receive the next message into `out`.
141    pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, indexbus_core::Error> {
142        self.rx.try_recv_into(out)
143    }
144
145    /// Receive by borrowing the slot bytes for the duration of `f`.
146    pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, indexbus_core::RecvWithError<E>>
147    where
148        F: FnOnce(&[u8]) -> Result<R, E>,
149    {
150        self.rx.try_recv_with(f)
151    }
152}
153
154fn transport_open_options(opts: &AttachOptions) -> indexbus_transport_shm::OpenOptions {
155    let mut o = indexbus_transport_shm::OpenOptions::new().blocking(opts.blocking);
156    if let Some(p) = &opts.path {
157        o = o.path(p.clone());
158    }
159    o
160}
161
162fn resolve_path<'a>(opts: &'a AttachOptions, lane: &'a str) -> Option<&'a Path> {
163    let _ = lane;
164    opts.path.as_deref()
165}
166
167/// Attach to a fanout TX lane.
168pub fn attach_fanout_tx(opts: &AttachOptions, lane: &str) -> Result<ShmFanoutTx, BackingError> {
169    if lane.is_empty() {
170        return Err(BackingError::Invalid("lane name is empty"));
171    }
172
173    let o = transport_open_options(opts);
174    let mut region = if let Some(path) = resolve_path(opts, lane) {
175        indexbus_transport_shm::FanoutRegion::<4>::open_path_with(path, o)?
176    } else {
177        indexbus_transport_shm::FanoutRegion::<4>::open_with(lane, o)?
178    };
179
180    let (tx, _router, _rx) = region.handles(0)?;
181    Ok(ShmFanoutTx {
182        _region: region,
183        tx,
184    })
185}
186
187/// Attach to a fanout router.
188pub fn attach_fanout_router(
189    opts: &AttachOptions,
190    lane: &str,
191) -> Result<ShmFanoutRouter, BackingError> {
192    if lane.is_empty() {
193        return Err(BackingError::Invalid("lane name is empty"));
194    }
195
196    let o = transport_open_options(opts);
197    let mut region = if let Some(path) = resolve_path(opts, lane) {
198        indexbus_transport_shm::FanoutRegion::<4>::open_path_with(path, o)?
199    } else {
200        indexbus_transport_shm::FanoutRegion::<4>::open_with(lane, o)?
201    };
202
203    let (_tx, router, _rx) = region.handles(0)?;
204    Ok(ShmFanoutRouter {
205        _region: region,
206        router,
207    })
208}
209
210/// Attach to a fanout RX lane for consumer `opts.queue`.
211pub fn attach_fanout_rx(opts: &AttachOptions, lane: &str) -> Result<ShmFanoutRx, BackingError> {
212    if lane.is_empty() {
213        return Err(BackingError::Invalid("lane name is empty"));
214    }
215
216    let o = transport_open_options(opts);
217    let mut region = if let Some(path) = resolve_path(opts, lane) {
218        indexbus_transport_shm::FanoutRegion::<4>::open_path_with(path, o)?
219    } else {
220        indexbus_transport_shm::FanoutRegion::<4>::open_with(lane, o)?
221    };
222
223    let (_tx, _router, rx) = region.handles(opts.queue)?;
224    Ok(ShmFanoutRx {
225        _region: region,
226        rx,
227    })
228}