indexbus_platform_core/
lifecycle.rs1use crate::errors::{Error, Result};
18
19use core::time::Duration;
20
21#[cfg(feature = "std")]
22use std::{
23 sync::{
24 atomic::{AtomicBool, Ordering},
25 Arc, Condvar, Mutex,
26 },
27 thread,
28 thread::JoinHandle,
29};
30
/// State shared between a [`Shutdown`] and every [`ShutdownToken`] derived from it.
#[cfg(feature = "std")]
#[derive(Debug)]
struct ShutdownInner {
    /// Becomes `true` once shutdown has been requested.
    flag: AtomicBool,
    /// Mutex paired with `cv`; it protects no data of its own.
    mu: Mutex<()>,
    /// Condition variable that blocked waiters park on.
    cv: Condvar,
}

#[cfg(feature = "std")]
impl ShutdownInner {
    /// Builds the shared state with the shutdown flag cleared.
    fn new() -> Self {
        let flag = AtomicBool::new(false);
        let mu = Mutex::new(());
        let cv = Condvar::new();
        Self { flag, mu, cv }
    }
}
49
/// Cloneable, read-only handle for observing a shutdown request.
///
/// When the crate is built without the `std` feature the token carries no
/// state and never reports shutdown.
#[derive(Clone, Debug)]
pub struct ShutdownToken {
    #[cfg(feature = "std")]
    inner: Arc<ShutdownInner>,
}

impl ShutdownToken {
    /// Reports whether shutdown has been requested.
    ///
    /// Always `false` when built without the `std` feature.
    pub fn is_shutdown(&self) -> bool {
        #[cfg(feature = "std")]
        let requested = self.inner.flag.load(Ordering::Acquire);

        #[cfg(not(feature = "std"))]
        let requested = false;

        requested
    }

    /// Blocks the calling thread until shutdown has been requested.
    ///
    /// Returns immediately if the request was already made.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[cfg(feature = "std")]
    pub fn wait(&self) {
        if !self.is_shutdown() {
            let state: &ShutdownInner = &self.inner;
            let lock = state.mu.lock().expect("shutdown mutex poisoned");
            // The predicate is re-evaluated on every wake-up, so a spurious
            // wake-up simply parks the thread again.
            let lock = state
                .cv
                .wait_while(lock, |_| !self.is_shutdown())
                .expect("shutdown mutex poisoned");
            drop(lock);
        }
    }

    /// Blocks until shutdown has been requested or `timeout` has elapsed.
    ///
    /// Returns `true` if shutdown had been requested by the time the call
    /// returns, `false` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[cfg(feature = "std")]
    pub fn wait_timeout(&self, timeout: Duration) -> bool {
        if !self.is_shutdown() {
            let state: &ShutdownInner = &self.inner;
            let lock = state.mu.lock().expect("shutdown mutex poisoned");
            let outcome = state
                .cv
                .wait_timeout_while(lock, timeout, |_| !self.is_shutdown())
                .expect("shutdown mutex poisoned");
            // Only the flag decides the result; the guard and the timeout
            // indicator are not needed.
            drop(outcome);
        }
        self.is_shutdown()
    }
}
116
117pub struct Shutdown {
125 #[cfg(feature = "std")]
126 inner: Arc<ShutdownInner>,
127}
128
129impl Shutdown {
130 pub fn new() -> Self {
132 #[cfg(feature = "std")]
133 {
134 let inner = Arc::new(ShutdownInner::new());
135 Self { inner }
136 }
137
138 #[cfg(not(feature = "std"))]
139 {
140 Self {}
141 }
142 }
143
144 pub fn token(&self) -> ShutdownToken {
146 #[cfg(feature = "std")]
147 {
148 ShutdownToken {
149 inner: self.inner.clone(),
150 }
151 }
152
153 #[cfg(not(feature = "std"))]
154 {
155 ShutdownToken {}
156 }
157 }
158
159 pub fn trigger(&self) -> bool {
165 #[cfg(feature = "std")]
166 {
167 let was = self.inner.flag.swap(true, Ordering::Release);
168
169 self.inner.cv.notify_all();
171
172 !was
173 }
174
175 #[cfg(not(feature = "std"))]
176 {
177 false
178 }
179 }
180}
181
182impl Default for Shutdown {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
/// Restart policy used by a [`Supervisor`] for its restartable children.
#[derive(Debug, Clone, Copy)]
pub struct SupervisorConfig {
    /// Restart budget for a restartable child; validation rejects zero.
    pub max_restarts: u32,

    /// Delay before the first restart; validation rejects zero.
    pub restart_backoff_initial: Duration,

    /// Upper bound for the delay as it doubles after each restart; must not
    /// be smaller than `restart_backoff_initial`.
    pub restart_backoff_max: Duration,
}

impl Default for SupervisorConfig {
    /// Ten restarts, starting at a 50 ms delay and capped at 5 s.
    fn default() -> Self {
        let restart_backoff_initial = Duration::from_millis(50);
        let restart_backoff_max = Duration::from_secs(5);
        SupervisorConfig {
            max_restarts: 10,
            restart_backoff_initial,
            restart_backoff_max,
        }
    }
}
212
213#[cfg(feature = "std")]
214struct SupervisorInner {
215 cfg: SupervisorConfig,
216 shutdown: Shutdown,
217 children: Mutex<Vec<Child>>,
218}
219
220#[cfg(feature = "std")]
222#[derive(Debug)]
223struct Child {
224 name: String,
225 join: JoinHandle<Result<()>>,
226}
227
228#[derive(Clone)]
242pub struct Supervisor {
243 #[cfg(feature = "std")]
244 inner: Arc<SupervisorInner>,
245
246 #[cfg(not(feature = "std"))]
247 _private: (),
248}
249
250impl Supervisor {
251 pub fn new(cfg: SupervisorConfig) -> Self {
255 #[cfg(feature = "std")]
256 {
257 Self {
258 inner: Arc::new(SupervisorInner {
259 cfg,
260 shutdown: Shutdown::new(),
261 children: Mutex::new(Vec::new()),
262 }),
263 }
264 }
265
266 #[cfg(not(feature = "std"))]
267 {
268 let _ = cfg;
269 Self { _private: () }
270 }
271 }
272
273 pub fn config(&self) -> SupervisorConfig {
275 #[cfg(feature = "std")]
276 {
277 self.inner.cfg
278 }
279
280 #[cfg(not(feature = "std"))]
281 {
282 SupervisorConfig::default()
283 }
284 }
285
286 pub fn shutdown(&self) -> Shutdown {
288 #[cfg(feature = "std")]
289 {
290 Shutdown {
291 inner: self.inner.shutdown.inner.clone(),
292 }
293 }
294
295 #[cfg(not(feature = "std"))]
296 {
297 Shutdown::new()
298 }
299 }
300
301 pub fn token(&self) -> ShutdownToken {
303 self.shutdown().token()
304 }
305
306 pub fn validate(&self) -> Result<()> {
310 let cfg = self.config();
311 if cfg.max_restarts == 0 {
312 return Err(Error::msg("max_restarts must be non-zero"));
313 }
314 if cfg.restart_backoff_initial.is_zero() {
315 return Err(Error::msg("restart_backoff_initial must be non-zero"));
316 }
317 if cfg.restart_backoff_max < cfg.restart_backoff_initial {
318 return Err(Error::msg(
319 "restart_backoff_max must be >= restart_backoff_initial",
320 ));
321 }
322 Ok(())
323 }
324
325 #[cfg(feature = "std")]
335 pub fn spawn_thread<F>(&self, name: impl Into<String>, f: F) -> Result<()>
336 where
337 F: FnOnce(ShutdownToken) -> Result<()> + Send + 'static,
338 {
339 self.validate()?;
340
341 let name = name.into();
342 let token = self.token();
343 let join = thread::Builder::new()
344 .name(name.clone())
345 .spawn(move || f(token))
346 .map_err(|e| Error::msg(format!("failed to spawn thread '{name}': {e}")))?;
347
348 self.inner
349 .children
350 .lock()
351 .expect("children mutex poisoned")
352 .push(Child { name, join });
353 Ok(())
354 }
355
356 #[cfg(feature = "std")]
368 pub fn spawn_restartable_thread<F>(&self, name: impl Into<String>, f: F) -> Result<()>
369 where
370 F: Fn(ShutdownToken) -> Result<()> + Send + Sync + 'static,
371 {
372 self.validate()?;
373
374 let name = name.into();
375 let token = self.token();
376 let cfg = self.config();
377
378 let f = Arc::new(f);
379 let name2 = name.clone();
380 let join = thread::Builder::new()
381 .name(name.clone())
382 .spawn(move || {
383 let mut restarts: u32 = 0;
384 let mut backoff = cfg.restart_backoff_initial;
385
386 loop {
387 if token.is_shutdown() {
388 return Ok(());
389 }
390
391 match (f)(token.clone()) {
392 Ok(()) => return Ok(()),
393 Err(e) => {
394 if token.is_shutdown() {
395 return Ok(());
396 }
397
398 if restarts >= cfg.max_restarts {
399 return Err(Error::msg(format!(
400 "restartable child '{name2}' exceeded max_restarts ({}) after error: {e}",
401 cfg.max_restarts
402 )));
403 }
404
405 restarts = restarts.saturating_add(1);
406 thread::sleep(backoff);
407 backoff = (backoff + backoff).min(cfg.restart_backoff_max);
408 }
409 }
410 }
411 })
412 .map_err(|e| Error::msg(format!("failed to spawn thread '{name}': {e}")))?;
413
414 self.inner
415 .children
416 .lock()
417 .expect("children mutex poisoned")
418 .push(Child { name, join });
419 Ok(())
420 }
421
422 #[cfg(feature = "std")]
431 pub fn join_all(&self) -> Result<()> {
432 let mut children = self.inner.children.lock().expect("children mutex poisoned");
433 let mut errs: Vec<String> = Vec::new();
434
435 for child in children.drain(..) {
436 match child.join.join() {
437 Ok(Ok(())) => {}
438 Ok(Err(e)) => errs.push(format!("{}: {e}", child.name)),
439 Err(panic) => errs.push(format!("{}: {}", child.name, Error::panic_message(panic))),
440 }
441 }
442
443 if errs.is_empty() {
444 Ok(())
445 } else if errs.len() == 1 {
446 Err(Error::msg(errs.remove(0)))
447 } else {
448 Err(Error::msg(format!(
449 "multiple child failures:\n- {}",
450 errs.join("\n- ")
451 )))
452 }
453 }
454
455 #[cfg(feature = "std")]
460 pub fn shutdown_and_join(&self) -> Result<()> {
461 let _ = self.shutdown().trigger();
462 self.join_all()
463 }
464}
465
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "std")]
    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};

    #[cfg(feature = "std")]
    use std::sync::Arc;

    // `Shutdown::trigger` is a stub that returns `false` without `std`, so
    // tests that observe a trigger only make sense in std builds.
    #[cfg(feature = "std")]
    #[test]
    fn shutdown_token_observes_trigger() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();

        assert!(!token.is_shutdown());
        assert!(shutdown.trigger());
        assert!(token.is_shutdown());
    }

    #[cfg(feature = "std")]
    #[test]
    fn shutdown_trigger_is_idempotent() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();

        assert!(shutdown.trigger());
        assert!(!shutdown.trigger());
        assert!(token.is_shutdown());
    }

    #[cfg(not(feature = "std"))]
    #[test]
    fn shutdown_is_inert_without_std() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();

        assert!(!shutdown.trigger());
        assert!(!token.is_shutdown());
    }

    #[cfg(feature = "std")]
    #[test]
    fn shutdown_token_wait_unblocks() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();

        let t = std::thread::spawn(move || {
            token.wait();
        });

        std::thread::sleep(Duration::from_millis(10));
        shutdown.trigger();
        t.join().unwrap();
    }

    #[cfg(feature = "std")]
    #[test]
    fn shutdown_token_waits_return_immediately_after_trigger() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();
        shutdown.trigger();

        // Both calls must take the fast path; the long timeout would make a
        // regression obvious.
        token.wait();
        assert!(token.wait_timeout(Duration::from_secs(60)));
    }

    #[test]
    fn supervisor_default_config_is_valid() {
        let s = Supervisor::new(SupervisorConfig::default());
        assert!(s.validate().is_ok());
    }

    // Config rejection is exercised against the std-backed supervisor.
    #[cfg(feature = "std")]
    #[test]
    fn supervisor_validate_rejects_zero_restarts() {
        let s = Supervisor::new(SupervisorConfig {
            max_restarts: 0,
            ..SupervisorConfig::default()
        });
        assert!(s.validate().is_err());
    }

    #[cfg(feature = "std")]
    #[test]
    fn supervisor_validate_rejects_zero_initial_backoff() {
        let s = Supervisor::new(SupervisorConfig {
            restart_backoff_initial: Duration::ZERO,
            ..SupervisorConfig::default()
        });
        assert!(s.validate().is_err());
    }

    #[cfg(feature = "std")]
    #[test]
    fn supervisor_validate_rejects_max_backoff_below_initial() {
        let s = Supervisor::new(SupervisorConfig {
            restart_backoff_initial: Duration::from_millis(10),
            restart_backoff_max: Duration::from_millis(5),
            ..SupervisorConfig::default()
        });
        assert!(s.validate().is_err());
    }

    #[cfg(feature = "std")]
    #[test]
    fn supervisor_tracks_and_joins_children() {
        let sup = Supervisor::new(SupervisorConfig::default());
        let token = sup.token();

        let seen = Arc::new(AtomicU32::new(0));
        let seen2 = seen.clone();
        sup.spawn_thread("child", move |t| {
            while !t.is_shutdown() {
                std::hint::spin_loop();
            }
            seen2.store(1, AtomicOrdering::Release);
            Ok(())
        })
        .unwrap();

        assert!(!token.is_shutdown());
        sup.shutdown().trigger();
        sup.join_all().unwrap();
        assert_eq!(seen.load(AtomicOrdering::Acquire), 1);
    }

    #[cfg(feature = "std")]
    #[test]
    fn restartable_thread_restarts_then_succeeds() {
        let sup = Supervisor::new(SupervisorConfig {
            max_restarts: 3,
            restart_backoff_initial: Duration::from_millis(1),
            restart_backoff_max: Duration::from_millis(5),
        });

        let attempts = Arc::new(AtomicU32::new(0));
        let attempts2 = attempts.clone();

        sup.spawn_restartable_thread("restartable", move |_t| {
            let n = attempts2.fetch_add(1, AtomicOrdering::AcqRel);
            if n < 2 {
                return Err(Error::msg("boom"));
            }
            Ok(())
        })
        .unwrap();

        sup.join_all().unwrap();
        // Two failures followed by one success: exactly three attempts.
        assert_eq!(attempts.load(AtomicOrdering::Acquire), 3);
    }

    #[cfg(feature = "std")]
    #[test]
    fn restartable_thread_stops_after_exceeding_max_restarts() {
        let sup = Supervisor::new(SupervisorConfig {
            max_restarts: 1,
            restart_backoff_initial: Duration::from_millis(1),
            restart_backoff_max: Duration::from_millis(1),
        });

        let attempts = Arc::new(AtomicU32::new(0));
        let attempts2 = attempts.clone();

        sup.spawn_restartable_thread("restartable", move |_t| {
            attempts2.fetch_add(1, AtomicOrdering::AcqRel);
            Err(Error::msg("boom"))
        })
        .unwrap();

        let err = sup.join_all().unwrap_err();
        assert!(err.to_string().contains("exceeded max_restarts"));
        // The initial run plus one restart.
        assert_eq!(attempts.load(AtomicOrdering::Acquire), 2);
    }

    #[cfg(feature = "std")]
    #[test]
    fn restartable_thread_exits_cleanly_on_shutdown() {
        let sup = Supervisor::new(SupervisorConfig {
            max_restarts: 100,
            restart_backoff_initial: Duration::from_millis(1),
            restart_backoff_max: Duration::from_millis(2),
        });

        sup.spawn_restartable_thread("restartable", move |_t| Err(Error::msg("boom")))
            .unwrap();

        sup.shutdown().trigger();
        sup.join_all().unwrap();
    }

    #[cfg(feature = "std")]
    #[test]
    fn join_all_aggregates_errors_and_panics() {
        let sup = Supervisor::new(SupervisorConfig::default());

        sup.spawn_thread("err", move |_t| Err(Error::msg("nope")))
            .unwrap();
        sup.spawn_thread("panic", move |_t| -> Result<()> { panic!("boom") })
            .unwrap();

        let err = sup.join_all().unwrap_err();
        let msg = err.to_string();
        // Two failing children must produce the aggregate form, with one
        // name-prefixed entry per child.
        assert!(msg.contains("multiple child failures"));
        assert!(msg.contains("err:"));
        assert!(msg.contains("panic:"));
    }

    #[cfg(feature = "std")]
    #[test]
    fn join_all_without_children_is_ok() {
        let sup = Supervisor::new(SupervisorConfig::default());
        sup.join_all().unwrap();
    }

    #[cfg(feature = "std")]
    #[test]
    fn shutdown_token_wait_timeout_times_out() {
        let shutdown = Shutdown::new();
        let token = shutdown.token();
        assert!(!token.wait_timeout(Duration::from_millis(5)));
    }
}