// indexbus_core/internal/state.rs

1//! State-stream publisher/reader algorithms.
2//!
3//! The state layout uses a sequence-parity protocol:
4//! - writer increments `seq` to odd (write in progress)
5//! - writes bytes + publishes `len`
6//! - writer increments `seq` to even (stable)
7//!
8//! Readers only accept snapshots where `seq` is even and unchanged across the read.
9use core::cell::UnsafeCell;
10use core::sync::atomic::{AtomicU64, Ordering};
11
12use indexbus_abi::layouts::state::StateLayout;
13
14use crate::internal::atomics::{atomic_u32, atomic_u64};
15use crate::{internal, Error};
16
/// `StateLayout` wrapper that explicitly opts into interior mutability.
///
/// The v1 state layout is designed for concurrent mutation through atomics and raw pointers.
/// Rust requires such mutation through a shared reference to be mediated by [`UnsafeCell`].
///
/// `repr(transparent)` guarantees this wrapper has exactly the layout of the wrapped
/// `StateLayout`; the pointer casts in the `impl` below rely on that guarantee.
#[repr(transparent)]
pub struct StateLayoutCell<const STATE_MAX: usize>(UnsafeCell<StateLayout<STATE_MAX>>);
23
24impl<const STATE_MAX: usize> StateLayoutCell<STATE_MAX> {
25    /// Borrow a `StateLayoutCell` view over a uniquely-mutable `StateLayout`.
26    #[inline]
27    pub fn from_mut(inner: &mut StateLayout<STATE_MAX>) -> &StateLayoutCell<STATE_MAX> {
28        // Safety: `StateLayoutCell` is `repr(transparent)` over `UnsafeCell<StateLayout<_>>`, and
29        // `UnsafeCell<T>` is `repr(transparent)` over `T`.
30        unsafe { &*(inner as *mut StateLayout<STATE_MAX> as *const StateLayoutCell<STATE_MAX>) }
31    }
32
33    /// Borrow a `StateLayoutCell` view over a raw pointer.
34    ///
35    /// # Safety
36    /// `ptr` must be valid for reads/writes for the lifetime `'a`.
37    #[inline]
38    pub unsafe fn from_ptr<'a>(ptr: *mut StateLayout<STATE_MAX>) -> &'a StateLayoutCell<STATE_MAX> {
39        unsafe { &*(ptr as *const StateLayoutCell<STATE_MAX>) }
40    }
41
42    #[inline]
43    fn as_ptr(&self) -> *mut StateLayout<STATE_MAX> {
44        self.0.get()
45    }
46}
47
/// Publisher handle for a state layout.
///
/// ## Contract
///
/// - Intended for a single writer.
/// - Each call to [`StatePublisher::publish`] overwrites the entire state value.
/// - Publication uses the v1 parity protocol: readers should only accept snapshots observed at
///   an even `seq` before and after the read.
pub struct StatePublisher<const STATE_MAX: usize> {
    // Raw pointer into the caller-managed shared-memory mapping; the caller must ensure the
    // mapping outlives this handle (see the thread-safety notes below).
    inner: *mut StateLayout<STATE_MAX>,
}
59
/// Reader handle for a state layout.
///
/// Readers use the parity protocol to avoid torn reads: they only return a snapshot if `seq`
/// was even and unchanged across the read.
pub struct StateReader<const STATE_MAX: usize> {
    // Raw pointer into the caller-managed shared-memory mapping; the caller must ensure the
    // mapping outlives this handle (see the thread-safety notes below).
    inner: *mut StateLayout<STATE_MAX>,
}
67
// Thread-safety notes:
// - Handles are thin wrappers around a raw pointer to a shared-memory mapping. The caller must
//   ensure the mapped memory outlives all handles.
// - `StatePublisher` is intended for a single writer thread at a time: it is deliberately
//   `Send` but *not* `Sync`, so a publisher can be moved between threads but never shared.
// - `StateReader` only performs atomic loads + raw memory copies and may be used concurrently,
//   hence both `Send` and `Sync`.
unsafe impl<const STATE_MAX: usize> Send for StatePublisher<STATE_MAX> {}
unsafe impl<const STATE_MAX: usize> Send for StateReader<STATE_MAX> {}
unsafe impl<const STATE_MAX: usize> Sync for StateReader<STATE_MAX> {}
76
impl<const STATE_MAX: usize> StatePublisher<STATE_MAX> {
    #[inline]
    /// Create a publisher over a mapped layout without validating it.
    ///
    /// Prefer `try_new` when constructing from an untrusted mapping.
    pub fn new(inner: &StateLayoutCell<STATE_MAX>) -> Self {
        Self {
            inner: inner.as_ptr(),
        }
    }

    /// Create a publisher after validating the mapped layout.
    ///
    /// # Errors
    /// Propagates the validation failure from `validate_state_layout`.
    #[inline]
    pub fn try_new(inner: &StateLayoutCell<STATE_MAX>) -> Result<Self, Error> {
        let ptr = inner.as_ptr();
        // Safety: `ptr` comes from a live `&StateLayoutCell`, so it is valid for this read.
        internal::validate::validate_state_layout::<STATE_MAX>(unsafe { &*ptr })?;
        Ok(Self { inner: ptr })
    }

    /// Overwrite the current state and return the new stable sequence number.
    ///
    /// On success, the returned `seq` is even and can be used by readers as a monotonic marker.
    ///
    /// # Errors
    ///
    /// Returns `Err(required_len)` when `bytes.len() > STATE_MAX`. No bytes are published in
    /// this case.
    pub fn publish(&self, bytes: &[u8]) -> Result<u64, usize> {
        if bytes.len() > STATE_MAX {
            return Err(bytes.len());
        }

        // Make seq odd (writer in progress).
        let inner = self.inner;
        // `fetch_add` returns the previous value, so `+ 1` yields the value just stored.
        let seq0 = unsafe { atomic_u64(&(*inner).seq) }.fetch_add(1, Ordering::SeqCst) + 1;
        // Holds under the single-writer contract: `seq` was even before this call began.
        debug_assert!(seq0 % 2 == 1);

        // Publish payload using atomic word stores to avoid data races.
        //
        // Using per-byte atomics is Miri-clean but can allow a reader to observe a mixed snapshot
        // while also observing a stable `seq` (due to weak ordering between independent atomics).
        // Writing/reading the payload via `AtomicU64` words with `SeqCst` ordering keeps the
        // seqlock-style protocol reliable under Miri's memory model.
        unsafe {
            let dst_u8 = core::ptr::addr_of_mut!((*inner).data).cast::<u8>();
            // The layout must place `data` at 8-byte alignment for the word casts below.
            debug_assert_eq!(dst_u8.align_offset(core::mem::align_of::<AtomicU64>()), 0);
            let dst = dst_u8.cast::<AtomicU64>();

            // Store each complete 8-byte chunk of the payload as one little-endian word.
            let full_words = bytes.len() / 8;
            for i in 0..full_words {
                let mut chunk = [0u8; 8];
                chunk.copy_from_slice(&bytes[i * 8..i * 8 + 8]);
                (*dst.add(i)).store(u64::from_le_bytes(chunk), Ordering::SeqCst);
            }

            // Zero-pad the tail to a full word and store it in one shot.
            // NOTE(review): this stores 8 bytes starting at offset `full_words * 8`, i.e. up to
            // 7 bytes past `bytes.len()`; it assumes the capacity of `data` is padded to a
            // multiple of 8 so the store stays in bounds — confirm against `StateLayout`'s
            // definition / `validate_state_layout`.
            let rem = bytes.len() % 8;
            if rem != 0 {
                let mut chunk = [0u8; 8];
                chunk[..rem].copy_from_slice(&bytes[full_words * 8..]);
                (*dst.add(full_words)).store(u64::from_le_bytes(chunk), Ordering::SeqCst);
            }
        }

        // Publish the payload length; readers re-validate it via the final `seq` check.
        unsafe { atomic_u32(&(*inner).len) }.store(bytes.len() as u32, Ordering::SeqCst);

        // Make seq even (stable).
        let seq1 = unsafe { atomic_u64(&(*inner).seq) }.fetch_add(1, Ordering::SeqCst) + 1;
        debug_assert!(seq1 % 2 == 0);

        Ok(seq1)
    }
}
149
150impl<const STATE_MAX: usize> StateReader<STATE_MAX> {
151    #[inline]
152    /// Create a reader over a mapped layout without validating it.
153    ///
154    /// Prefer `try_new` when constructing from an untrusted mapping.
155    pub fn new(inner: &StateLayoutCell<STATE_MAX>) -> Self {
156        Self {
157            inner: inner.as_ptr(),
158        }
159    }
160
161    /// Create a reader after validating the mapped layout.
162    #[inline]
163    pub fn try_new(inner: &StateLayoutCell<STATE_MAX>) -> Result<Self, Error> {
164        let ptr = inner.as_ptr();
165        internal::validate::validate_state_layout::<STATE_MAX>(unsafe { &*ptr })?;
166        Ok(Self { inner: ptr })
167    }
168
169    /// Load the latest stable state snapshot into `out`.
170    ///
171    /// Returns `Ok(None)` when:
172    /// - there is no published state yet (`len == 0`)
173    /// - a writer is in progress (odd `seq`)
174    /// - a write raced with the read (sequence changed)
175    ///
176    /// # Errors
177    ///
178    /// Returns `Err(required_len)` when `out` is too small to hold the current snapshot.
179    pub fn try_load_into(&self, out: &mut [u8]) -> Result<Option<(usize, u64)>, usize> {
180        let inner = self.inner;
181        let seq0 = unsafe { atomic_u64(&(*inner).seq) }.load(Ordering::SeqCst);
182        if seq0 % 2 == 1 {
183            return Ok(None);
184        }
185
186        let len = unsafe { atomic_u32(&(*inner).len) }.load(Ordering::SeqCst) as usize;
187        if len == 0 {
188            return Ok(None);
189        }
190        if out.len() < len {
191            return Err(len);
192        }
193
194        // Load payload using atomic word loads to avoid data races.
195        unsafe {
196            let src_u8 = core::ptr::addr_of!((*inner).data).cast::<u8>();
197            debug_assert_eq!(src_u8.align_offset(core::mem::align_of::<AtomicU64>()), 0);
198            let src = src_u8.cast::<AtomicU64>();
199
200            let full_words = len / 8;
201            for i in 0..full_words {
202                let word = (*src.add(i)).load(Ordering::SeqCst);
203                out[i * 8..i * 8 + 8].copy_from_slice(&word.to_le_bytes());
204            }
205
206            let rem = len % 8;
207            if rem != 0 {
208                let word = (*src.add(full_words)).load(Ordering::SeqCst);
209                out[full_words * 8..len].copy_from_slice(&word.to_le_bytes()[..rem]);
210            }
211        }
212
213        let seq1 = unsafe { atomic_u64(&(*inner).seq) }.load(Ordering::SeqCst);
214        if seq1 == seq0 && seq1 % 2 == 0 {
215            Ok(Some((len, seq1)))
216        } else {
217            Ok(None)
218        }
219    }
220}