indexbus_platform_core/
lifecycle.rs

1//! Lifecycle + supervision primitives.
2//!
3//! This module provides a small set of **std-only** coordination helpers intended for
4//! long-running services:
5//!
6//! - [`Shutdown`](crate::lifecycle::Shutdown) / [`ShutdownToken`](crate::lifecycle::ShutdownToken):
7//!   cooperative shutdown signaling.
8//! - [`Supervisor`](crate::lifecycle::Supervisor): spawn and join supervised threads, optionally
9//!   restartable with backoff.
10//!
11//! ## Contracts
12//!
13//! - These primitives coordinate *control flow* (shutdown / join), not data synchronization.
14//! - Child tasks must be written to observe shutdown and return in a bounded amount of time.
15//! - `Supervisor::join_all` drains the supervisor's child list; it is intended to be called once.
16
17use crate::errors::{Error, Result};
18
19use core::time::Duration;
20
21#[cfg(feature = "std")]
22use std::{
23    sync::{
24        atomic::{AtomicBool, Ordering},
25        Arc, Condvar, Mutex,
26    },
27    thread,
28    thread::JoinHandle,
29};
30
/// State shared between a [`Shutdown`] and every [`ShutdownToken`] derived from it.
#[cfg(feature = "std")]
#[derive(Debug)]
struct ShutdownInner {
    /// Becomes `true` once shutdown has been triggered.
    flag: AtomicBool,
    /// Mutex paired with `cv`; it protects no data of its own.
    mu: Mutex<()>,
    /// Condition variable that blocked waiters sleep on.
    cv: Condvar,
}

#[cfg(feature = "std")]
impl ShutdownInner {
    /// Build the initial state: flag cleared, no waiters.
    fn new() -> Self {
        let flag = AtomicBool::new(false);
        let mu = Mutex::new(());
        let cv = Condvar::new();
        Self { flag, mu, cv }
    }
}
49
/// A cheaply clonable handle through which tasks observe a shutdown request.
///
/// **Contract:** cancellation is cooperative. Tasks/threads are expected to poll
/// [`ShutdownToken::is_shutdown`] (or block in one of the waiting APIs) and exit promptly once
/// shutdown is reported.
///
/// **Features:** the waiting APIs require `std`. Without `std` the token carries no state and
/// [`ShutdownToken::is_shutdown`] always reports `false`.
#[derive(Clone, Debug)]
pub struct ShutdownToken {
    #[cfg(feature = "std")]
    inner: Arc<ShutdownInner>,
}

impl ShutdownToken {
    /// Reports whether shutdown has been triggered.
    ///
    /// **Contract:** once this returns `true` it keeps returning `true` for the lifetime of the
    /// token.
    pub fn is_shutdown(&self) -> bool {
        #[cfg(feature = "std")]
        {
            let shared = &self.inner;
            shared.flag.load(Ordering::Acquire)
        }

        #[cfg(not(feature = "std"))]
        {
            false
        }
    }

    /// Park the calling thread until shutdown is triggered.
    ///
    /// Returns immediately if shutdown has already been triggered.
    ///
    /// **Panics:** panics if the internal mutex is poisoned.
    #[cfg(feature = "std")]
    pub fn wait(&self) {
        if !self.is_shutdown() {
            let shared = &self.inner;
            let held = shared.mu.lock().expect("shutdown mutex poisoned");
            // Sleep on the condition variable for as long as the flag is still clear.
            let released = shared
                .cv
                .wait_while(held, |_| !self.is_shutdown())
                .expect("shutdown mutex poisoned");
            drop(released);
        }
    }

    /// Park the calling thread until shutdown is triggered or `timeout` has elapsed.
    ///
    /// Returns `true` if shutdown was observed, `false` if the wait timed out first.
    ///
    /// **Panics:** panics if the internal mutex is poisoned.
    #[cfg(feature = "std")]
    pub fn wait_timeout(&self, timeout: Duration) -> bool {
        if !self.is_shutdown() {
            let shared = &self.inner;
            let held = shared.mu.lock().expect("shutdown mutex poisoned");
            let outcome = shared
                .cv
                .wait_timeout_while(held, timeout, |_| !self.is_shutdown())
                .expect("shutdown mutex poisoned");
            drop(outcome);
        }

        // The flag never resets, so re-reading it answers both the early-exit and waited paths.
        self.is_shutdown()
    }
}
116
117/// Shutdown coordinator.
118///
119/// A [`Shutdown`] can create many [`ShutdownToken`] clones; calling [`Shutdown::trigger`]
120/// transitions the shutdown flag to "true" and wakes any threads blocked in
121/// [`ShutdownToken::wait`] / [`ShutdownToken::wait_timeout`].
122///
123/// **Contract:** shutdown is one-way and idempotent.
124pub struct Shutdown {
125    #[cfg(feature = "std")]
126    inner: Arc<ShutdownInner>,
127}
128
129impl Shutdown {
130    /// Create a new shutdown coordinator.
131    pub fn new() -> Self {
132        #[cfg(feature = "std")]
133        {
134            let inner = Arc::new(ShutdownInner::new());
135            Self { inner }
136        }
137
138        #[cfg(not(feature = "std"))]
139        {
140            Self {}
141        }
142    }
143
144    /// Create a clonable token that can observe shutdown.
145    pub fn token(&self) -> ShutdownToken {
146        #[cfg(feature = "std")]
147        {
148            ShutdownToken {
149                inner: self.inner.clone(),
150            }
151        }
152
153        #[cfg(not(feature = "std"))]
154        {
155            ShutdownToken {}
156        }
157    }
158
159    /// Trigger shutdown.
160    ///
161    /// Returns `true` if this call transitioned from not-shutdown to shutdown.
162    ///
163    /// **Notes:** this notifies all waiters even if shutdown was already triggered.
164    pub fn trigger(&self) -> bool {
165        #[cfg(feature = "std")]
166        {
167            let was = self.inner.flag.swap(true, Ordering::Release);
168
169            // Wake any waiters. We always notify to avoid edge cases around late waits.
170            self.inner.cv.notify_all();
171
172            !was
173        }
174
175        #[cfg(not(feature = "std"))]
176        {
177            false
178        }
179    }
180}
181
182impl Default for Shutdown {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
/// Tunables controlling how a supervisor restarts its restartable children.
///
/// **Contract:** values are validated by [`Supervisor::validate`].
#[derive(Debug, Clone, Copy)]
pub struct SupervisorConfig {
    /// Maximum number of restarts for restartable children (default: 10).
    pub max_restarts: u32,

    /// Initial backoff when restarting a child (default: 50 ms).
    pub restart_backoff_initial: Duration,

    /// Maximum backoff when restarting a child (default: 5 s).
    pub restart_backoff_max: Duration,
}

impl Default for SupervisorConfig {
    /// Ten restarts, starting at a 50 ms backoff and capped at 5 s.
    fn default() -> Self {
        let restart_backoff_initial = Duration::from_millis(50);
        let restart_backoff_max = Duration::from_secs(5);

        Self {
            max_restarts: 10,
            restart_backoff_initial,
            restart_backoff_max,
        }
    }
}
212
#[cfg(feature = "std")]
/// State shared by every clone of a [`Supervisor`].
struct SupervisorInner {
    /// Configuration supplied to `Supervisor::new`; copied out by `Supervisor::config`.
    cfg: SupervisorConfig,
    /// Shutdown coordinator from which the tokens handed to children are created.
    shutdown: Shutdown,
    /// Children spawned so far that have not yet been drained by `Supervisor::join_all`.
    children: Mutex<Vec<Child>>,
}
219
/// A supervised child.
///
/// Pairs the name given at spawn time with the handle used to join the thread.
#[cfg(feature = "std")]
#[derive(Debug)]
struct Child {
    /// Name given at spawn time; used as the prefix of this child's entry in join errors.
    name: String,
    /// Join handle yielding the child's final `Result`, or its panic payload.
    join: JoinHandle<Result<()>>,
}
227
228/// A minimal supervisor that coordinates shutdown and tracks child threads.
229///
230/// **Blessed pattern:**
231/// - Create one [`Supervisor`] at process start.
232/// - Spawn long-running threads with [`Supervisor::spawn_thread`] (or
233///   [`Supervisor::spawn_restartable_thread`]), and exit the thread when shutdown is requested.
234/// - On shutdown (signal / admin request), call [`Supervisor::shutdown_and_join`].
235///
236/// ## Error semantics
237///
238/// - A child thread that returns `Err` causes [`Supervisor::join_all`] to return `Err`.
239/// - A child thread that panics also causes [`Supervisor::join_all`] to return `Err`.
240///   When multiple children fail, errors are aggregated into a single message.
241#[derive(Clone)]
242pub struct Supervisor {
243    #[cfg(feature = "std")]
244    inner: Arc<SupervisorInner>,
245
246    #[cfg(not(feature = "std"))]
247    _private: (),
248}
249
250impl Supervisor {
251    /// Create a new supervisor with the provided configuration.
252    ///
253    /// **Contract:** configuration is validated when spawning children.
254    pub fn new(cfg: SupervisorConfig) -> Self {
255        #[cfg(feature = "std")]
256        {
257            Self {
258                inner: Arc::new(SupervisorInner {
259                    cfg,
260                    shutdown: Shutdown::new(),
261                    children: Mutex::new(Vec::new()),
262                }),
263            }
264        }
265
266        #[cfg(not(feature = "std"))]
267        {
268            let _ = cfg;
269            Self { _private: () }
270        }
271    }
272
273    /// Get the supervisor configuration.
274    pub fn config(&self) -> SupervisorConfig {
275        #[cfg(feature = "std")]
276        {
277            self.inner.cfg
278        }
279
280        #[cfg(not(feature = "std"))]
281        {
282            SupervisorConfig::default()
283        }
284    }
285
286    /// Access the supervisor's shutdown coordinator.
287    pub fn shutdown(&self) -> Shutdown {
288        #[cfg(feature = "std")]
289        {
290            Shutdown {
291                inner: self.inner.shutdown.inner.clone(),
292            }
293        }
294
295        #[cfg(not(feature = "std"))]
296        {
297            Shutdown::new()
298        }
299    }
300
301    /// Convenience accessor for the supervisor shutdown token.
302    pub fn token(&self) -> ShutdownToken {
303        self.shutdown().token()
304    }
305
306    /// Validate the current configuration.
307    ///
308    /// **Errors:** returns [`Error`] if the configuration is internally inconsistent.
309    pub fn validate(&self) -> Result<()> {
310        let cfg = self.config();
311        if cfg.max_restarts == 0 {
312            return Err(Error::msg("max_restarts must be non-zero"));
313        }
314        if cfg.restart_backoff_initial.is_zero() {
315            return Err(Error::msg("restart_backoff_initial must be non-zero"));
316        }
317        if cfg.restart_backoff_max < cfg.restart_backoff_initial {
318            return Err(Error::msg(
319                "restart_backoff_max must be >= restart_backoff_initial",
320            ));
321        }
322        Ok(())
323    }
324
325    /// Spawn a supervised thread.
326    ///
327    /// **Contract:** the closure should return when it observes shutdown.
328    ///
329    /// **Errors:**
330    /// - Returns an error if configuration validation fails.
331    /// - Returns an error if the OS thread cannot be spawned.
332    ///
333    /// If the closure returns `Err`, [`Supervisor::join_all`] will return `Err`.
334    #[cfg(feature = "std")]
335    pub fn spawn_thread<F>(&self, name: impl Into<String>, f: F) -> Result<()>
336    where
337        F: FnOnce(ShutdownToken) -> Result<()> + Send + 'static,
338    {
339        self.validate()?;
340
341        let name = name.into();
342        let token = self.token();
343        let join = thread::Builder::new()
344            .name(name.clone())
345            .spawn(move || f(token))
346            .map_err(|e| Error::msg(format!("failed to spawn thread '{name}': {e}")))?;
347
348        self.inner
349            .children
350            .lock()
351            .expect("children mutex poisoned")
352            .push(Child { name, join });
353        Ok(())
354    }
355
356    /// Spawn a supervised thread that will be restarted with backoff on error.
357    ///
358    /// **Contract:**
359    /// - The child exits successfully if shutdown is requested.
360    /// - If the closure returns `Err` and shutdown is not requested, the supervisor sleeps for a
361    ///   backoff duration and retries.
362    ///
363    /// **Errors:**
364    /// - Returns an error if configuration validation fails.
365    /// - Returns an error if the OS thread cannot be spawned.
366    /// - Returns an error if the child exceeds `max_restarts`.
367    #[cfg(feature = "std")]
368    pub fn spawn_restartable_thread<F>(&self, name: impl Into<String>, f: F) -> Result<()>
369    where
370        F: Fn(ShutdownToken) -> Result<()> + Send + Sync + 'static,
371    {
372        self.validate()?;
373
374        let name = name.into();
375        let token = self.token();
376        let cfg = self.config();
377
378        let f = Arc::new(f);
379        let name2 = name.clone();
380        let join = thread::Builder::new()
381            .name(name.clone())
382            .spawn(move || {
383                let mut restarts: u32 = 0;
384                let mut backoff = cfg.restart_backoff_initial;
385
386                loop {
387                    if token.is_shutdown() {
388                        return Ok(());
389                    }
390
391                    match (f)(token.clone()) {
392                        Ok(()) => return Ok(()),
393                        Err(e) => {
394                            if token.is_shutdown() {
395                                return Ok(());
396                            }
397
398                            if restarts >= cfg.max_restarts {
399                                return Err(Error::msg(format!(
400                                    "restartable child '{name2}' exceeded max_restarts ({}) after error: {e}",
401                                    cfg.max_restarts
402                                )));
403                            }
404
405                            restarts = restarts.saturating_add(1);
406                            thread::sleep(backoff);
407                            backoff = (backoff + backoff).min(cfg.restart_backoff_max);
408                        }
409                    }
410                }
411            })
412            .map_err(|e| Error::msg(format!("failed to spawn thread '{name}': {e}")))?;
413
414        self.inner
415            .children
416            .lock()
417            .expect("children mutex poisoned")
418            .push(Child { name, join });
419        Ok(())
420    }
421
422    /// Join all supervised children.
423    ///
424    /// **Contract:** this drains the supervisor's child list. A subsequent call will join zero
425    /// children and return `Ok(())`.
426    ///
427    /// **Errors:**
428    /// - If any child returns an error, returns an aggregated [`Error`].
429    /// - If any child panics, returns an aggregated [`Error`].
430    #[cfg(feature = "std")]
431    pub fn join_all(&self) -> Result<()> {
432        let mut children = self.inner.children.lock().expect("children mutex poisoned");
433        let mut errs: Vec<String> = Vec::new();
434
435        for child in children.drain(..) {
436            match child.join.join() {
437                Ok(Ok(())) => {}
438                Ok(Err(e)) => errs.push(format!("{}: {e}", child.name)),
439                Err(panic) => errs.push(format!("{}: {}", child.name, Error::panic_message(panic))),
440            }
441        }
442
443        if errs.is_empty() {
444            Ok(())
445        } else if errs.len() == 1 {
446            Err(Error::msg(errs.remove(0)))
447        } else {
448            Err(Error::msg(format!(
449                "multiple child failures:\n- {}",
450                errs.join("\n- ")
451            )))
452        }
453    }
454
455    /// Convenience: trigger shutdown and then join all children.
456    ///
457    /// **Contract:** this is equivalent to calling [`Shutdown::trigger`] and then
458    /// [`Supervisor::join_all`].
459    #[cfg(feature = "std")]
460    pub fn shutdown_and_join(&self) -> Result<()> {
461        let _ = self.shutdown().trigger();
462        self.join_all()
463    }
464}
465
// Every test below asserts behavior that only exists with `std` (a real shutdown flag, a
// supervisor that keeps its configuration, threads), so the whole module is gated on it rather
// than letting the un-gated tests fail when the feature is disabled.
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};

    use std::sync::Arc;

    #[test]
    fn shutdown_token_observes_trigger() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();

        assert!(!token.is_shutdown());
        assert!(shutdown.trigger());
        assert!(token.is_shutdown());
    }

    #[test]
    fn shutdown_trigger_is_idempotent() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();

        assert!(shutdown.trigger());
        assert!(!shutdown.trigger());
        assert!(token.is_shutdown());
    }

    #[test]
    fn shutdown_token_wait_unblocks() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();

        let t = std::thread::spawn(move || {
            token.wait();
        });

        std::thread::sleep(Duration::from_millis(10));
        shutdown.trigger();
        t.join().unwrap();
    }

    #[test]
    fn supervisor_validate_rejects_zero_restarts() {
        let s = Supervisor::new(SupervisorConfig {
            max_restarts: 0,
            ..SupervisorConfig::default()
        });
        assert!(s.validate().is_err());
    }

    #[test]
    fn supervisor_tracks_and_joins_children() {
        let sup = Supervisor::new(SupervisorConfig::default());
        let token = sup.token();

        let seen = Arc::new(AtomicU32::new(0));
        let seen2 = seen.clone();
        sup.spawn_thread("child", move |t| {
            while !t.is_shutdown() {
                std::hint::spin_loop();
            }
            seen2.store(1, AtomicOrdering::Release);
            Ok(())
        })
        .unwrap();

        assert!(!token.is_shutdown());
        sup.shutdown().trigger();
        sup.join_all().unwrap();
        assert_eq!(seen.load(AtomicOrdering::Acquire), 1);
    }

    #[test]
    fn restartable_thread_restarts_then_succeeds() {
        let sup = Supervisor::new(SupervisorConfig {
            max_restarts: 3,
            restart_backoff_initial: Duration::from_millis(1),
            restart_backoff_max: Duration::from_millis(5),
        });

        let attempts = Arc::new(AtomicU32::new(0));
        let attempts2 = attempts.clone();

        // Fails on the first two invocations, succeeds on the third.
        sup.spawn_restartable_thread("restartable", move |_t| {
            let n = attempts2.fetch_add(1, AtomicOrdering::AcqRel);
            if n < 2 {
                return Err(Error::msg("boom"));
            }
            Ok(())
        })
        .unwrap();

        sup.join_all().unwrap();
        assert!(attempts.load(AtomicOrdering::Acquire) >= 3);
    }

    #[test]
    fn restartable_thread_stops_after_exceeding_max_restarts() {
        let sup = Supervisor::new(SupervisorConfig {
            max_restarts: 1,
            restart_backoff_initial: Duration::from_millis(1),
            restart_backoff_max: Duration::from_millis(1),
        });

        let attempts = Arc::new(AtomicU32::new(0));
        let attempts2 = attempts.clone();

        sup.spawn_restartable_thread("restartable", move |_t| {
            attempts2.fetch_add(1, AtomicOrdering::AcqRel);
            Err(Error::msg("boom"))
        })
        .unwrap();

        // One initial attempt plus one restart, then the child gives up.
        let err = sup.join_all().unwrap_err();
        assert!(err.to_string().contains("exceeded max_restarts"));
        assert_eq!(attempts.load(AtomicOrdering::Acquire), 2);
    }

    #[test]
    fn restartable_thread_exits_cleanly_on_shutdown() {
        let sup = Supervisor::new(SupervisorConfig {
            max_restarts: 100,
            restart_backoff_initial: Duration::from_millis(1),
            restart_backoff_max: Duration::from_millis(2),
        });

        // This child always errors when invoked; the supervisor loop should exit Ok once
        // shutdown is observed.
        sup.spawn_restartable_thread("restartable", move |_t| Err(Error::msg("boom")))
            .unwrap();

        sup.shutdown().trigger();
        sup.join_all().unwrap();
    }

    #[test]
    fn join_all_aggregates_errors_and_panics() {
        let sup = Supervisor::new(SupervisorConfig::default());

        sup.spawn_thread("err", move |_t| Err(Error::msg("nope")))
            .unwrap();
        sup.spawn_thread("panic", move |_t| -> Result<()> { panic!("boom") })
            .unwrap();

        // Two children failed, so the aggregated form must be used, with one entry per child
        // prefixed by the child's name.
        let err = sup.join_all().unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("multiple child failures"));
        assert!(msg.contains("err: "));
        assert!(msg.contains("panic: "));
    }

    #[test]
    fn shutdown_token_wait_timeout_times_out() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();
        assert!(!token.wait_timeout(Duration::from_millis(5)));
    }
}