byteor_pipeline_backings_shm/
journal.rs1use std::path::Path;
6
7use byteor_pipeline_kernel::{LaneRx, LaneTx, LaneTxError};
8
9use crate::{AttachOptions, BackingError};
10
11pub struct ShmJournalPublisher {
15 _region: indexbus_transport_shm::JournalRegion4,
16 pubr: indexbus_log::JournalPublisher,
17}
18
19impl ShmJournalPublisher {
20 #[inline]
22 pub fn try_append(&mut self, payload: &[u8]) -> Result<u64, BackingError> {
23 Ok(self.pubr.try_append(payload)?)
24 }
25
26 #[inline]
28 pub fn pub_pos(&self) -> u64 {
29 self.pubr.pub_pos()
30 }
31
32 #[inline]
34 pub fn max_payload(&self) -> u32 {
35 self.pubr.max_payload()
36 }
37}
38
39impl LaneTx for ShmJournalPublisher {
40 fn publish(&mut self, msg: &[u8]) -> Result<(), LaneTxError> {
41 let max = self.max_payload() as usize;
42 if msg.len() > max {
43 return Err(LaneTxError::TooLarge {
44 max,
45 len: msg.len(),
46 });
47 }
48
49 self.try_append(msg).map(|_| ()).map_err(|e| match e {
50 BackingError::Journal(indexbus_log::Error::Full) => LaneTxError::Full,
51 _ => LaneTxError::Failed,
52 })
53 }
54}
55
56pub struct ShmJournalSubscriber {
60 _region: indexbus_transport_shm::JournalRegion4,
61 sub: indexbus_log::JournalSubscriber,
62}
63
64impl ShmJournalSubscriber {
65 #[inline]
69 pub fn poll_next_len(&mut self, out: &mut [u8]) -> Result<Option<usize>, BackingError> {
70 Ok(self.sub.poll_next_len(out)?)
71 }
72
73 #[inline]
75 pub fn wait_next_len<W: indexbus_core::WaitStrategy>(
76 &mut self,
77 out: &mut [u8],
78 wait: &mut W,
79 ) -> Result<usize, BackingError> {
80 let slice = self.sub.wait_next(out, wait)?;
81 Ok(slice.len())
82 }
83
84 #[inline]
86 pub fn pos(&self) -> u64 {
87 self.sub.pos()
88 }
89
90 #[inline]
92 pub fn lag_bytes(&self) -> u64 {
93 self.sub.lag_bytes()
94 }
95}
96
97impl LaneRx for ShmJournalSubscriber {
98 fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
99 match self.poll_next_len(buf) {
100 Ok(Some(n)) => Some(n),
101 Ok(None) => None,
102 Err(_) => None,
103 }
104 }
105}
106
107fn transport_open_options(opts: &AttachOptions) -> indexbus_transport_shm::JournalOpenOptions {
108 let mut o = indexbus_transport_shm::JournalOpenOptions::new().blocking(opts.blocking);
109 if let Some(p) = &opts.path {
110 o = o.path(p.clone());
111 }
112 o
113}
114
115fn resolve_path<'a>(opts: &'a AttachOptions, lane: &'a str) -> Option<&'a Path> {
116 let _ = lane;
117 opts.path.as_deref()
118}
119
120pub fn attach_journal_publisher(
122 opts: &AttachOptions,
123 lane: &str,
124 cfg: indexbus_log::JournalPublisherConfig,
125) -> Result<ShmJournalPublisher, BackingError> {
126 if lane.is_empty() {
127 return Err(BackingError::Invalid("lane name is empty"));
128 }
129
130 let o = transport_open_options(opts);
131 let mut region = if let Some(path) = resolve_path(opts, lane) {
132 indexbus_transport_shm::JournalRegion4::open_path_with(path, o)?
133 } else {
134 indexbus_transport_shm::JournalRegion4::open_with(lane, o)?
135 };
136
137 let pubr = region.publisher(cfg)?;
138 Ok(ShmJournalPublisher {
139 _region: region,
140 pubr,
141 })
142}
143
144pub fn attach_journal_subscriber_from(
146 opts: &AttachOptions,
147 lane: &str,
148 start_pos: u64,
149) -> Result<ShmJournalSubscriber, BackingError> {
150 if lane.is_empty() {
151 return Err(BackingError::Invalid("lane name is empty"));
152 }
153
154 let o = transport_open_options(opts);
155 let mut region = if let Some(path) = resolve_path(opts, lane) {
156 indexbus_transport_shm::JournalRegion4::open_path_with(path, o)?
157 } else {
158 indexbus_transport_shm::JournalRegion4::open_with(lane, o)?
159 };
160
161 let sub = region.subscriber(opts.queue, start_pos)?;
162 Ok(ShmJournalSubscriber {
163 _region: region,
164 sub,
165 })
166}
167
168pub fn attach_journal_subscriber_tail(
173 opts: &AttachOptions,
174 lane: &str,
175) -> Result<ShmJournalSubscriber, BackingError> {
176 if lane.is_empty() {
177 return Err(BackingError::Invalid("lane name is empty"));
178 }
179
180 let o = transport_open_options(opts);
181 let mut region = if let Some(path) = resolve_path(opts, lane) {
182 indexbus_transport_shm::JournalRegion4::open_path_with(path, o)?
183 } else {
184 indexbus_transport_shm::JournalRegion4::open_with(lane, o)?
185 };
186
187 let tail = {
189 let pubr = region.publisher(indexbus_log::JournalPublisherConfig {
190 subs: 1,
191 ..Default::default()
192 })?;
193 pubr.pub_pos()
194 };
195
196 let sub = region.subscriber(opts.queue, tail)?;
197 Ok(ShmJournalSubscriber {
198 _region: region,
199 sub,
200 })
201}