indexbus_wake/lib.rs
1#![cfg_attr(not(feature = "std"), no_std)]
2#![deny(missing_docs)]
3
4//! OS-backed wait/wake primitives for IndexBus.
5//!
6//! This crate provides thin wrappers around platform “wait on address” primitives for an aligned
7//! `u32`:
8//!
9//! - Linux: futex (`FUTEX_WAIT|PRIVATE` / `FUTEX_WAKE|PRIVATE`)
10//! - Windows: `WaitOnAddress` / `WakeByAddressSingle` / `WakeByAddressAll`
11//!
12//! These are intended for blocking adapters and higher-level async integration.
13//!
14//! ## Feature flags
15//!
16//! - `std` (default): enables `std::fmt::Display` and `std::error::Error` for [`WaitError`].
17//! The wait/wake functions themselves only depend on `core`, but still require an OS.
18//!
19//! ## Correctness notes
20//!
21//! These primitives are parking mechanisms, not a full synchronization protocol:
22//!
23//! - Waits may return spuriously (a wake can unblock a waiter even if the value is still equal).
24//! - Wakes are not queued; a wake that happens before a thread actually sleeps can be lost.
25//! - `wait_u32_eq` does not establish a happens-before edge for your data by itself.
26//!
27//! Higher-level code should use the standard “sequence” pattern:
28//!
29//! - Writer: update shared data, then `store(seq + 1, Ordering::Release)`, then call `wake_*`.
30//! - Reader: `load(Ordering::Acquire)` and, if unchanged, call `wait_u32_eq` in a loop.
31//!
32//! ## Minimal usage
33//!
34//! ```no_run
35//! use core::time::Duration;
36//! use indexbus_wake::{wait_u32_eq, wake_one};
37//! use std::sync::atomic::{AtomicU32, Ordering};
38//!
39//! let seq = AtomicU32::new(0);
40//! let addr = &seq as *const AtomicU32 as *const u32;
41//!
42//! // SAFETY: `addr` is aligned/valid for the call, and the location is mutated race-free via
43//! // the `AtomicU32` at the same address.
44//! unsafe {
45//! // Typical reader-side pattern: re-check the condition in a loop around this call.
46//! let _ = wait_u32_eq(addr, 0, Some(Duration::from_millis(10)));
47//! }
48//!
49//! seq.store(1, Ordering::Release);
50//! unsafe { wake_one(addr) };
51//! ```
52
53mod errors;
54mod internal;
55
56pub use errors::WaitError;
57
58use core::time::Duration;
59
60/// Wait while the 32-bit value at `addr` equals `expected`.
61///
62/// Semantics (mirrors futex / `WaitOnAddress`):
63///
64/// - If `*addr != expected`, returns immediately with `Ok(())`.
65/// - Otherwise, blocks until woken, the value changes, or `timeout` elapses.
66///
67/// This call may return `Ok(())` even if the value is still equal (spurious wakeups); callers
68/// should always re-check the condition in a loop.
69///
70/// # Errors
71///
72/// - [`WaitError::TimedOut`]: `timeout` elapsed.
73/// - [`WaitError::Interrupted`]: interrupted by a signal (Linux).
74/// - [`WaitError::OsError`]: platform-specific failure (errno on Linux; `GetLastError` on Windows).
75/// - [`WaitError::Unsupported`]: this target has no implementation (non-Linux / non-Windows).
76///
77/// # Safety
78///
79/// - `addr` must be valid to read a `u32` for the duration of the call.
80/// - `addr` must be aligned to 4 bytes.
81/// - If other threads/processes mutate the value concurrently, the memory at `addr` must be
82/// accessed in a race-free way (typically by storing through an `AtomicU32` at the same
83/// address and passing its pointer cast to `*const u32`).
84#[inline]
85pub unsafe fn wait_u32_eq(
86 addr: *const u32,
87 expected: u32,
88 timeout: Option<Duration>,
89) -> Result<(), WaitError> {
90 internal::wait_u32_eq(addr, expected, timeout)
91}
92
93/// Wake a single waiter blocked on `addr`.
94///
95/// Wakes are best-effort: this may do nothing if there are no waiters, and a wake may be lost
96/// if it races with a waiter that has not yet parked.
97///
98/// # Safety
99///
100/// `addr` must be a valid address used by waiters (typically `&WakeCell.seq`).
101#[inline]
102pub unsafe fn wake_one(addr: *const u32) {
103 internal::wake_one(addr)
104}
105
106/// Wake all waiters blocked on `addr`.
107///
108/// Wakes are best-effort: this may do nothing if there are no waiters, and a wake may be lost
109/// if it races with a waiter that has not yet parked.
110///
111/// # Safety
112///
113/// `addr` must be a valid address used by waiters (typically `&WakeCell.seq`).
114#[inline]
115pub unsafe fn wake_all(addr: *const u32) {
116 internal::wake_all(addr)
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122 use std::sync::atomic::{AtomicU32, Ordering};
123 use std::sync::Arc;
124 use std::thread;
125
126 #[cfg(feature = "std")]
127 #[test]
128 fn wait_error_display_and_error_trait() {
129 let e = WaitError::TimedOut;
130 assert_eq!(e.to_string(), "timed out");
131
132 let e = WaitError::Interrupted;
133 assert_eq!(e.to_string(), "interrupted");
134
135 let e = WaitError::OsError(12);
136 assert_eq!(e.to_string(), "os error: 12");
137
138 let e = WaitError::Unsupported;
139 assert_eq!(e.to_string(), "unsupported platform");
140
141 let _as_error: &dyn std::error::Error = &WaitError::TimedOut;
142 }
143
144 #[test]
145 fn wait_wake_roundtrip() {
146 // Use an AtomicU32 as the shared cell; the wait API only needs the address.
147 let cell = Arc::new(AtomicU32::new(0));
148 let cell2 = cell.clone();
149
150 let t = thread::spawn(move || unsafe {
151 // Wait while value is 0; main thread will change it and wake.
152 wait_u32_eq(
153 cell2.as_ref() as *const AtomicU32 as *const u32,
154 0,
155 Some(Duration::from_secs(2)),
156 )
157 .expect("wait");
158 });
159
160 // Ensure waiter likely parks.
161 thread::sleep(Duration::from_millis(10));
162 cell.store(1, Ordering::Release);
163 unsafe { wake_all(cell.as_ref() as *const AtomicU32 as *const u32) };
164
165 t.join().unwrap();
166 }
167}