1use std::path::Path;
4use std::sync::Arc;
5
6use byteor_pipeline_kernel::{
7 LaneRx, LaneRxBorrow, LaneRxError, LaneRxSlot, LaneTx, LaneTxError, LaneTxSlot,
8 LaneTxSlotError, LaneTxWith, LaneTxWithError, LaneTxWithResult,
9};
10
11use crate::{AttachOptions, BackingError};
12
13pub struct ShmEventsChainRegion {
19 region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
20}
21
22impl ShmEventsChainRegion {
23 pub fn open(opts: &AttachOptions, lane: &str) -> Result<Self, BackingError> {
25 if opts.blocking {
26 return Err(BackingError::Unsupported(
27 "events-chain does not support blocking in v1",
28 ));
29 }
30 if lane.is_empty() {
31 return Err(BackingError::Invalid("lane name is empty"));
32 }
33
34 let o = transport_open_options(opts);
35 let region = if let Some(path) = resolve_path(opts, lane) {
36 byteor_transport_shm::EventsChainRegion::<4>::open_path_with(path, o)?
37 } else {
38 byteor_transport_shm::EventsChainRegion::<4>::open_with(lane, o)?
39 };
40
41 Ok(Self {
42 region: Arc::new(region),
43 })
44 }
45
46 pub fn queue_spsc(&self, idx: usize) -> Result<(ShmEventsTx, ShmEventsRx), BackingError> {
48 let (tx, rx) = self.region.queue_spsc(idx)?;
49 Ok((
50 ShmEventsTx {
51 _region: self.region.clone(),
52 tx,
53 },
54 ShmEventsRx {
55 _region: self.region.clone(),
56 rx,
57 },
58 ))
59 }
60
61 pub fn queue_mpsc(
63 &self,
64 idx: usize,
65 ) -> Result<(ShmEventsMpscTx, ShmEventsMpscRx), BackingError> {
66 let (tx, rx) = self.region.queue_mpsc(idx)?;
67 Ok((
68 ShmEventsMpscTx {
69 _region: self.region.clone(),
70 tx,
71 },
72 ShmEventsMpscRx {
73 _region: self.region.clone(),
74 rx,
75 },
76 ))
77 }
78}
79
80pub struct ShmEventsRx {
82 _region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
83 rx: indexbus_core::ChainSpscReceiver,
84}
85
86impl LaneRx for ShmEventsRx {
87 fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
88 self.rx.recv(buf)
89 }
90}
91
92impl LaneRxBorrow for ShmEventsRx {
93 fn recv_with<F, R>(&mut self, f: F) -> core::result::Result<Option<R>, LaneRxError>
94 where
95 F: FnOnce(&[u8]) -> R,
96 {
97 self.rx
98 .try_recv_with(|bytes| Ok::<R, core::convert::Infallible>(f(bytes)))
99 .map_err(|_| LaneRxError::Failed)
100 }
101}
102
103impl LaneRxSlot for ShmEventsRx {
104 type Slot = indexbus_core::EventsSlot;
105
106 fn try_recv_slot(&mut self) -> core::result::Result<Option<Self::Slot>, LaneRxError> {
107 self.rx.try_recv_slot().map_err(|_| LaneRxError::Failed)
108 }
109}
110
111impl ShmEventsRx {
112 pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, indexbus_core::RecvWithError<E>>
114 where
115 F: FnOnce(&[u8]) -> Result<R, E>,
116 {
117 self.rx.try_recv_with(f)
118 }
119
120 pub fn try_recv_slot(&self) -> Result<Option<indexbus_core::EventsSlot>, indexbus_core::Error> {
122 self.rx.try_recv_slot()
123 }
124}
125
126pub struct ShmEventsTx {
128 _region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
129 tx: indexbus_core::ChainSpscSender,
130}
131
132impl LaneTx for ShmEventsTx {
133 fn publish(&mut self, bytes: &[u8]) -> Result<(), LaneTxError> {
134 match self.tx.publish(bytes) {
135 Ok(()) => Ok(()),
136 Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
137 Err(indexbus_core::Error::TooLarge { max, len }) => {
138 Err(LaneTxError::TooLarge { max, len })
139 }
140 Err(_) => Err(LaneTxError::Failed),
141 }
142 }
143}
144
145impl LaneTxWith for ShmEventsTx {
146 fn publish_with<F>(&mut self, f: F) -> core::result::Result<(), LaneTxError>
147 where
148 F: FnOnce(&mut [u8]) -> usize,
149 {
150 match self
151 .tx
152 .publish_with(|slot| Ok::<usize, core::convert::Infallible>(f(slot)))
153 {
154 Ok(()) => Ok(()),
155 Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
156 Err(LaneTxError::Full)
157 }
158 Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::TooLarge {
159 max,
160 len,
161 })) => Err(LaneTxError::TooLarge { max, len }),
162 Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxError::Failed),
163 Err(indexbus_core::PublishWithError::Encode(_never)) => unreachable!(),
164 }
165 }
166}
167
168impl LaneTxWithResult for ShmEventsTx {
169 fn publish_with_result<F, E>(&mut self, f: F) -> core::result::Result<(), LaneTxWithError<E>>
170 where
171 F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
172 {
173 match self.tx.publish_with(f) {
174 Ok(()) => Ok(()),
175 Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
176 Err(LaneTxWithError::Full)
177 }
178 Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxWithError::Failed),
179 Err(indexbus_core::PublishWithError::Encode(e)) => Err(LaneTxWithError::Encode(e)),
180 }
181 }
182}
183
184impl LaneTxSlot<indexbus_core::EventsSlot> for ShmEventsTx {
185 fn publish_slot(
186 &mut self,
187 slot: indexbus_core::EventsSlot,
188 ) -> core::result::Result<(), LaneTxSlotError<indexbus_core::EventsSlot>> {
189 match self.tx.try_publish_slot(slot) {
190 Ok(()) => Ok(()),
191 Err(e) => match e.error {
192 indexbus_core::Error::Full => Err(LaneTxSlotError::Full(e.slot)),
193 indexbus_core::Error::IncompatibleLayout => {
194 Err(LaneTxSlotError::Incompatible(e.slot))
195 }
196 _ => Err(LaneTxSlotError::Failed(e.slot)),
197 },
198 }
199 }
200}
201
202impl ShmEventsTx {
203 pub fn publish_with<F, E>(&self, f: F) -> Result<(), indexbus_core::PublishWithError<E>>
205 where
206 F: FnOnce(&mut [u8]) -> Result<usize, E>,
207 {
208 self.tx.publish_with(f)
209 }
210
211 pub fn publish_slot(&self, slot: indexbus_core::EventsSlot) -> Result<(), LaneTxError> {
213 match self.tx.publish_slot(slot) {
214 Ok(()) => Ok(()),
215 Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
216 Err(_) => Err(LaneTxError::Failed),
217 }
218 }
219}
220
221pub struct ShmEventsMpscRx {
223 _region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
224 rx: indexbus_core::ChainMpscConsumer,
225}
226
227impl LaneRx for ShmEventsMpscRx {
228 fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
229 self.rx.recv(buf)
230 }
231}
232
233impl LaneRxBorrow for ShmEventsMpscRx {
234 fn recv_with<F, R>(&mut self, f: F) -> core::result::Result<Option<R>, LaneRxError>
235 where
236 F: FnOnce(&[u8]) -> R,
237 {
238 self.rx
239 .try_recv_with(|bytes| Ok::<R, core::convert::Infallible>(f(bytes)))
240 .map_err(|_| LaneRxError::Failed)
241 }
242}
243
244impl LaneRxSlot for ShmEventsMpscRx {
245 type Slot = indexbus_core::EventsSlot;
246
247 fn try_recv_slot(&mut self) -> core::result::Result<Option<Self::Slot>, LaneRxError> {
248 self.rx.try_recv_slot().map_err(|_| LaneRxError::Failed)
249 }
250}
251
252impl ShmEventsMpscRx {
253 pub fn try_recv_with<F, R, E>(&self, f: F) -> Result<Option<R>, indexbus_core::RecvWithError<E>>
255 where
256 F: FnOnce(&[u8]) -> Result<R, E>,
257 {
258 self.rx.try_recv_with(f)
259 }
260
261 pub fn try_recv_slot(&self) -> Result<Option<indexbus_core::EventsSlot>, indexbus_core::Error> {
263 self.rx.try_recv_slot()
264 }
265}
266
267pub struct ShmEventsMpscTx {
269 _region: Arc<byteor_transport_shm::EventsChainRegion<4>>,
270 tx: indexbus_core::ChainMpscProducer,
271}
272
273impl LaneTx for ShmEventsMpscTx {
274 fn publish(&mut self, bytes: &[u8]) -> Result<(), LaneTxError> {
275 match self.tx.publish(bytes) {
276 Ok(()) => Ok(()),
277 Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
278 Err(indexbus_core::Error::TooLarge { max, len }) => {
279 Err(LaneTxError::TooLarge { max, len })
280 }
281 Err(_) => Err(LaneTxError::Failed),
282 }
283 }
284}
285
286impl LaneTxWith for ShmEventsMpscTx {
287 fn publish_with<F>(&mut self, f: F) -> core::result::Result<(), LaneTxError>
288 where
289 F: FnOnce(&mut [u8]) -> usize,
290 {
291 match self
292 .tx
293 .publish_with(|slot| Ok::<usize, core::convert::Infallible>(f(slot)))
294 {
295 Ok(()) => Ok(()),
296 Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
297 Err(LaneTxError::Full)
298 }
299 Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::TooLarge {
300 max,
301 len,
302 })) => Err(LaneTxError::TooLarge { max, len }),
303 Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxError::Failed),
304 Err(indexbus_core::PublishWithError::Encode(_never)) => unreachable!(),
305 }
306 }
307}
308
309impl LaneTxWithResult for ShmEventsMpscTx {
310 fn publish_with_result<F, E>(&mut self, f: F) -> core::result::Result<(), LaneTxWithError<E>>
311 where
312 F: FnOnce(&mut [u8]) -> core::result::Result<usize, E>,
313 {
314 match self.tx.publish_with(f) {
315 Ok(()) => Ok(()),
316 Err(indexbus_core::PublishWithError::Core(indexbus_core::Error::Full)) => {
317 Err(LaneTxWithError::Full)
318 }
319 Err(indexbus_core::PublishWithError::Core(_)) => Err(LaneTxWithError::Failed),
320 Err(indexbus_core::PublishWithError::Encode(e)) => Err(LaneTxWithError::Encode(e)),
321 }
322 }
323}
324
325impl LaneTxSlot<indexbus_core::EventsSlot> for ShmEventsMpscTx {
326 fn publish_slot(
327 &mut self,
328 slot: indexbus_core::EventsSlot,
329 ) -> core::result::Result<(), LaneTxSlotError<indexbus_core::EventsSlot>> {
330 match self.tx.try_publish_slot(slot) {
331 Ok(()) => Ok(()),
332 Err(e) => match e.error {
333 indexbus_core::Error::Full => Err(LaneTxSlotError::Full(e.slot)),
334 indexbus_core::Error::IncompatibleLayout => {
335 Err(LaneTxSlotError::Incompatible(e.slot))
336 }
337 _ => Err(LaneTxSlotError::Failed(e.slot)),
338 },
339 }
340 }
341}
342
343impl ShmEventsMpscTx {
344 pub fn publish_with<F, E>(&self, f: F) -> Result<(), indexbus_core::PublishWithError<E>>
346 where
347 F: FnOnce(&mut [u8]) -> Result<usize, E>,
348 {
349 self.tx.publish_with(f)
350 }
351
352 pub fn publish_slot(&self, slot: indexbus_core::EventsSlot) -> Result<(), LaneTxError> {
354 match self.tx.publish_slot(slot) {
355 Ok(()) => Ok(()),
356 Err(indexbus_core::Error::Full) => Err(LaneTxError::Full),
357 Err(_) => Err(LaneTxError::Failed),
358 }
359 }
360}
361
362fn transport_open_options(opts: &AttachOptions) -> byteor_transport_shm::OpenOptions {
363 let mut o = byteor_transport_shm::OpenOptions::new()
364 .blocking(opts.blocking)
365 .prefault(opts.prefault);
366 if let Some(p) = &opts.path {
367 o = o.path(p.clone());
368 }
369 o
370}
371
372fn resolve_path<'a>(opts: &'a AttachOptions, lane: &'a str) -> Option<&'a Path> {
373 let _ = lane;
374 opts.path.as_deref()
375}
376
377pub fn attach_events_rx(opts: &AttachOptions, lane: &str) -> Result<ShmEventsRx, BackingError> {
379 if opts.blocking {
380 return Err(BackingError::Unsupported(
381 "events-chain does not support blocking in v1",
382 ));
383 }
384 if lane.is_empty() {
385 return Err(BackingError::Invalid("lane name is empty"));
386 }
387
388 let region = ShmEventsChainRegion::open(opts, lane)?.region;
389
390 let (_tx, rx) = region.queue_spsc(opts.queue)?;
391 Ok(ShmEventsRx {
392 _region: region,
393 rx,
394 })
395}
396
397pub fn attach_events_tx(opts: &AttachOptions, lane: &str) -> Result<ShmEventsTx, BackingError> {
399 if opts.blocking {
400 return Err(BackingError::Unsupported(
401 "events-chain does not support blocking in v1",
402 ));
403 }
404 if lane.is_empty() {
405 return Err(BackingError::Invalid("lane name is empty"));
406 }
407
408 let region = ShmEventsChainRegion::open(opts, lane)?.region;
409
410 let (tx, _rx) = region.queue_spsc(opts.queue)?;
411 Ok(ShmEventsTx {
412 _region: region,
413 tx,
414 })
415}
416
417pub fn attach_events_mpsc_rx(
419 opts: &AttachOptions,
420 lane: &str,
421) -> Result<ShmEventsMpscRx, BackingError> {
422 if opts.blocking {
423 return Err(BackingError::Unsupported(
424 "events-chain does not support blocking in v1",
425 ));
426 }
427 if lane.is_empty() {
428 return Err(BackingError::Invalid("lane name is empty"));
429 }
430
431 let region = ShmEventsChainRegion::open(opts, lane)?.region;
432
433 let (_tx, rx) = region.queue_mpsc(opts.queue)?;
434 Ok(ShmEventsMpscRx {
435 _region: region,
436 rx,
437 })
438}
439
440pub fn attach_events_mpsc_tx(
442 opts: &AttachOptions,
443 lane: &str,
444) -> Result<ShmEventsMpscTx, BackingError> {
445 if opts.blocking {
446 return Err(BackingError::Unsupported(
447 "events-chain does not support blocking in v1",
448 ));
449 }
450 if lane.is_empty() {
451 return Err(BackingError::Invalid("lane name is empty"));
452 }
453
454 let region = ShmEventsChainRegion::open(opts, lane)?.region;
455
456 let (tx, _rx) = region.queue_mpsc(opts.queue)?;
457 Ok(ShmEventsMpscTx {
458 _region: region,
459 tx,
460 })
461}