Router runner (std-only adapter) for IndexBus fanout regions.
indexbus-core provides the lock-free data structures and a nonblocking
[indexbus_core::FanoutRouter] handle that can route a single message (route_once_with).
This crate adds a hardened, reusable routing loop and an optional CLI binary.
§Router-as-single-writer
The routing loop is intended to be the primary policy engine: a single router thread/process owns the routing decisions and performs consumer enqueues.
Operational guidance (v1):
- Run one router per region (and ideally one per NUMA node) to avoid multi-writer effects on the consumer queues.
- Producers only enqueue into the producer→router queue; consumers only dequeue from their consumer queues.
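The topology described above can be sketched with standard-library channels. This is only an analogy, assuming `std::sync::mpsc` as a stand-in for the IndexBus queues: `run_single_router` is a hypothetical helper, not part of indexbus-route, and it says nothing about the lock-free layout the real crate uses. It shows several producers feeding one source queue, a single router thread acting as the only writer to every consumer queue, and consumers that only dequeue.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical helper (not an indexbus API): run `producers` producer threads,
/// one router thread in broadcast mode, and `consumers` output queues.
/// Returns the `(producer_id, sequence)` pairs each consumer received, in order.
pub fn run_single_router(
    producers: usize,
    per_producer: usize,
    consumers: usize,
) -> Vec<Vec<(usize, usize)>> {
    // Shared producer->router queue (multi-producer, single-consumer).
    let (src_tx, src_rx) = channel::<(usize, usize)>();

    // One output queue per consumer; only the router will hold the senders.
    let mut outs: Vec<Sender<(usize, usize)>> = Vec::new();
    let mut ins: Vec<Receiver<(usize, usize)>> = Vec::new();
    for _ in 0..consumers {
        let (tx, rx) = channel();
        outs.push(tx);
        ins.push(rx);
    }

    // Producers only enqueue into the producer->router queue.
    let producer_threads: Vec<_> = (0..producers)
        .map(|id| {
            let tx = src_tx.clone();
            thread::spawn(move || {
                for seq in 0..per_producer {
                    tx.send((id, seq)).unwrap();
                }
            })
        })
        .collect();
    // Drop the original sender so the router loop ends when producers finish.
    drop(src_tx);

    // The single router: it dequeues in order and is the sole writer of
    // every consumer queue.
    let router = thread::spawn(move || {
        for msg in src_rx {
            for out in &outs {
                out.send(msg).unwrap();
            }
        }
    });

    for p in producer_threads {
        p.join().unwrap();
    }
    router.join().unwrap();

    // Consumers only dequeue from their own queues.
    ins.into_iter().map(|rx| rx.into_iter().collect()).collect()
}
```

Because only one thread writes to the consumer queues, every consumer observes the same sequence, and each producer's messages stay in the order that producer sent them. How messages from different producers interleave depends on thread scheduling.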
§Batching semantics
The routing loop supports bounded batching via RouterLoopConfig::batch_max and (with the
std feature) an optional RouterLoopConfig::batch_time_us cap.
- Ordering is preserved within a given source stream (the router dequeues from the source queue in order and routes sequentially).
- Cross-producer ordering is explicitly not guaranteed when using the MPSC source.
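As a rough illustration of bounded batching, the sketch below drains a FIFO source until a message-count bound or an optional time cap is reached. `BatchLimits` and `drain_batch` are hypothetical names introduced here; the field names mirror `batch_max` and `batch_time_us` from the text, but their types, and the choice to take at least one available message before checking the time cap, are assumptions rather than the behavior of the real routing loop.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the two batching knobs named above; the real
/// `RouterLoopConfig` fields may have different types.
pub struct BatchLimits {
    pub batch_max: usize,
    pub batch_time_us: Option<u64>,
}

/// Drain one batch from `source` in FIFO order. Stops at whichever comes
/// first: `batch_max` messages, the time cap, or an empty source.
pub fn drain_batch<T>(source: &mut VecDeque<T>, limits: &BatchLimits) -> Vec<T> {
    let started = Instant::now();
    let cap = limits.batch_time_us.map(Duration::from_micros);
    let mut batch = Vec::new();

    while batch.len() < limits.batch_max {
        // Dequeue in order, so ordering within the source stream is kept.
        match source.pop_front() {
            Some(msg) => batch.push(msg),
            None => break,
        }
        // Assumption: the time cap is checked after each message, so a
        // non-empty source always yields at least one message per batch.
        if let Some(cap) = cap {
            if started.elapsed() >= cap {
                break;
            }
        }
    }
    batch
}
```

With `batch_max: 4` and no time cap, a source holding `0..10` drains as `[0, 1, 2, 3]`, `[4, 5, 6, 7]`, `[8, 9]`. With a zero-microsecond cap, each call returns a single message.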
§v1 conformance mapping
This crate participates in the workspace-level v1 conformance suite. The mapping from spec checklist items to concrete tests lives in:
docs/guides/v1-conformance-tests.md
When changing router-loop policy semantics (credits, backpressure, wake/blocking), keep the
mapping and the corresponding tests under crates/indexbus-route/tests/ up to date.
§Minimal routing loop
use indexbus_abi::layouts::SharedFanoutLayout;
use indexbus_core::{fanout_handles, init_shared_fanout_layout, SharedFanoutLayoutCell};
use indexbus_core::{RouterMode, RouterSource, SpinWait};
use indexbus_route::{route_until, BackpressurePolicy, RouterLoopConfig};

const N: usize = 1;

// Allocate the shared layout on the heap (large type).
let mut shared: Box<SharedFanoutLayout<N>> = Box::new(unsafe { core::mem::zeroed() });
init_shared_fanout_layout::<N>(&mut shared);
let cell = SharedFanoutLayoutCell::from_mut(&mut shared);
let (producer, router, consumer) = fanout_handles::<N>(&cell, 0).unwrap();

// Producer publishes into the producer→router queue.
producer.publish(b"hi").unwrap();

let cfg = RouterLoopConfig {
    source: RouterSource::Spsc,
    mode: RouterMode::Broadcast,
    policy: BackpressurePolicy::Drop,
    batch_max: 1,
    ..Default::default()
};
let mut wait = SpinWait::new(10_000);

// Route until we've observed one consumer message.
let mut out = [0u8; 64];
let mut got_n: Option<usize> = None;
let _counters = route_until::<N, _>(&router, cfg, &mut wait, || {
    if let Ok(Some(n)) = consumer.try_recv_into(&mut out) {
        got_n = Some(n);
        return true;
    }
    false
});
assert_eq!(&out[..got_n.unwrap()], b"hi");
Structs§
- CreditConfig: Credit configuration (router-enforced credits).
- CreditStats: Per-consumer credit stats (router-enforced credits).
- RouterCounters: Counters returned by routing loops.
- RouterLoopConfig: Configuration for a routing loop.
Enums§
- BackpressurePolicy: Backpressure policy to apply when routing to consumers.
- CreditPolicy: Credit policy (router-enforced credits).
Functions§
- route_for: Run the router for a fixed duration.
- route_until: Run a fanout router until should_stop() returns true.
- route_until_reporting: Run a fanout router until should_stop() returns true, periodically reporting counters.
- route_until_with_credit_stats: Run a fanout router until should_stop() returns true, returning aggregate counters.