indexbus_route/lib.rs
1#![cfg_attr(not(feature = "std"), no_std)]
2#![deny(unreachable_pub, rust_2018_idioms)]
3#![deny(missing_docs)]
4
5//! Router runner adapter for IndexBus fanout regions (`no_std`-capable; `std` adds extras).
6//!
7//! `indexbus-core` provides the lock-free data structures and a nonblocking
8//! [`indexbus_core::FanoutRouter`] handle that can route a single message (`route_once_with`).
9//! This crate adds a hardened, reusable **routing loop** and an optional CLI binary.
10//!
11//! ## Router-as-single-writer
12//!
13//! The routing loop is intended to be the primary **policy engine**: a single router
14//! thread/process owns the routing decisions and performs consumer enqueues.
15//!
16//! Operational guidance (v1):
17//! - Run **one router per region** (and ideally one per NUMA node) to avoid multi-writer effects
18//! on the consumer queues.
19//! - Producers only enqueue into the producer→router queue; consumers only dequeue from their
20//! consumer queues.
21//!
22//! ## Batching semantics
23//!
24//! The routing loop supports bounded batching via [`RouterLoopConfig::batch_max`] and (with the
25//! `std` feature) an optional [`RouterLoopConfig::batch_time_us`] cap.
26//!
27//! - Ordering is preserved **within a given source stream** (the router dequeues from the source
28//! queue in order and routes sequentially).
29//! - Cross-producer ordering is explicitly not guaranteed when using the MPSC source.
30//!
31//! ## v1 conformance mapping
32//!
33//! This crate participates in the workspace-level v1 conformance suite.
34//! The mapping from spec checklist items to concrete tests lives in:
35//! - `docs/guides/v1-conformance-tests.md`
36//!
37//! When changing router-loop policy semantics (credits, backpressure, wake/blocking), keep the
38//! mapping and the corresponding tests under `crates/indexbus-route/tests/` up to date.
39//!
40//! ## Minimal routing loop
41//!
42//! ```
43//! use indexbus_abi::layouts::SharedFanoutLayout;
44//! use indexbus_core::{fanout_handles, init_shared_fanout_layout, SharedFanoutLayoutCell};
45//! use indexbus_core::{RouterMode, RouterSource, SpinWait};
46//! use indexbus_route::{route_until, BackpressurePolicy, RouterLoopConfig};
47//!
48//! const N: usize = 1;
49//!
50//! // Allocate the shared layout on the heap (large type).
51//! let mut shared: Box<SharedFanoutLayout<N>> = Box::new(unsafe { core::mem::zeroed() });
52//! init_shared_fanout_layout::<N>(&mut shared);
53//! let cell = SharedFanoutLayoutCell::from_mut(&mut shared);
54//! let (producer, router, consumer) = fanout_handles::<N>(&cell, 0).unwrap();
55//!
56//! // Producer publishes into the producer→router queue.
57//! producer.publish(b"hi").unwrap();
58//!
59//! let cfg = RouterLoopConfig {
60//! source: RouterSource::Spsc,
61//! mode: RouterMode::Broadcast,
62//! policy: BackpressurePolicy::Drop,
63//! batch_max: 1,
64//! ..Default::default()
65//! };
66//! let mut wait = SpinWait::new(10_000);
67//!
68//! // Route until we've observed one consumer message.
69//! let mut out = [0u8; 64];
70//! let mut got_n: Option<usize> = None;
71//! let _counters = route_until::<N, _>(&router, cfg, &mut wait, || {
72//! if let Ok(Some(n)) = consumer.try_recv_into(&mut out) {
73//! got_n = Some(n);
74//! return true;
75//! }
76//! false
77//! });
78//!
79//! assert_eq!(&out[..got_n.unwrap()], b"hi");
80//! ```
81
82#[cfg(feature = "std")]
83extern crate std;
84
85mod internal;
86
87pub use internal::router::BackpressurePolicy;
88pub use internal::router::{
89 route_until, route_until_with_credit_stats, CreditConfig, CreditPolicy, CreditStats,
90 RouterCounters, RouterLoopConfig,
91};
92
93#[cfg(feature = "std")]
94pub use internal::router::route_for;
95
96#[cfg(feature = "std")]
97pub use internal::router::route_until_reporting;
98
99#[cfg(test)]
100mod tests;