indexbus_kit/runtime/
lifecycle.rs

1//! Deterministic startup/shutdown building blocks.
2//!
3//! These helpers are intentionally lightweight and backend-agnostic.
4//!
5//! Patterns covered:
6//! - readiness gating (aggregate multiple components into one "ready" latch)
7//! - stop flag fanout to threads
8//! - recording the first fatal error (root cause) and requesting stop
9//!
10//! Notes on router ordering:
11//! - request stop (set the shared stop flag)
12//! - let stages exit / optionally drain
13//! - stop routers and `join()` them (so routing loops terminate deterministically)
14//!
15//! Notes on drain windows:
16//! - when stop is requested, you may choose to keep running a stage loop for a bounded
17//!   duration (e.g. `Instant::now() < deadline`) to flush in-flight messages.
18
19use std::fmt;
20use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
21use std::sync::{Arc, OnceLock};
22
23/// Aggregates multiple components into a single readiness latch.
24///
25/// The gate becomes "ready" once `required` components have called `component_started()`.
26pub struct ReadinessGate {
27    required: usize,
28    started: AtomicUsize,
29    ready: AtomicBool,
30    on_ready: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
31}
32
33impl ReadinessGate {
34    /// Create a new gate.
35    ///
36    /// If `required` is 0, it is treated as 1.
37    pub fn new(required: usize) -> Self {
38        Self {
39            required: required.max(1),
40            started: AtomicUsize::new(0),
41            ready: AtomicBool::new(false),
42            on_ready: None,
43        }
44    }
45
46    /// Install an `on_ready` callback invoked once when the gate becomes ready.
47    pub fn with_on_ready(mut self, on_ready: Arc<dyn Fn() + Send + Sync + 'static>) -> Self {
48        self.on_ready = Some(on_ready);
49        self
50    }
51
52    /// Produce a token to be handed to a component/thread.
53    pub fn token(self: &Arc<Self>) -> ReadinessToken {
54        ReadinessToken {
55            gate: Some(Arc::clone(self)),
56        }
57    }
58
59    /// Whether the gate is ready.
60    pub fn is_ready(&self) -> bool {
61        self.ready.load(Ordering::Acquire)
62    }
63
64    fn component_started(&self) {
65        let n = self.started.fetch_add(1, Ordering::AcqRel) + 1;
66        if n >= self.required
67            && self
68                .ready
69                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
70                .is_ok()
71        {
72            if let Some(cb) = self.on_ready.as_deref() {
73                cb();
74            }
75        }
76    }
77}
78
79/// A per-component handle used to mark startup progress.
80#[derive(Clone, Default)]
81pub struct ReadinessToken {
82    gate: Option<Arc<ReadinessGate>>,
83}
84
85impl ReadinessToken {
86    /// Mark this component as started.
87    pub fn component_started(&self) {
88        if let Some(g) = self.gate.as_deref() {
89            g.component_started();
90        }
91    }
92}
93
/// Captures the first fatal error and coordinates process-wide stop.
///
/// Clones share both the stop flag and the recorded cause, so any clone may record an
/// error or request stop, and every clone observes the result.
#[derive(Clone, Debug)]
pub struct RootCause {
    /// Stop flag shared with whoever created it and with every clone.
    stop: Arc<AtomicBool>,
    /// First recorded error message; later attempts to record are ignored.
    cause: Arc<OnceLock<String>>,
}

impl RootCause {
    /// Create a new root-cause recorder tied to a shared stop flag.
    ///
    /// No cause is recorded initially; the stop flag is used as passed in.
    pub fn new(stop: Arc<AtomicBool>) -> Self {
        Self {
            stop,
            cause: Arc::new(OnceLock::new()),
        }
    }

    /// Request stop (idempotent).
    ///
    /// The flag is stored with `Release` so it pairs with the `Acquire` load in
    /// [`RootCause::stop_requested`]: a thread that observes the stop request also
    /// observes everything the requesting thread wrote beforehand, including a cause
    /// recorded by [`RootCause::record_error`].
    pub fn request_stop(&self) {
        self.stop.store(true, Ordering::Release);
    }

    /// Whether stop has been requested.
    pub fn stop_requested(&self) -> bool {
        self.stop.load(Ordering::Acquire)
    }

    /// Record the first fatal error and request stop.
    ///
    /// The message is stored as `"{who}: {err}"`. Only the first recorded error is kept;
    /// later calls leave the cause untouched but still request stop.
    pub fn record_error(&self, who: &'static str, err: impl fmt::Display) {
        let msg = format!("{who}: {err}");
        // `set` fails when a cause already exists; keeping the first one is intended.
        let _ = self.cause.set(msg);
        // Publish the stop flag only after the cause is in place.
        self.request_stop();
    }

    /// Get the recorded root cause (if any).
    ///
    /// Returns an owned copy of the message, or `None` when no error has been recorded.
    pub fn cause(&self) -> Option<String> {
        self.cause.get().cloned()
    }
}