Crate indexbus_route

Crate indexbus_route 

Source
Expand description

Router runner (std-only adapter) for IndexBus fanout regions.

indexbus-core provides the lock-free data structures and a nonblocking [indexbus_core::FanoutRouter] handle that can route a single message (route_once_with). This crate adds a hardened, reusable routing loop and an optional CLI binary.

§Router-as-single-writer

The routing loop is intended to be the primary policy engine: a single router thread/process owns the routing decisions and performs consumer enqueues.

Operational guidance (v1):

  • Run one router per region (and ideally one per NUMA node) to avoid multi-writer effects on the consumer queues.
  • Producers only enqueue into the producer→router queue; consumers only dequeue from their consumer queues.

§Batching semantics

The routing loop supports bounded batching via RouterLoopConfig::batch_max and (with the std feature) an optional RouterLoopConfig::batch_time_us cap.

  • Ordering is preserved within a given source stream (the router dequeues from the source queue in order and routes sequentially).
  • Cross-producer ordering is explicitly not guaranteed when using the MPSC source.

§v1 conformance mapping

This crate participates in the workspace-level v1 conformance suite. The mapping from spec checklist items to concrete tests lives in:

  • docs/guides/v1-conformance-tests.md

When changing router-loop policy semantics (credits, backpressure, wake/blocking), keep the mapping and the corresponding tests under crates/indexbus-route/tests/ up to date.

§Minimal routing loop

use indexbus_abi::layouts::SharedFanoutLayout;
use indexbus_core::{fanout_handles, init_shared_fanout_layout, SharedFanoutLayoutCell};
use indexbus_core::{RouterMode, RouterSource, SpinWait};
use indexbus_route::{route_until, BackpressurePolicy, RouterLoopConfig};

const N: usize = 1;

// Allocate the shared layout on the heap (large type).
let mut shared: Box<SharedFanoutLayout<N>> = Box::new(unsafe { core::mem::zeroed() });
init_shared_fanout_layout::<N>(&mut shared);
let cell = SharedFanoutLayoutCell::from_mut(&mut shared);
let (producer, router, consumer) = fanout_handles::<N>(&cell, 0).unwrap();

// Producer publishes into the producer→router queue.
producer.publish(b"hi").unwrap();

let cfg = RouterLoopConfig {
    source: RouterSource::Spsc,
    mode: RouterMode::Broadcast,
    policy: BackpressurePolicy::Drop,
    batch_max: 1,
    ..Default::default()
};
let mut wait = SpinWait::new(10_000);

// Route until we've observed one consumer message.
let mut out = [0u8; 64];
let mut got_n: Option<usize> = None;
let _counters = route_until::<N, _>(&router, cfg, &mut wait, || {
    if let Ok(Some(n)) = consumer.try_recv_into(&mut out) {
        got_n = Some(n);
        return true;
    }
    false
});

assert_eq!(&out[..got_n.unwrap()], b"hi");

Structs§

CreditConfig
Credit configuration (router-enforced credits).
CreditStats
Per-consumer credit stats (router-enforced credits).
RouterCounters
Counters returned by routing loops.
RouterLoopConfig
Configuration for a routing loop.

Enums§

BackpressurePolicy
Backpressure policy to apply when routing to consumers.
CreditPolicy
Credit policy (router-enforced credits).

Functions§

route_for
Run the router for a fixed duration.
route_until
Run a fanout router until should_stop() returns true.
route_until_reporting
Run a fanout router until should_stop() returns true, periodically reporting counters.
route_until_with_credit_stats
Run a fanout router until should_stop() returns true, returning aggregate counters.