Code Examples

Working examples that demonstrate each IndexBus execution pattern.

Minimal Fanout Pipeline

The simplest IndexBus pipeline: one producer, one router, one consumer. The producer publishes 10,000 typed messages through a work-queue router, and the consumer drains them.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

use indexbus::prelude::*;
use indexbus_codec_raw::RawPod;
use indexbus_core::{RouterMode, RouterSource};
use indexbus_route::BackpressurePolicy;
use indexbus_transport_shm::FanoutRegion;
use indexbus_typed::{meta, IndexbusMsgSpec};

const N: usize = 1; // one consumer

/// Fixed-layout payload published through the fanout pipeline.
///
/// Carries only the sequence number assigned by the producer loop.
#[repr(C)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct Msg {
    /// Sequence number of this message, as set by the producer.
    seq: u64,
}
// SAFETY: `Msg` is `#[repr(C)]` with a single `u64` field, so it has no padding
// bytes and every bit pattern is a valid value.
// NOTE(review): presumably this is what `RawPod` requires — confirm against the
// trait's documented safety contract.
unsafe impl RawPod for Msg {}
/// Associates `Msg` with its typed-message metadata.
impl IndexbusMsgSpec for Msg {
    // NOTE(review): the meaning of the three `meta` arguments (0xD0, 0x10, 1) is
    // not visible here — see `indexbus_typed::meta` before changing them.
    const META: indexbus_typed::MsgMeta = meta(0xD0, 0x10, 1);
}

fn main() -> Result<()> {
    let kit = raw_kit();

    // 1. Create a SHM fanout region.
    let path = temp_mmap("ib_routing_lane");
    let _region: FanoutRegion<N> = open_shm_fanout_region(&path, false)?;

    // 2. Spawn a router (work-queue mode, drop on backpressure).
    let router = spawn_shm_router::<N>(
        path.clone(),
        RouterSource::Spsc,
        RouterMode::WorkQueue,
        BackpressurePolicy::Drop,
    )?;

    let stop = Arc::new(AtomicBool::new(false));

    // 3. Spawn a consumer thread.
    let consumer = {
        let stop = stop.clone();
        let path = path.clone();
        thread::spawn(move || -> Result<()> {
            let mut region: FanoutRegion<N> =
                open_shm_fanout_region(&path, false)?;
            let (_p, _r, c0) = region.handles(0).map_err(Error::from)?;
            let cons = kit.fanout_consumer(c0);

            let mut got: u64 = 0;
            while !stop.load(Ordering::Relaxed) {
                if let Some((_hdr, _msg)) =
                    cons.try_recv_owned_msg::<Msg>().map_err(Error::from)?
                {
                    got += 1;
                    if got >= 10_000 { break; }
                } else {
                    thread::yield_now();
                }
            }
            Ok(())
        })
    };

    // 4. Produce 10,000 messages.
    {
        let mut region: FanoutRegion<N> =
            open_shm_fanout_region(&path, false)?;
        let (p, _r, _c0) = region.handles(0).map_err(Error::from)?;
        let prod = kit.fanout_producer(p);

        for seq in 0..10_000u64 {
            let msg = Msg { seq };
            loop {
                match prod.publish_msg::<Msg>(&msg) {
                    Ok(()) => break,
                    Err(indexbus_typed::Error::Core(
                        indexbus_core::Error::Full,
                    )) => thread::yield_now(),
                    Err(e) => return Err(Error::from(e)),
                }
            }
        }
    }

    // 5. Clean up.
    consumer.join().map_err(|_| Error::msg("consumer panicked"))??;
    stop.store(true, Ordering::Relaxed);
    router.stop();
    let _ = router.join();
    remove_file_best_effort(&path);
    Ok(())
}

Run: cargo run -p indexbus --example p0_routing_lane_minimal

Edge Gateway Pipeline

A two-stage pipeline demonstrating multi-region fanout. Raw frames are ingested, distributed to decode workers via a work-queue router, then decoded messages are broadcast to sink consumers.

┌────────┐      ┌───────────┐       ┌──────────┐      ┌───────────┐       ┌───────┐
│ Ingest │─SPSC─│  Router   │─Work──│ Workers  │─MPSC─│  Router   │─Bcast─│ Sinks │
│ (raw)  │      │(WorkQueue)│ Queue │ (decode) │      │(Broadcast)│       │ (N=2) │
└────────┘      └───────────┘       └──────────┘      └───────────┘       └───────┘
                   raw region                        decoded region
  • Raw region: SPSC producer → WorkQueue router → N decode workers.
  • Decoded region: Workers publish via MPSC → Broadcast router → N sink consumers.
  • Message types: RawFrame { seq: u64, adc: u16 } and Decoded { seq: u64, milli_units: i32 }

Run: cargo run -p indexbus --example p0_edge_gateway_minimal

Full source: crates/indexbus/examples/p0_edge_gateway_minimal.rs (~220 lines)

Sequencer (Disruptor-Style Coordination)

Monotonic sequence claiming with per-consumer gating. The producer claims sequence numbers, publishes, and advances the cursor. The consumer waits on each sequence via a barrier and advances its gating position.

use indexbus::prelude::*;
use indexbus_core::WaitStrategy;
use indexbus_seq::{Error as SeqError, SequencerProducerConfig};

fn main() -> Result<()> {
    const N: usize = 1;
    let path = temp_mmap("indexbus_seq");
    let region = open_shm_sequencer_region::<N>(&path, false)?;
    let barrier = region.barrier_for_cursor().map_err(Error::from)?;

    // Producer: claim sequence numbers, publish, advance cursor.
    let prod = std::thread::spawn({
        let path = path.clone();
        move || -> Result<()> {
            let region = open_shm_sequencer_region::<N>(&path, false)?;
            let mut p = region
                .producer(SequencerProducerConfig::new(1024))
                .map_err(Error::from)?;
            let mut wait = indexbus_core::StdBackoff::default();

            for _ in 0..50_000u64 {
                let s = loop {
                    match p.try_claim_next() {
                        Ok(s) => break s,
                        Err(SeqError::Full) => { wait.wait(); continue; }
                        Err(e) => return Err(Error::from(e)),
                    }
                };
                p.publish(s).map_err(Error::from)?;
                wait.reset();
            }
            Ok(())
        }
    });

    // Consumer: wait for each sequence, advance gating.
    let mut c0 = region.consumer(0).map_err(Error::from)?;
    let mut wait = indexbus_core::StdBackoff::default();
    for _ in 0..50_000u64 {
        let want = c0.next_seq();
        barrier.wait_for(want, &mut wait);
        c0.advance(want).map_err(Error::from)?;
    }

    prod.join().map_err(|_| Error::msg("producer panicked"))??;
    remove_file_best_effort(&path);
    Ok(())
}

Run: cargo run -p indexbus --example p1_sequencer_minimal --release

Journal (Append-Only Log with Tail/Replay)

Append 10 log records and tail them as they appear. The publisher writes to a segmented journal region, and the subscriber polls for new records.

use std::thread;
use std::time::Duration;
use indexbus::prelude::*;
use indexbus_log::JournalPublisherConfig;

fn main() -> Result<()> {
    let path = temp_mmap("ib_journal");
    let mut region = open_shm_journal_region4(&path, true, true)?;

    let mut pubr = region
        .publisher(JournalPublisherConfig::default())
        .map_err(Error::from)?;
    let mut sub = region
        .subscriber_with_config(0, 0, subscriber_cfg(2))
        .map_err(Error::from)?;

    // Publisher: append 10 log records.
    let pub_thread = thread::spawn(move || -> Result<()> {
        for i in 0..10u32 {
            let line = format!("INFO i={i} msg=hello-from-indexbus");
            pubr.try_append(line.as_bytes()).map_err(Error::from)?;
            thread::sleep(Duration::from_millis(25));
        }
        Ok(())
    });

    // Subscriber: tail records as they appear.
    let mut buf = vec![0u8; 256];
    for _ in 0..10 {
        let rec = tail_blocking(&mut sub, &mut buf, Some(Duration::from_secs(1)))?
            .ok_or_else(|| Error::msg("timeout"))?;
        println!("tail: {}", String::from_utf8_lossy(rec));
    }

    pub_thread.join().map_err(|_| Error::msg("publisher panicked"))??;
    drop(region);
    remove_file_best_effort(&path);
    Ok(())
}

Run: cargo run -p indexbus --example p1_journal_minimal

Provenance
Need the canonical source?
Use the public hub to orient yourself, then jump to repo-owned docs or rustdoc when you need contract-level detail.