byteor_pipeline_backings_shm/
journal.rs

1//! SHM-backed journal backings (v1, optional).
2//!
3//! This module is feature-gated because journals are optional in the v1 OSS lane-graph.
4
5use std::path::Path;
6
7use byteor_pipeline_kernel::{LaneRx, LaneTx, LaneTxError};
8
9use crate::{AttachOptions, BackingError};
10
11/// SHM-backed journal publisher.
12///
13/// Attach-only: opens/creates the mapping and exposes a publisher handle.
14pub struct ShmJournalPublisher {
15    _region: indexbus_transport_shm::JournalRegion4,
16    pubr: indexbus_log::JournalPublisher,
17}
18
19impl ShmJournalPublisher {
20    /// Append a payload (claim + copy + commit).
21    #[inline]
22    pub fn try_append(&mut self, payload: &[u8]) -> Result<u64, BackingError> {
23        Ok(self.pubr.try_append(payload)?)
24    }
25
26    /// Current publication position.
27    #[inline]
28    pub fn pub_pos(&self) -> u64 {
29        self.pubr.pub_pos()
30    }
31
32    /// Maximum payload size supported by this layout.
33    #[inline]
34    pub fn max_payload(&self) -> u32 {
35        self.pubr.max_payload()
36    }
37}
38
39impl LaneTx for ShmJournalPublisher {
40    fn publish(&mut self, msg: &[u8]) -> Result<(), LaneTxError> {
41        let max = self.max_payload() as usize;
42        if msg.len() > max {
43            return Err(LaneTxError::TooLarge {
44                max,
45                len: msg.len(),
46            });
47        }
48
49        self.try_append(msg).map(|_| ()).map_err(|e| match e {
50            BackingError::Journal(indexbus_log::Error::Full) => LaneTxError::Full,
51            _ => LaneTxError::Failed,
52        })
53    }
54}
55
56/// SHM-backed journal subscriber.
57///
58/// Attach-only: opens/creates the mapping and exposes a subscriber handle.
59pub struct ShmJournalSubscriber {
60    _region: indexbus_transport_shm::JournalRegion4,
61    sub: indexbus_log::JournalSubscriber,
62}
63
64impl ShmJournalSubscriber {
65    /// Poll for the next record, copying bytes into `out`.
66    ///
67    /// Returns `Ok(None)` when no record is available yet.
68    #[inline]
69    pub fn poll_next_len(&mut self, out: &mut [u8]) -> Result<Option<usize>, BackingError> {
70        Ok(self.sub.poll_next_len(out)?)
71    }
72
73    /// Spin/poll until the next record is available.
74    #[inline]
75    pub fn wait_next_len<W: indexbus_core::WaitStrategy>(
76        &mut self,
77        out: &mut [u8],
78        wait: &mut W,
79    ) -> Result<usize, BackingError> {
80        let slice = self.sub.wait_next(out, wait)?;
81        Ok(slice.len())
82    }
83
84    /// Current subscriber position.
85    #[inline]
86    pub fn pos(&self) -> u64 {
87        self.sub.pos()
88    }
89
90    /// Subscriber lag behind the publisher.
91    #[inline]
92    pub fn lag_bytes(&self) -> u64 {
93        self.sub.lag_bytes()
94    }
95}
96
97impl LaneRx for ShmJournalSubscriber {
98    fn recv(&mut self, buf: &mut [u8]) -> Option<usize> {
99        match self.poll_next_len(buf) {
100            Ok(Some(n)) => Some(n),
101            Ok(None) => None,
102            Err(_) => None,
103        }
104    }
105}
106
107fn transport_open_options(opts: &AttachOptions) -> indexbus_transport_shm::JournalOpenOptions {
108    let mut o = indexbus_transport_shm::JournalOpenOptions::new().blocking(opts.blocking);
109    if let Some(p) = &opts.path {
110        o = o.path(p.clone());
111    }
112    o
113}
114
115fn resolve_path<'a>(opts: &'a AttachOptions, lane: &'a str) -> Option<&'a Path> {
116    let _ = lane;
117    opts.path.as_deref()
118}
119
120/// Attach to a journal publisher.
121pub fn attach_journal_publisher(
122    opts: &AttachOptions,
123    lane: &str,
124    cfg: indexbus_log::JournalPublisherConfig,
125) -> Result<ShmJournalPublisher, BackingError> {
126    if lane.is_empty() {
127        return Err(BackingError::Invalid("lane name is empty"));
128    }
129
130    let o = transport_open_options(opts);
131    let mut region = if let Some(path) = resolve_path(opts, lane) {
132        indexbus_transport_shm::JournalRegion4::open_path_with(path, o)?
133    } else {
134        indexbus_transport_shm::JournalRegion4::open_with(lane, o)?
135    };
136
137    let pubr = region.publisher(cfg)?;
138    Ok(ShmJournalPublisher {
139        _region: region,
140        pubr,
141    })
142}
143
144/// Attach to a journal subscriber starting at an explicit position.
145pub fn attach_journal_subscriber_from(
146    opts: &AttachOptions,
147    lane: &str,
148    start_pos: u64,
149) -> Result<ShmJournalSubscriber, BackingError> {
150    if lane.is_empty() {
151        return Err(BackingError::Invalid("lane name is empty"));
152    }
153
154    let o = transport_open_options(opts);
155    let mut region = if let Some(path) = resolve_path(opts, lane) {
156        indexbus_transport_shm::JournalRegion4::open_path_with(path, o)?
157    } else {
158        indexbus_transport_shm::JournalRegion4::open_with(lane, o)?
159    };
160
161    let sub = region.subscriber(opts.queue, start_pos)?;
162    Ok(ShmJournalSubscriber {
163        _region: region,
164        sub,
165    })
166}
167
168/// Attach to a journal subscriber starting at the current tail (safe-by-default).
169///
170/// This avoids unintentionally replaying old data unless an operator explicitly requests it
171/// via [`attach_journal_subscriber_from`].
172pub fn attach_journal_subscriber_tail(
173    opts: &AttachOptions,
174    lane: &str,
175) -> Result<ShmJournalSubscriber, BackingError> {
176    if lane.is_empty() {
177        return Err(BackingError::Invalid("lane name is empty"));
178    }
179
180    let o = transport_open_options(opts);
181    let mut region = if let Some(path) = resolve_path(opts, lane) {
182        indexbus_transport_shm::JournalRegion4::open_path_with(path, o)?
183    } else {
184        indexbus_transport_shm::JournalRegion4::open_with(lane, o)?
185    };
186
187    // Determine current tail position without publishing.
188    let tail = {
189        let pubr = region.publisher(indexbus_log::JournalPublisherConfig {
190            subs: 1,
191            ..Default::default()
192        })?;
193        pubr.pub_pos()
194    };
195
196    let sub = region.subscriber(opts.queue, tail)?;
197    Ok(ShmJournalSubscriber {
198        _region: region,
199        sub,
200    })
201}