byteor_pipeline_backings_shm/
fanout.rs1use std::path::Path;
4
5use byteor_pipeline_kernel::{
6 LaneRx, LaneRxBorrow, LaneRxError, LaneTx, LaneTxError, LaneTxWith, LaneTxWithError,
7 LaneTxWithResult,
8};
9use indexbus_core::{RouteOnceResult, RouterMode, RouterSource};
10
11use crate::{AttachOptions, BackingError};
12
13pub struct ShmFanoutTx {
15 _region: indexbus_transport_shm::FanoutRegion<4>,
16 tx: indexbus_core::FanoutProducer<4>,
17}
18
19impl LaneTx for ShmFanoutTx {
20 fn publish(&mut self, bytes: &[u8]) -> Result<(), LaneTxError> {
21 match self.tx.publish(bytes) {
22 Ok(()) => Ok(()),
23 Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
24 Err(indexbus_core::Error::TooLarge { max, len }) => {
25 Err(LaneTxError::TooLarge { max, len })
26 }
27 Err(_) => Err(LaneTxError::Failed),
28 }
29 }
30}
31
32impl LaneTxWith for ShmFanoutTx {
33 fn publish_with<F>(&mut self, f: F) -> core::result::Result<(), LaneTxError>
34 where
35 F: FnOnce(&mut [u8]) -> usize,
36 {
37 match self
38 .tx
39 .publish_with(|slot| Ok::<usize, core::convert::Infallible>(f(slot)))
40 {
41 Ok(()) => Ok(()),
42 Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
43 Err(LaneTxError::Full)
44 }
45 Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::TooLarge {
46 max,
47 len,
48 })) => Err(LaneTxError::TooLarge { max, len }),
49 Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxError::Failed),
50 Err(indexbus_core::PublishWithError::Encode(_never)) => unreachable!(),
51 }
52 }
53}
54
55impl LaneTxWithResult for ShmFanoutTx {
56 fn publish_with_result<F, E>(&mut self, f: F) -> core::result::Result<(), LaneTxWithError<E>>
57 where
58 F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
59 {
60 match self.tx.publish_with(f) {
61 Ok(()) => Ok(()),
62 Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
63 Err(LaneTxWithError::Full)
64 }
65 Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxWithError::Failed),
66 Err(indexbus_core::PublishWithError::Encode(e)) => Err(LaneTxWithError::Encode(e)),
67 }
68 }
69}
70
71impl ShmFanoutTx {
72 pub fn publish_with<F, E>(&self, f: F) -> Result<(), indexbus_core::PublishWithError<E>>
74 where
75 F: FnOnce(&mut [u8]) -> Result<usize, E>,
76 {
77 self.tx.publish_with(f)
78 }
79}
80
81pub struct ShmFanoutRouter {
86 _region: indexbus_transport_shm::FanoutRegion<4>,
87 router: indexbus_core::FanoutRouter<4>,
88}
89
90impl ShmFanoutRouter {
91 #[inline]
93 pub fn route_once(&self, mode: RouterMode) -> bool {
94 self.router.route_once_with(RouterSource::Spsc, mode)
95 }
96
97 #[inline]
99 pub fn route_once_with_stats(&self, mode: RouterMode) -> RouteOnceResult {
100 self.router.route_once_with_stats(RouterSource::Spsc, mode)
101 }
102
103 #[inline]
104 pub fn route_once_broadcast(&self) -> bool {
106 self.route_once(RouterMode::Broadcast)
107 }
108
109 #[inline]
110 pub fn route_once_workqueue(&self) -> bool {
112 self.route_once(RouterMode::WorkQueue)
113 }
114}
115
116pub struct ShmFanoutRx {
118 _region: indexbus_transport_shm::FanoutRegion<4>,
119 rx: indexbus_core::FanoutConsumer<4>,
120}
121
122impl LaneRx for ShmFanoutRx {
123 fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
124 self.rx.try_recv_into(buf).ok().flatten()
125 }
126}
127
128impl LaneRxBorrow for ShmFanoutRx {
129 fn recv_with<F, R>(&mut self, f: F) -> core::result::Result<Option<R>, LaneRxError>
130 where
131 F: FnOnce(&[u8]) -> R,
132 {
133 self.rx
134 .try_recv_with(|bytes| Ok::<R, core::convert::Infallible>(f(bytes)))
135 .map_err(|_| LaneRxError::Failed)
136 }
137}
138
139impl ShmFanoutRx {
140 pub fn try_recv_into(&self, out: &mut [u8]) -> Result<Option<usize>, indexbus_core::Error> {
142 self.rx.try_recv_into(out)
143 }
144
145 pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, indexbus_core::RecvWithError<E>>
147 where
148 F: FnOnce(&[u8]) -> Result<R, E>,
149 {
150 self.rx.try_recv_with(f)
151 }
152}
153
154fn transport_open_options(opts: &AttachOptions) -> indexbus_transport_shm::OpenOptions {
155 let mut o = indexbus_transport_shm::OpenOptions::new().blocking(opts.blocking);
156 if let Some(p) = &opts.path {
157 o = o.path(p.clone());
158 }
159 o
160}
161
162fn resolve_path<'a>(opts: &'a AttachOptions, lane: &'a str) -> Option<&'a Path> {
163 let _ = lane;
164 opts.path.as_deref()
165}
166
167pub fn attach_fanout_tx(opts: &AttachOptions, lane: &str) -> Result<ShmFanoutTx, BackingError> {
169 if lane.is_empty() {
170 return Err(BackingError::Invalid("lane name is empty"));
171 }
172
173 let o = transport_open_options(opts);
174 let mut region = if let Some(path) = resolve_path(opts, lane) {
175 indexbus_transport_shm::FanoutRegion::<4>::open_path_with(path, o)?
176 } else {
177 indexbus_transport_shm::FanoutRegion::<4>::open_with(lane, o)?
178 };
179
180 let (tx, _router, _rx) = region.handles(0)?;
181 Ok(ShmFanoutTx {
182 _region: region,
183 tx,
184 })
185}
186
187pub fn attach_fanout_router(
189 opts: &AttachOptions,
190 lane: &str,
191) -> Result<ShmFanoutRouter, BackingError> {
192 if lane.is_empty() {
193 return Err(BackingError::Invalid("lane name is empty"));
194 }
195
196 let o = transport_open_options(opts);
197 let mut region = if let Some(path) = resolve_path(opts, lane) {
198 indexbus_transport_shm::FanoutRegion::<4>::open_path_with(path, o)?
199 } else {
200 indexbus_transport_shm::FanoutRegion::<4>::open_with(lane, o)?
201 };
202
203 let (_tx, router, _rx) = region.handles(0)?;
204 Ok(ShmFanoutRouter {
205 _region: region,
206 router,
207 })
208}
209
210pub fn attach_fanout_rx(opts: &AttachOptions, lane: &str) -> Result<ShmFanoutRx, BackingError> {
212 if lane.is_empty() {
213 return Err(BackingError::Invalid("lane name is empty"));
214 }
215
216 let o = transport_open_options(opts);
217 let mut region = if let Some(path) = resolve_path(opts, lane) {
218 indexbus_transport_shm::FanoutRegion::<4>::open_path_with(path, o)?
219 } else {
220 indexbus_transport_shm::FanoutRegion::<4>::open_with(lane, o)?
221 };
222
223 let (_tx, _router, rx) = region.handles(opts.queue)?;
224 Ok(ShmFanoutRx {
225 _region: region,
226 rx,
227 })
228}