indexbus_kit/lanes/routing/
router.rs1use std::path::PathBuf;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::thread;
5use std::time::Duration;
6
7use crate::errors::{Error, Result};
8
9use indexbus_core::{RouterMode, RouterSource, SpinWait, StdBackoff};
10use indexbus_route::{route_until, route_until_reporting, BackpressurePolicy, RouterLoopConfig};
11
12use super::open_shm_fanout_region;
13
14pub struct RouterThread {
18 stop: Arc<AtomicBool>,
19 handle: thread::JoinHandle<Result<indexbus_route::RouterCounters>>,
20}
21
22impl RouterThread {
23 pub fn stop(&self) {
25 self.stop.store(true, Ordering::Relaxed);
26 }
27
28 pub fn join(self) -> Result<indexbus_route::RouterCounters> {
32 self.handle
33 .join()
34 .map_err(|_| Error::msg("router thread panicked"))?
35 }
36}
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum WaitStrategy {
41 StdBackoff,
43 Spin,
45}
46
47pub struct RouterThreadOptions {
49 pub blocking_open: bool,
51 pub wait: WaitStrategy,
53 pub report_period: Option<Duration>,
55 pub report: Option<Arc<dyn Fn(indexbus_route::RouterCounters) + Send + Sync + 'static>>,
57 pub pre_run: Option<Arc<dyn Fn() -> Result<()> + Send + Sync + 'static>>,
59}
60
61impl Default for RouterThreadOptions {
62 fn default() -> Self {
63 Self {
64 blocking_open: false,
65 wait: WaitStrategy::StdBackoff,
66 report_period: None,
67 report: None,
68 pre_run: None,
69 }
70 }
71}
72
73pub fn spawn_shm_router_with_stop<const N: usize>(
78 path: PathBuf,
79 cfg: RouterLoopConfig,
80 options: RouterThreadOptions,
81 stop: Arc<AtomicBool>,
82) -> Result<RouterThread> {
83 let stop2 = stop.clone();
84
85 let join = thread::spawn(move || -> Result<indexbus_route::RouterCounters> {
86 if let Some(pre_run) = &options.pre_run {
87 pre_run()?;
88 }
89
90 let mut region = open_shm_fanout_region::<N>(&path, options.blocking_open)?;
91 let (_p, router, _c0) = region.handles(0).map_err(Error::from)?;
92
93 let should_stop = || stop2.load(Ordering::Relaxed);
94
95 let counters = match (options.wait, options.report_period, options.report) {
96 (WaitStrategy::Spin, Some(period), Some(report)) => {
97 let report2 = move |c| (report)(c);
98 route_until_reporting(
99 &router,
100 cfg,
101 &mut SpinWait::default(),
102 should_stop,
103 period,
104 report2,
105 )
106 }
107 (WaitStrategy::StdBackoff, Some(period), Some(report)) => {
108 let report2 = move |c| (report)(c);
109 route_until_reporting(
110 &router,
111 cfg,
112 &mut StdBackoff::default(),
113 should_stop,
114 period,
115 report2,
116 )
117 }
118 (WaitStrategy::Spin, _, _) => {
119 route_until(&router, cfg, &mut SpinWait::default(), should_stop)
120 }
121 (WaitStrategy::StdBackoff, _, _) => {
122 route_until(&router, cfg, &mut StdBackoff::default(), should_stop)
123 }
124 };
125
126 drop(region);
127 Ok(counters)
128 });
129
130 Ok(RouterThread { stop, handle: join })
131}
132
133pub fn spawn_shm_router_with_config<const N: usize>(
137 path: PathBuf,
138 cfg: RouterLoopConfig,
139 options: RouterThreadOptions,
140) -> Result<RouterThread> {
141 let stop = Arc::new(AtomicBool::new(false));
142 spawn_shm_router_with_stop::<N>(path, cfg, options, stop)
143}
144
145pub fn spawn_shm_router<const N: usize>(
149 path: PathBuf,
150 source: RouterSource,
151 mode: RouterMode,
152 policy: BackpressurePolicy,
153) -> Result<RouterThread> {
154 let cfg = RouterLoopConfig {
155 source,
156 mode,
157 policy,
158
159 credit: None,
160
161 batch_max: 1,
162 batch_time_us: None,
163 yield_every: 0,
164 idle_spin_limit: 0,
165 };
166
167 spawn_shm_router_with_config::<N>(path, cfg, RouterThreadOptions::default())
168}