indexbus_kit/lanes/routing/
router.rs

1use std::path::PathBuf;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::thread;
5use std::time::Duration;
6
7use crate::errors::{Error, Result};
8
9use indexbus_core::{RouterMode, RouterSource, SpinWait, StdBackoff};
10use indexbus_route::{route_until, route_until_reporting, BackpressurePolicy, RouterLoopConfig};
11
12use super::open_shm_fanout_region;
13
14/// Handle to a spawned router loop thread.
15///
16/// The thread runs `indexbus-route`'s router loop until `stop()` is called or the loop returns.
17pub struct RouterThread {
18    stop: Arc<AtomicBool>,
19    handle: thread::JoinHandle<Result<indexbus_route::RouterCounters>>,
20}
21
22impl RouterThread {
23    /// Request the router loop to stop at the next check point.
24    pub fn stop(&self) {
25        self.stop.store(true, Ordering::Relaxed);
26    }
27
28    /// Join the router thread and return its final counters.
29    ///
30    /// Returns an error if the thread panicked.
31    pub fn join(self) -> Result<indexbus_route::RouterCounters> {
32        self.handle
33            .join()
34            .map_err(|_| Error::msg("router thread panicked"))?
35    }
36}
37
/// Wait strategy the router loop uses while polling.
///
/// Each variant selects the matching waiter type from `indexbus_core` when the router
/// thread is spawned.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WaitStrategy {
    /// Wait using the standard exponential-backoff style waiter (`StdBackoff`).
    StdBackoff,
    /// Busy-wait (spin) between polls (`SpinWait`).
    Spin,
}
46
47/// Options controlling router thread setup and loop behavior.
48pub struct RouterThreadOptions {
49    /// If true, open the region in blocking mode (requires wake sections in layouts).
50    pub blocking_open: bool,
51    /// Which wait strategy to use while routing.
52    pub wait: WaitStrategy,
53    /// If set, report counters periodically.
54    pub report_period: Option<Duration>,
55    /// Optional callback invoked with periodic counter snapshots.
56    pub report: Option<Arc<dyn Fn(indexbus_route::RouterCounters) + Send + Sync + 'static>>,
57    /// Optional hook called inside the router thread before opening the SHM region.
58    pub pre_run: Option<Arc<dyn Fn() -> Result<()> + Send + Sync + 'static>>,
59}
60
61impl Default for RouterThreadOptions {
62    fn default() -> Self {
63        Self {
64            blocking_open: false,
65            wait: WaitStrategy::StdBackoff,
66            report_period: None,
67            report: None,
68            pre_run: None,
69        }
70    }
71}
72
73/// Spawn a router thread using a caller-provided stop flag.
74///
75/// This is the lowest-level thread spawner: callers can share/coordinate `stop` across
76/// multiple threads.
77pub fn spawn_shm_router_with_stop<const N: usize>(
78    path: PathBuf,
79    cfg: RouterLoopConfig,
80    options: RouterThreadOptions,
81    stop: Arc<AtomicBool>,
82) -> Result<RouterThread> {
83    let stop2 = stop.clone();
84
85    let join = thread::spawn(move || -> Result<indexbus_route::RouterCounters> {
86        if let Some(pre_run) = &options.pre_run {
87            pre_run()?;
88        }
89
90        let mut region = open_shm_fanout_region::<N>(&path, options.blocking_open)?;
91        let (_p, router, _c0) = region.handles(0).map_err(Error::from)?;
92
93        let should_stop = || stop2.load(Ordering::Relaxed);
94
95        let counters = match (options.wait, options.report_period, options.report) {
96            (WaitStrategy::Spin, Some(period), Some(report)) => {
97                let report2 = move |c| (report)(c);
98                route_until_reporting(
99                    &router,
100                    cfg,
101                    &mut SpinWait::default(),
102                    should_stop,
103                    period,
104                    report2,
105                )
106            }
107            (WaitStrategy::StdBackoff, Some(period), Some(report)) => {
108                let report2 = move |c| (report)(c);
109                route_until_reporting(
110                    &router,
111                    cfg,
112                    &mut StdBackoff::default(),
113                    should_stop,
114                    period,
115                    report2,
116                )
117            }
118            (WaitStrategy::Spin, _, _) => {
119                route_until(&router, cfg, &mut SpinWait::default(), should_stop)
120            }
121            (WaitStrategy::StdBackoff, _, _) => {
122                route_until(&router, cfg, &mut StdBackoff::default(), should_stop)
123            }
124        };
125
126        drop(region);
127        Ok(counters)
128    });
129
130    Ok(RouterThread { stop, handle: join })
131}
132
133/// Spawn a router thread with the provided loop config and options.
134///
135/// The returned handle contains an internal stop flag which can be triggered via `RouterThread::stop`.
136pub fn spawn_shm_router_with_config<const N: usize>(
137    path: PathBuf,
138    cfg: RouterLoopConfig,
139    options: RouterThreadOptions,
140) -> Result<RouterThread> {
141    let stop = Arc::new(AtomicBool::new(false));
142    spawn_shm_router_with_stop::<N>(path, cfg, options, stop)
143}
144
145/// Spawn a router thread with a simple default loop config.
146///
147/// This is a convenience wrapper; for fine-grained control use `spawn_shm_router_with_config`.
148pub fn spawn_shm_router<const N: usize>(
149    path: PathBuf,
150    source: RouterSource,
151    mode: RouterMode,
152    policy: BackpressurePolicy,
153) -> Result<RouterThread> {
154    let cfg = RouterLoopConfig {
155        source,
156        mode,
157        policy,
158
159        credit: None,
160
161        batch_max: 1,
162        batch_time_us: None,
163        yield_every: 0,
164        idle_spin_limit: 0,
165    };
166
167    spawn_shm_router_with_config::<N>(path, cfg, RouterThreadOptions::default())
168}