indexbus_wake/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2#![deny(missing_docs)]
3
4//! OS-backed wait/wake primitives for IndexBus.
5//!
6//! This crate provides thin wrappers around platform “wait on address” primitives for an aligned
7//! `u32`:
8//!
9//! - Linux: futex (`FUTEX_WAIT|PRIVATE` / `FUTEX_WAKE|PRIVATE`)
10//! - Windows: `WaitOnAddress` / `WakeByAddressSingle` / `WakeByAddressAll`
11//!
12//! These are intended for blocking adapters and higher-level async integration.
13//!
14//! ## Feature flags
15//!
16//! - `std` (default): enables `std::fmt::Display` and `std::error::Error` for [`WaitError`].
17//!   The wait/wake functions themselves only depend on `core`, but still require an OS.
18//!
19//! ## Correctness notes
20//!
21//! These primitives are parking mechanisms, not a full synchronization protocol:
22//!
23//! - Waits may return spuriously (a wake can unblock a waiter even if the value is still equal).
24//! - Wakes are not queued; a wake that happens before a thread actually sleeps can be lost.
25//! - `wait_u32_eq` does not establish a happens-before edge for your data by itself.
26//!
27//! Higher-level code should use the standard “sequence” pattern:
28//!
29//! - Writer: update shared data, then `store(seq + 1, Ordering::Release)`, then call `wake_*`.
30//! - Reader: `load(Ordering::Acquire)` and, if unchanged, call `wait_u32_eq` in a loop.
31//!
32//! ## Minimal usage
33//!
34//! ```no_run
35//! use core::time::Duration;
36//! use indexbus_wake::{wait_u32_eq, wake_one};
37//! use std::sync::atomic::{AtomicU32, Ordering};
38//!
39//! let seq = AtomicU32::new(0);
40//! let addr = &seq as *const AtomicU32 as *const u32;
41//!
42//! // SAFETY: `addr` is aligned/valid for the call, and the location is mutated race-free via
43//! // the `AtomicU32` at the same address.
44//! unsafe {
45//!     // Typical reader-side pattern: re-check the condition in a loop around this call.
46//!     let _ = wait_u32_eq(addr, 0, Some(Duration::from_millis(10)));
47//! }
48//!
49//! seq.store(1, Ordering::Release);
50//! unsafe { wake_one(addr) };
51//! ```
52
53mod errors;
54mod internal;
55
56pub use errors::WaitError;
57
58use core::time::Duration;
59
60/// Wait while the 32-bit value at `addr` equals `expected`.
61///
62/// Semantics (mirrors futex / `WaitOnAddress`):
63///
64/// - If `*addr != expected`, returns immediately with `Ok(())`.
65/// - Otherwise, blocks until woken, the value changes, or `timeout` elapses.
66///
67/// This call may return `Ok(())` even if the value is still equal (spurious wakeups); callers
68/// should always re-check the condition in a loop.
69///
70/// # Errors
71///
72/// - [`WaitError::TimedOut`]: `timeout` elapsed.
73/// - [`WaitError::Interrupted`]: interrupted by a signal (Linux).
74/// - [`WaitError::OsError`]: platform-specific failure (errno on Linux; `GetLastError` on Windows).
75/// - [`WaitError::Unsupported`]: this target has no implementation (non-Linux / non-Windows).
76///
77/// # Safety
78///
79/// - `addr` must be valid to read a `u32` for the duration of the call.
80/// - `addr` must be aligned to 4 bytes.
81/// - If other threads/processes mutate the value concurrently, the memory at `addr` must be
82///   accessed in a race-free way (typically by storing through an `AtomicU32` at the same
83///   address and passing its pointer cast to `*const u32`).
84#[inline]
85pub unsafe fn wait_u32_eq(
86    addr: *const u32,
87    expected: u32,
88    timeout: Option<Duration>,
89) -> Result<(), WaitError> {
90    internal::wait_u32_eq(addr, expected, timeout)
91}
92
93/// Wake a single waiter blocked on `addr`.
94///
95/// Wakes are best-effort: this may do nothing if there are no waiters, and a wake may be lost
96/// if it races with a waiter that has not yet parked.
97///
98/// # Safety
99///
100/// `addr` must be a valid address used by waiters (typically `&WakeCell.seq`).
101#[inline]
102pub unsafe fn wake_one(addr: *const u32) {
103    internal::wake_one(addr)
104}
105
106/// Wake all waiters blocked on `addr`.
107///
108/// Wakes are best-effort: this may do nothing if there are no waiters, and a wake may be lost
109/// if it races with a waiter that has not yet parked.
110///
111/// # Safety
112///
113/// `addr` must be a valid address used by waiters (typically `&WakeCell.seq`).
114#[inline]
115pub unsafe fn wake_all(addr: *const u32) {
116    internal::wake_all(addr)
117}
118
119#[cfg(test)]
120mod tests {
121    use super::*;
122    use std::sync::atomic::{AtomicU32, Ordering};
123    use std::sync::Arc;
124    use std::thread;
125
126    #[cfg(feature = "std")]
127    #[test]
128    fn wait_error_display_and_error_trait() {
129        let e = WaitError::TimedOut;
130        assert_eq!(e.to_string(), "timed out");
131
132        let e = WaitError::Interrupted;
133        assert_eq!(e.to_string(), "interrupted");
134
135        let e = WaitError::OsError(12);
136        assert_eq!(e.to_string(), "os error: 12");
137
138        let e = WaitError::Unsupported;
139        assert_eq!(e.to_string(), "unsupported platform");
140
141        let _as_error: &dyn std::error::Error = &WaitError::TimedOut;
142    }
143
144    #[test]
145    fn wait_wake_roundtrip() {
146        // Use an AtomicU32 as the shared cell; the wait API only needs the address.
147        let cell = Arc::new(AtomicU32::new(0));
148        let cell2 = cell.clone();
149
150        let t = thread::spawn(move || unsafe {
151            // Wait while value is 0; main thread will change it and wake.
152            wait_u32_eq(
153                cell2.as_ref() as *const AtomicU32 as *const u32,
154                0,
155                Some(Duration::from_secs(2)),
156            )
157            .expect("wait");
158        });
159
160        // Ensure waiter likely parks.
161        thread::sleep(Duration::from_millis(10));
162        cell.store(1, Ordering::Release);
163        unsafe { wake_all(cell.as_ref() as *const AtomicU32 as *const u32) };
164
165        t.join().unwrap();
166    }
167}