indexbus_route/internal/
router.rs

1use core::hint::spin_loop;
2use core::time::Duration;
3
4use core::sync::atomic::{AtomicU64, Ordering};
5
6use indexbus_abi::IndexbusAtomicU64;
7
8use indexbus_abi::layouts::{IndexQueue, MpscQueue, SharedFanoutLayout};
9use indexbus_abi::INDEXBUS_QUEUE_CAPACITY;
10use indexbus_blocking::fanout_router_ref_blocking;
11use indexbus_core::{FanoutRouter, RouterMode, RouterSource, WaitStrategy};
12
/// How the router loop reacts when consumer queues cannot accept more messages.
///
/// ## Contract
///
/// - The policy governs only the router loop in this crate. Producers and consumers stay
///   nonblocking and, depending on mode and policy, may still observe drops.
/// - In v1, backpressure can only be applied in [`RouterMode::WorkQueue`] without changing the
///   ABI layout. [`RouterMode::Broadcast`] stays best-effort with drop-on-full semantics.
///
/// ## Notes
///
/// - [`BackpressurePolicy::SpinThenBlock`] waits on the CPU side and never touches wake
///   sections.
/// - [`BackpressurePolicy::Block`] relies on the optional v1 wake sections when they exist; if
///   a wake-backed wait fails at runtime, the loop falls back to the supplied
///   [`WaitStrategy`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BackpressurePolicy {
    /// Discard the message when its destination queue is full.
    Drop,

    /// In `RouterMode::WorkQueue`, leave the next message in the source queue while every
    /// consumer queue is full, and apply the configured wait strategy until some consumer has
    /// capacity again.
    ///
    /// Notes:
    /// - Best-effort v1 policy; wake sections are not used.
    /// - In `RouterMode::Broadcast` this is equivalent to `Drop`, because broadcast cannot be
    ///   made "all-or-nothing" without a different layout.
    SpinThenBlock,

    /// Block the router thread on the optional v1 wake sections
    /// (`INDEXBUS_CAP_SUPPORTS_BLOCKING`) while it is idle or while all consumers are full.
    ///
    /// Notes:
    /// - Only implemented for `RouterMode::WorkQueue`.
    /// - Without wake sections, the configured wait strategy is used instead.
    Block,
}
51
/// What the router does when router-enforced credits run out.
///
/// Credits exist for overload control; they are not a reliability mechanism.
///
/// ## Contract
///
/// - Enforcement happens inside the router loop and leaves the underlying ABI layout untouched.
/// - Credits are derived from an estimated per-consumer queue depth and are best-effort.
/// - These policies are primarily meant for [`RouterMode::WorkQueue`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CreditPolicy {
    /// Discard a message that cannot be delivered because credit is exhausted.
    Drop,

    /// Temporarily detach consumers that stay over the credit limit for long enough.
    ///
    /// A detached consumer is skipped for delivery until it has caught up.
    ///
    /// Measuring the detach threshold needs the `std` feature. Without `std` no detaches
    /// occur and this behaves like [`CreditPolicy::Park`].
    Detach,

    /// Work-queue only: wait, rather than drop, while no consumer is eligible.
    Park,
}
78
79/// Credit configuration (router-enforced credits).
80///
81/// The credit limit is an approximate in-flight depth bound per consumer. It is computed as a
82/// `tail - head` distance in the destination queue and may be stale.
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84pub struct CreditConfig {
85    /// Maximum in-flight depth allowed per consumer.
86    ///
87    /// Depth is estimated as `tail - head` of the consumer queue.
88    ///
89    /// Values of `0` are treated as `1`.
90    pub credit_max: u32,
91
92    /// Credit enforcement behavior when consumers exceed `credit_max`.
93    pub policy: CreditPolicy,
94
95    /// Optional detach threshold. Only meaningful with `CreditPolicy::Detach`.
96    ///
97    /// If set, consumers that remain over `credit_max` for longer than this duration are
98    /// detached (temporarily ignored).
99    ///
100    /// Only available with the `std` feature.
101    pub detach_after_ms: Option<u32>,
102}
103
104/// Configuration for a routing loop.
105///
106/// ## Batching
107///
108/// The loop routes messages sequentially. Batching repeats single-message routing up to
109/// [`batch_max`](RouterLoopConfig::batch_max) times per iteration; it does not reorder messages.
110///
111/// ## Feature notes
112///
113/// - [`batch_time_us`](RouterLoopConfig::batch_time_us) and
114///   [`yield_every`](RouterLoopConfig::yield_every) are only effective with the `std` feature.
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub struct RouterLoopConfig {
117    /// Producer queue to drain from (`Spsc` or `Mpsc`).
118    pub source: RouterSource,
119    /// Routing mode (`Broadcast` or `WorkQueue`).
120    pub mode: RouterMode,
121    /// Backpressure behavior when routing cannot make progress.
122    pub policy: BackpressurePolicy,
123
124    /// Optional router-enforced credit enforcement.
125    pub credit: Option<CreditConfig>,
126
127    /// Maximum number of messages to route per loop iteration.
128    ///
129    /// Notes:
130    /// - `0` is treated as `1`.
131    /// - Batching never reorders messages; it just repeats `route_once_with_stats`.
132    pub batch_max: u32,
133
134    /// Optional time cap for a single batch.
135    ///
136    /// This is best-effort and only enforced when the `std` feature is enabled.
137    pub batch_time_us: Option<u32>,
138
139    /// Fairness knob: yield the current thread every N loop iterations.
140    ///
141    /// Only effective with the `std` feature; otherwise ignored.
142    pub yield_every: u32,
143
144    /// When idle, spin up to this many times before invoking the wait strategy.
145    ///
146    /// This provides a bounded low-latency polling window without requiring a custom
147    /// `WaitStrategy` implementation.
148    pub idle_spin_limit: u32,
149}
150
151impl Default for RouterLoopConfig {
152    fn default() -> Self {
153        Self {
154            source: RouterSource::Spsc,
155            mode: RouterMode::Broadcast,
156            policy: BackpressurePolicy::Drop,
157
158            credit: None,
159
160            batch_max: 1,
161            batch_time_us: None,
162            yield_every: 0,
163            idle_spin_limit: 0,
164        }
165    }
166}
167
/// Aggregate counters produced by the routing loops.
///
/// The values are best-effort observability signals. Drop attribution in particular is only
/// approximate for work-queue routing, because the router may fail to find any eligible
/// destination.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct RouterCounters {
    /// Messages routed, i.e. consumed from the producer queue.
    pub routed: u64,
    /// Consumer enqueues performed, in total.
    pub delivered: u64,
    /// Consumer enqueue failures, in total.
    pub dropped: u64,

    /// Drops caused by a full destination queue (best-effort, v1 fanout).
    pub dropped_queue_full: u64,

    /// Drops caused by no eligible consumer having space (work-queue: all full).
    pub dropped_all_full: u64,

    /// Drops caused by no eligible consumer having credit (best-effort).
    pub dropped_no_credit: u64,

    /// Times routing was throttled because credit was exhausted.
    pub credit_waits: u64,

    /// Consumer detach events.
    pub detaches: u64,
    /// Times routing was throttled because consumer queues had no capacity.
    pub pressure_waits: u64,
    /// Times the loop saw "no work" and invoked the wait strategy.
    pub idle_waits: u64,

    /// Batches routed.
    pub batches: u64,

    /// Sum of all batch sizes (messages per batch).
    pub batch_sum: u64,

    /// Largest batch size observed.
    pub batch_max: u32,

    /// OS-backed wake waits performed (when `BackpressurePolicy::Block` is active).
    pub wake_waits: u64,

    /// OS-backed wake waits that timed out.
    pub wake_timeouts: u64,
}
216
/// Per-consumer statistics for router-enforced credits.
///
/// Notes:
/// - In `RouterMode::WorkQueue`, `dropped_full` cannot be pinned on a single consumer when the
///   router finds no eligible destination; per-consumer drops are best-effort in that case.
/// - For `N > 64` the per-consumer masks are not computed, so attribution is disabled.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CreditStats<const N: usize> {
    /// Messages delivered to each consumer.
    pub delivered: [u64; N],
    /// Drops per consumer that were caused by credit exhaustion.
    pub dropped_no_credit: [u64; N],
    /// Drops per consumer that were caused by a full destination queue.
    pub dropped_full: [u64; N],
    /// Whether each consumer is currently detached (for `CreditPolicy::Detach`).
    pub detached: [bool; N],
    /// How many times each consumer has been detached.
    pub detach_count: [u64; N],
    /// Estimated queue depth (`tail - head`) of each consumer.
    ///
    /// Sampled from shared state, so it may be stale.
    pub depth: [u64; N],

    // Internal: the instant at which each consumer was first seen at or over its credit limit,
    // cleared once it drops back below or is detached/reattached.
    #[cfg(feature = "std")]
    #[doc(hidden)]
    pub(crate) exhausted_since: [Option<std::time::Instant>; N],
}
245
246impl<const N: usize> Default for CreditStats<N> {
247    fn default() -> Self {
248        Self {
249            delivered: [0; N],
250            dropped_no_credit: [0; N],
251            dropped_full: [0; N],
252            detached: [false; N],
253            detach_count: [0; N],
254            depth: [0; N],
255
256            #[cfg(feature = "std")]
257            exhausted_since: [None; N],
258        }
259    }
260}
261
/// Timeout for a single wake-backed wait on a consumer sequence while routing is throttled.
const PRESSURE_WAIT_TIMEOUT: Duration = Duration::from_millis(1);
/// Timeout for a single wake-backed wait on the producer sequence while the loop is idle.
const IDLE_WAIT_TIMEOUT: Duration = Duration::from_millis(10);
264
265#[inline]
266fn load_u64(addr: *const IndexbusAtomicU64) -> u64 {
267    unsafe { (&*(addr as *const AtomicU64)).load(Ordering::Acquire) }
268}
269
270#[inline]
271fn index_queue_depth(q: &IndexQueue) -> u64 {
272    let head = load_u64(core::ptr::addr_of!(q.head));
273    let tail = load_u64(core::ptr::addr_of!(q.tail));
274    tail.wrapping_sub(head)
275}
276
277#[inline]
278fn mpsc_queue_depth(q: &MpscQueue) -> u64 {
279    let w = load_u64(core::ptr::addr_of!(q.write));
280    let r = load_u64(core::ptr::addr_of!(q.read));
281    w.wrapping_sub(r)
282}
283
284#[inline]
285fn has_pending_message<const N: usize>(
286    shared: *const SharedFanoutLayout<N>,
287    source: RouterSource,
288) -> bool {
289    let s = unsafe { &*shared };
290    match source {
291        RouterSource::Spsc => index_queue_depth(&s.producer_queue) != 0,
292        RouterSource::Mpsc => mpsc_queue_depth(&s.producer_queue_mpsc) != 0,
293    }
294}
295
296#[inline]
297fn any_consumer_has_space<const N: usize>(shared: *const SharedFanoutLayout<N>) -> bool {
298    if N == 0 {
299        return true;
300    }
301
302    let s = unsafe { &*shared };
303    let cap = INDEXBUS_QUEUE_CAPACITY as u64;
304    for q in &s.consumer_queues {
305        if index_queue_depth(q) < cap {
306            return true;
307        }
308    }
309    false
310}
311
312#[inline]
313fn consumer_depths<const N: usize>(shared: *const SharedFanoutLayout<N>) -> [u64; N] {
314    let s = unsafe { &*shared };
315    let mut out = [0u64; N];
316    for (i, q) in s.consumer_queues.iter().enumerate() {
317        out[i] = index_queue_depth(q);
318    }
319    out
320}
321
322#[inline]
323fn compute_credit_masks<const N: usize>(
324    shared: *const SharedFanoutLayout<N>,
325    credit_max: u32,
326    detached: &[bool; N],
327) -> (u64, u64, u64, [u64; N]) {
328    let depths = consumer_depths(shared);
329    if N == 0 {
330        return (0, 0, 0, depths);
331    }
332
333    let cap = INDEXBUS_QUEUE_CAPACITY as u64;
334    let mut eligible_mask: u64 = 0;
335    let mut no_credit_mask: u64 = 0;
336    let mut full_mask: u64 = 0;
337
338    if N > 64 {
339        // Best-effort: no masks for N>64.
340        return (0, 0, 0, depths);
341    }
342
343    let credit_max = credit_max as u64;
344    for (i, &is_detached) in detached.iter().enumerate() {
345        if is_detached {
346            continue;
347        }
348        let d = depths[i];
349        let bit = 1u64 << (i as u32);
350        if d >= cap {
351            full_mask |= bit;
352        } else if d >= credit_max {
353            no_credit_mask |= bit;
354        } else {
355            eligible_mask |= bit;
356        }
357    }
358
359    (eligible_mask, no_credit_mask, full_mask, depths)
360}
361
362#[inline]
363fn any_eligible_consumer_has_space_and_credit<const N: usize>(
364    shared: *const SharedFanoutLayout<N>,
365    credit_max: u32,
366    detached: &[bool; N],
367) -> bool {
368    if N == 0 {
369        return true;
370    }
371    let cap = INDEXBUS_QUEUE_CAPACITY as u64;
372    let credit_max = credit_max as u64;
373
374    let s = unsafe { &*shared };
375    for (i, &is_detached) in detached.iter().enumerate() {
376        if is_detached {
377            continue;
378        }
379        let d = index_queue_depth(&s.consumer_queues[i]);
380        if d < cap && d < credit_max {
381            return true;
382        }
383    }
384    false
385}
386
387#[inline]
388fn apply_masks_to_stats<const N: usize>(
389    stats: &mut CreditStats<N>,
390    delivered_mask: u64,
391    dropped_full_mask: u64,
392    dropped_no_credit_mask: u64,
393) {
394    if N > 64 {
395        return;
396    }
397    for i in 0..N {
398        let bit = 1u64 << (i as u32);
399        if (delivered_mask & bit) != 0 {
400            stats.delivered[i] = stats.delivered[i].wrapping_add(1);
401        }
402        if (dropped_full_mask & bit) != 0 {
403            stats.dropped_full[i] = stats.dropped_full[i].wrapping_add(1);
404        }
405        if (dropped_no_credit_mask & bit) != 0 {
406            stats.dropped_no_credit[i] = stats.dropped_no_credit[i].wrapping_add(1);
407        }
408    }
409}
410
411/// Run a fanout router until `should_stop()` returns true.
412///
413/// ## Contract
414///
415/// - `should_stop` should be cheap and side-effect free; it may be called frequently.
416/// - The provided [`WaitStrategy`] is used for idle/backpressure waits and is reset after any
417///   successful routing step.
418#[inline]
419pub fn route_until<const N: usize, W: WaitStrategy>(
420    router: &FanoutRouter<N>,
421    cfg: RouterLoopConfig,
422    wait: &mut W,
423    mut should_stop: impl FnMut() -> bool,
424) -> RouterCounters {
425    let mut credit_stats = CreditStats::<N>::default();
426    route_until_with_credit_stats(router, cfg, wait, &mut credit_stats, &mut should_stop)
427}
428
/// Decide whether work-queue routing is currently unable to make progress.
///
/// Returns `true` only when the source queue has a pending message and no consumer can take
/// it: with credits configured, no attached consumer is below both the queue capacity and the
/// credit limit; without credits, every consumer queue is full.
#[cfg(feature = "std")]
#[inline]
fn work_queue_is_blocked<const N: usize>(
    shared: *const SharedFanoutLayout<N>,
    source: RouterSource,
    credit: Option<CreditConfig>,
    detached: &[bool; N],
) -> bool {
    if !has_pending_message(shared, source) {
        return false;
    }
    match credit {
        Some(credit) => {
            !any_eligible_consumer_has_space_and_credit(shared, credit.credit_max.max(1), detached)
        }
        None => !any_consumer_has_space(shared),
    }
}

/// Update detach/reattach state for `CreditPolicy::Detach` from the depths in `stats.depth`.
///
/// - A detached consumer is reattached once its depth has fallen to half of the credit limit.
/// - An attached consumer at or over the limit is timestamped the first time it is seen there
///   and detached once it has stayed there for `credit.detach_after_ms` (never, if unset).
///
/// Returns the number of consumers detached by this call.
#[cfg(feature = "std")]
fn refresh_credit_detach_state<const N: usize>(
    stats: &mut CreditStats<N>,
    credit: CreditConfig,
    now: std::time::Instant,
) -> u64 {
    let credit_max = u64::from(credit.credit_max.max(1));
    let detach_after = credit
        .detach_after_ms
        .map(|ms| Duration::from_millis(u64::from(ms)));
    // Hysteresis: reattach at half the limit. `credit_max >= 1`, so this is always below it.
    let reattach_at = credit_max / 2;

    let mut new_detaches = 0u64;
    for i in 0..N {
        let depth = stats.depth[i];

        if stats.detached[i] {
            if depth <= reattach_at {
                stats.detached[i] = false;
                stats.exhausted_since[i] = None;
            }
            continue;
        }

        if depth < credit_max {
            stats.exhausted_since[i] = None;
            continue;
        }

        // At or over the limit: remember when that started, then compare with the threshold.
        let since = *stats.exhausted_since[i].get_or_insert(now);
        if detach_after.is_some_and(|threshold| now.duration_since(since) >= threshold) {
            stats.detached[i] = true;
            stats.detach_count[i] = stats.detach_count[i].wrapping_add(1);
            stats.exhausted_since[i] = None;
            new_detaches += 1;
        }
    }
    new_detaches
}

/// Run a fanout router until `should_stop()` returns true, periodically reporting counters.
///
/// Each loop iteration:
/// 1. optionally yields the thread (`cfg.yield_every`) and emits a periodic report;
/// 2. refreshes per-consumer depths and, for `CreditPolicy::Detach`, the detach/reattach state;
/// 3. in work-queue mode, waits instead of dequeuing when no consumer can accept a message
///    (backpressure policy `SpinThenBlock`/`Block`, or credit policy `Park`/`Detach`);
/// 4. routes one message, then up to `cfg.batch_max - 1` more, subject to `cfg.batch_time_us`;
/// 5. if nothing was routed, spins up to `cfg.idle_spin_limit` times and then waits.
///
/// Step 2 deliberately runs before step 3: a consumer can only be reattached by the
/// bookkeeping, so running it after the wait would leave the loop parked forever once every
/// consumer is detached and a message is pending.
///
/// Per-consumer credit stats are kept internally and are not returned.
///
/// Notes:
/// - Reporting is best-effort and uses `std::time::Instant`, so this is `std`-only.
/// - `report` is invoked at most once per `report_period`, and once at the end. A zero
///   `report_period` disables periodic reports.
/// - `wait` is reset after every batch that routed at least one message.
#[cfg(feature = "std")]
#[inline]
pub fn route_until_reporting<const N: usize, W: WaitStrategy>(
    router: &FanoutRouter<N>,
    cfg: RouterLoopConfig,
    wait: &mut W,
    mut should_stop: impl FnMut() -> bool,
    report_period: Duration,
    mut report: impl FnMut(RouterCounters),
) -> RouterCounters {
    use std::time::Instant;

    let mut credit_stats = CreditStats::<N>::default();
    let mut last_report = Instant::now();
    let mut c = RouterCounters::default();

    let batch_max = cfg.batch_max.max(1);
    let batch_budget = cfg
        .batch_time_us
        .map(|us| Duration::from_micros(u64::from(us)));
    let mut loop_iters: u64 = 0;
    let mut pressure_cursor: usize = 0;

    // Wake-section handle; only looked up (and only ever `Some`) for `BackpressurePolicy::Block`.
    let blocking = if cfg.policy == BackpressurePolicy::Block {
        fanout_router_ref_blocking(router).ok()
    } else {
        None
    };

    // Which mechanisms may hold back dequeuing. Both only apply to work-queue routing.
    let work_queue = cfg.mode == RouterMode::WorkQueue;
    let policy_waits = work_queue
        && matches!(
            cfg.policy,
            BackpressurePolicy::SpinThenBlock | BackpressurePolicy::Block
        );
    let credit_parks = work_queue
        && cfg.credit.is_some_and(|credit| {
            matches!(credit.policy, CreditPolicy::Park | CreditPolicy::Detach)
        });

    // Route a single message and fold the outcome into the counters and per-consumer stats.
    // Returns `false` when the source queue yielded nothing.
    let route_one = |counters: &mut RouterCounters, stats: &mut CreditStats<N>| -> bool {
        let Some(credit) = cfg.credit else {
            let step = router.route_once_with_stats(cfg.source, cfg.mode);
            if !step.routed {
                return false;
            }
            counters.routed += 1;
            counters.delivered += step.delivered as u64;
            counters.dropped += step.dropped as u64;
            if step.dropped != 0 {
                match cfg.mode {
                    RouterMode::Broadcast => counters.dropped_queue_full += step.dropped as u64,
                    RouterMode::WorkQueue => counters.dropped_all_full += step.dropped as u64,
                }
            }
            return true;
        };

        let (eligible, no_credit, full, depths) =
            compute_credit_masks(router.as_ptr(), credit.credit_max.max(1), &stats.detached);
        stats.depth = depths;

        let step =
            router.route_once_with_credit_masks(cfg.source, cfg.mode, eligible, no_credit, full);
        if !step.routed {
            return false;
        }

        counters.routed += 1;
        counters.delivered += step.delivered as u64;
        counters.dropped += (step.dropped_full + step.dropped_no_credit) as u64;
        counters.dropped_no_credit += step.dropped_no_credit as u64;

        match cfg.mode {
            RouterMode::Broadcast => {
                counters.dropped_queue_full += step.dropped_full as u64;
                apply_masks_to_stats(
                    stats,
                    step.delivered_mask,
                    step.dropped_full_mask,
                    step.dropped_no_credit_mask,
                );
            }
            RouterMode::WorkQueue => {
                counters.dropped_all_full += step.dropped_full as u64;
                // A work-queue drop has no single culprit: charge it to every consumer that
                // was full / out of credit when the masks were sampled.
                let full_hit = if step.dropped_full != 0 { full } else { 0 };
                let no_credit_hit = if step.dropped_no_credit != 0 {
                    no_credit
                } else {
                    0
                };
                apply_masks_to_stats(stats, step.delivered_mask, full_hit, no_credit_hit);
            }
        }

        true
    };

    while !should_stop() {
        loop_iters = loop_iters.wrapping_add(1);
        if cfg.yield_every != 0 && loop_iters.is_multiple_of(u64::from(cfg.yield_every)) {
            std::thread::yield_now();
        }

        if !report_period.is_zero() && last_report.elapsed() >= report_period {
            report(c);
            last_report = Instant::now();
        }

        let shared = router.as_ptr();

        // Credit bookkeeping (best-effort). Runs before any wait so that detached consumers
        // can be reattached even while routing is parked.
        if let Some(credit) = cfg.credit {
            credit_stats.depth = consumer_depths(shared);
            if credit.policy == CreditPolicy::Detach {
                c.detaches +=
                    refresh_credit_detach_state(&mut credit_stats, credit, Instant::now());
            }
        }

        // Avoid dequeuing a message nobody can accept.
        if (policy_waits || credit_parks)
            && work_queue_is_blocked(shared, cfg.source, cfg.credit, &credit_stats.detached)
        {
            if cfg.credit.is_some() {
                c.credit_waits += 1;
            } else {
                c.pressure_waits += 1;
            }

            // Prefer a wake-backed wait on one consumer (round-robin); anything that prevents
            // or breaks it falls back to the wait strategy.
            let wake_handled = match &blocking {
                Some(br) if N != 0 => {
                    let i = pressure_cursor % N;
                    pressure_cursor = pressure_cursor.wrapping_add(1);
                    match br.consumer_seq(i) {
                        Some(expected) => {
                            c.wake_waits += 1;
                            match br.wait_consumer_seq_ne(
                                i,
                                expected,
                                Some(PRESSURE_WAIT_TIMEOUT),
                            ) {
                                Ok(true) => true,
                                Ok(false) => {
                                    c.wake_timeouts += 1;
                                    true
                                }
                                Err(_) => false,
                            }
                        }
                        None => false,
                    }
                }
                _ => false,
            };
            if !wake_handled {
                wait.wait();
            }
            continue;
        }

        // Attempt at least one route.
        if !route_one(&mut c, &mut credit_stats) {
            c.idle_waits += 1;

            if cfg.idle_spin_limit != 0 {
                for _ in 0..cfg.idle_spin_limit {
                    if should_stop() {
                        report(c);
                        return c;
                    }
                    if has_pending_message(shared, cfg.source) {
                        break;
                    }
                    spin_loop();
                }
                if has_pending_message(shared, cfg.source) {
                    continue;
                }
            }

            match &blocking {
                Some(br) => {
                    let expected = br.producer_seq();
                    c.wake_waits += 1;
                    match br.wait_producer_seq_ne(expected, Some(IDLE_WAIT_TIMEOUT)) {
                        Ok(true) => {}
                        Ok(false) => c.wake_timeouts += 1,
                        Err(_) => wait.wait(),
                    }
                }
                None => wait.wait(),
            }

            continue;
        }

        // The first message is already routed; keep going until the batch is exhausted.
        let mut batch_size: u32 = 1;
        let batch_start = Instant::now();

        while batch_size < batch_max {
            if batch_budget.is_some_and(|budget| batch_start.elapsed() >= budget) {
                break;
            }

            // Only the backpressure policy (not credit parking) cuts a batch short up front.
            if policy_waits
                && work_queue_is_blocked(
                    router.as_ptr(),
                    cfg.source,
                    cfg.credit,
                    &credit_stats.detached,
                )
            {
                break;
            }

            if !route_one(&mut c, &mut credit_stats) {
                break;
            }
            batch_size += 1;
        }

        c.batches += 1;
        c.batch_sum += u64::from(batch_size);
        c.batch_max = c.batch_max.max(batch_size);
        wait.reset();
    }

    report(c);
    c
}
836
837/// Run a fanout router until `should_stop()` returns true, returning aggregate counters.
838///
839/// If `cfg.credit` is enabled, `credit_stats` is updated in place.
840///
841/// ## Backpressure and blocking
842///
843/// - For broadcast routing, destination-full remains drop-on-full.
844/// - For work-queue routing, backpressure policies can avoid dequeuing the next source message
845///   when no consumer can accept it.
846/// - With [`BackpressurePolicy::Block`], the router uses wake sections when present; if wake
847///   waits fail at runtime, the loop falls back to `wait.wait()`.
///
/// ## Loop structure
///
/// Each outer iteration runs the stages below in order and restarts from the top (`continue`)
/// as soon as one of them had to wait:
///
/// 1. Optional `yield_now` every `cfg.yield_every` iterations (`std` builds only).
/// 2. Work-queue pressure check for `SpinThenBlock` / `Block`.
/// 3. Work-queue credit check for `CreditPolicy::Park` / `CreditPolicy::Detach`.
/// 4. Detach bookkeeping on `credit_stats` (the detach/reattach part is `std`-only).
/// 5. One routing attempt; if nothing was routed, an idle spin window of up to
///    `cfg.idle_spin_limit` iterations followed by a wait.
/// 6. A batch of further routing attempts, bounded by `cfg.batch_max` (a value of 0 is treated
///    as 1) and, in `std` builds, by `cfg.batch_time_us`.
///
/// ## Parameters
///
/// - `router`: the fanout router to drive.
/// - `cfg`: loop configuration; this function reads `mode`, `source`, `policy`, `credit`,
///   `batch_max`, `batch_time_us`, `yield_every` and `idle_spin_limit`.
/// - `wait`: generic wait strategy. `wait()` is called whenever the loop pauses without a usable
///   wake-backed wait; `reset()` is called after every completed batch.
/// - `credit_stats`: per-consumer credit bookkeeping; only touched when `cfg.credit` is `Some`.
/// - `should_stop`: polled at the top of every outer iteration and on every step of the idle
///   spin window.
///
/// ## Returns
///
/// The [`RouterCounters`] accumulated over the whole run.
848#[inline]
849pub fn route_until_with_credit_stats<const N: usize, W: WaitStrategy>(
850    router: &FanoutRouter<N>,
851    cfg: RouterLoopConfig,
852    wait: &mut W,
853    credit_stats: &mut CreditStats<N>,
854    mut should_stop: impl FnMut() -> bool,
855) -> RouterCounters {
856    let mut c = RouterCounters::default();
857
    // Treat a configured `batch_max` of 0 as 1.
858    let batch_max = cfg.batch_max.max(1);
    // Outer-iteration counter; only consulted for the periodic yield below.
859    let mut loop_iters: u64 = 0;
860
    // The wake-backed handle is only looked up for the `Block` policy. `.ok()` discards the
    // error, so a failed lookup leaves `None` and every wait below goes through `wait.wait()`.
861    let blocking = if cfg.policy == BackpressurePolicy::Block {
862        fanout_router_ref_blocking(router).ok()
863    } else {
864        None
865    };
866
    // Rotating index of the consumer whose sequence is watched during a wake-backed pressure
    // wait.
867    let mut pressure_cursor: usize = 0;
868
869    while !should_stop() {
870        loop_iters = loop_iters.wrapping_add(1);
        // Periodic cooperative yield. Without the `std` feature the body below is compiled out,
        // so `yield_every` then has no effect.
871        if cfg.yield_every != 0 && loop_iters.is_multiple_of(cfg.yield_every as u64) {
872            #[cfg(feature = "std")]
873            std::thread::yield_now();
874        }
875
        // Work-queue pressure check, `SpinThenBlock` flavour: if the check says we are blocked,
        // pause via `wait` and restart the iteration instead of dequeuing.
876        if cfg.mode == RouterMode::WorkQueue && cfg.policy == BackpressurePolicy::SpinThenBlock {
877            let shared = router.as_ptr();
            // With credits configured the check is the credit-aware helper, which also receives
            // the detached flags; otherwise it is the plain space check.
878            let blocked = if let Some(credit) = cfg.credit {
879                has_pending_message(shared, cfg.source)
880                    && !any_eligible_consumer_has_space_and_credit(
881                        shared,
882                        credit.credit_max.max(1),
883                        &credit_stats.detached,
884                    )
885            } else {
886                has_pending_message(shared, cfg.source) && !any_consumer_has_space(shared)
887            };
888
889            if blocked {
                // The wait is attributed to credits whenever credits are configured, regardless
                // of which condition inside the helper actually failed.
890                if cfg.credit.is_some() {
891                    c.credit_waits += 1;
892                } else {
893                    c.pressure_waits += 1;
894                }
895                wait.wait();
896                continue;
897            }
898        }
899
        // Same pressure check for the `Block` policy, but preferring a wake-backed wait on one
        // consumer's sequence over the generic wait strategy.
900        if cfg.mode == RouterMode::WorkQueue && cfg.policy == BackpressurePolicy::Block {
901            let shared = router.as_ptr();
902            let blocked = if let Some(credit) = cfg.credit {
903                has_pending_message(shared, cfg.source)
904                    && !any_eligible_consumer_has_space_and_credit(
905                        shared,
906                        credit.credit_max.max(1),
907                        &credit_stats.detached,
908                    )
909            } else {
910                has_pending_message(shared, cfg.source) && !any_consumer_has_space(shared)
911            };
912
913            if blocked {
914                if cfg.credit.is_some() {
915                    c.credit_waits += 1;
916                } else {
917                    c.pressure_waits += 1;
918                }
919
                // Every branch that cannot perform a wake-backed wait ends in `wait.wait()`.
920                if let Some(br) = &blocking {
                    // `N == 0` would make the modulo below divide by zero.
921                    if N != 0 {
                        // Advance the cursor so successive waits watch different consumers.
922                        let i = pressure_cursor % N;
923                        pressure_cursor = pressure_cursor.wrapping_add(1);
924
925                        if let Some(expected) = br.consumer_seq(i) {
926                            c.wake_waits += 1;
                            // The wait is bounded by `PRESSURE_WAIT_TIMEOUT`. `Ok(false)` is
                            // counted as a timeout; `Ok(true)` presumably means the sequence
                            // changed — TODO confirm against `wait_consumer_seq_ne`.
927                            match br.wait_consumer_seq_ne(i, expected, Some(PRESSURE_WAIT_TIMEOUT))
928                            {
929                                Ok(true) => {}
930                                Ok(false) => c.wake_timeouts += 1,
931                                Err(_) => {
932                                    // Fall back if OS wait fails.
933                                    wait.wait();
934                                }
935                            }
936                        } else {
937                            wait.wait();
938                        }
939                    } else {
940                        wait.wait();
941                    }
942                } else {
943                    wait.wait();
944                }
945
946                continue;
947            }
948        }
949
950        // Credits: if configured to park/detach, avoid dequeuing when nobody is eligible.
        // NOTE(review): for `SpinThenBlock` / `Block` with credits, the identical condition was
        // already tested by the pressure checks above in this same iteration, so for those
        // policies this branch can only fire if the shared state changed in between. For the
        // `Drop` policy this is the only such check.
951        if cfg.mode == RouterMode::WorkQueue {
952            if let Some(credit) = cfg.credit {
953                if credit.policy == CreditPolicy::Park || credit.policy == CreditPolicy::Detach {
954                    let shared = router.as_ptr();
955                    if has_pending_message(shared, cfg.source)
956                        && !any_eligible_consumer_has_space_and_credit(
957                            shared,
958                            credit.credit_max.max(1),
959                            &credit_stats.detached,
960                        )
961                    {
962                        c.credit_waits += 1;
963
964                        // Prefer wake-backed blocking if enabled.
                        // Same wait ladder as the `Block` pressure check above, sharing
                        // `pressure_cursor` with it.
965                        if cfg.policy == BackpressurePolicy::Block {
966                            if let Some(br) = &blocking {
967                                if N != 0 {
968                                    let i = pressure_cursor % N;
969                                    pressure_cursor = pressure_cursor.wrapping_add(1);
970                                    if let Some(expected) = br.consumer_seq(i) {
971                                        c.wake_waits += 1;
972                                        match br.wait_consumer_seq_ne(
973                                            i,
974                                            expected,
975                                            Some(PRESSURE_WAIT_TIMEOUT),
976                                        ) {
977                                            Ok(true) => {}
978                                            Ok(false) => c.wake_timeouts += 1,
979                                            Err(_) => wait.wait(),
980                                        }
981                                    } else {
982                                        wait.wait();
983                                    }
984                                } else {
985                                    wait.wait();
986                                }
987                            } else {
988                                wait.wait();
989                            }
990                        } else {
991                            wait.wait();
992                        }
993
994                        continue;
995                    }
996                }
997            }
998        }
999
1000        // Credit detach bookkeeping (best-effort).
1001        if let Some(credit) = cfg.credit {
1002            let shared = router.as_ptr();
            // Only the depths from this call are consumed; the masks are recomputed right before
            // routing, after the detached flags may have changed.
1003            let (eligible_mask, _no_credit_mask, _full_mask, depths) =
1004                compute_credit_masks(shared, credit.credit_max.max(1), &credit_stats.detached);
1005            credit_stats.depth = depths;
1006
1007            if credit.policy == CreditPolicy::Detach {
                // Detach/reattach needs a clock, so without `std` no consumer is ever marked
                // detached by this function.
1008                #[cfg(feature = "std")]
1009                {
1010                    let now = std::time::Instant::now();
                    // `None` when `detach_after_ms` is unset; then the detach branch below never
                    // fires (the exhaustion timer is still started).
1011                    let detach_after = credit
1012                        .detach_after_ms
1013                        .map(|ms| std::time::Duration::from_millis(ms as u64));
1014                    let credit_max = credit.credit_max.max(1) as u64;
                    // Reattach threshold: half of `credit_max` rounded down, which is always
                    // strictly below `credit_max` (0 when `credit_max` is 1).
1015                    let reattach_at = (credit_max / 2).min(credit_max.saturating_sub(1));
1016
1017                    for i in 0..N {
                        // Detached consumers are only checked for reattachment.
1018                        if credit_stats.detached[i] {
1019                            if credit_stats.depth[i] <= reattach_at {
1020                                credit_stats.detached[i] = false;
1021                                credit_stats.exhausted_since[i] = None;
1022                            }
1023                            continue;
1024                        }
1025
                        // Attached consumer at or above the credit limit: start the exhaustion
                        // timer if it is not running, and detach once it has run for at least
                        // `detach_after`.
1026                        if credit_stats.depth[i] >= credit_max {
1027                            if credit_stats.exhausted_since[i].is_none() {
1028                                credit_stats.exhausted_since[i] = Some(now);
1029                            }
1030
1031                            if let (Some(since), Some(threshold)) =
1032                                (credit_stats.exhausted_since[i], detach_after)
1033                            {
1034                                if now.duration_since(since) >= threshold {
1035                                    credit_stats.detached[i] = true;
1036                                    credit_stats.detach_count[i] =
1037                                        credit_stats.detach_count[i].wrapping_add(1);
1038                                    c.detaches += 1;
1039                                    credit_stats.exhausted_since[i] = None;
1040                                }
1041                            }
1042                        } else {
                            // Back below the limit: clear the exhaustion timer.
1043                            credit_stats.exhausted_since[i] = None;
1044                        }
1045                    }
1046                }
1047            }
1048
1049            // If N<=64 and no consumers are eligible, keep eligible_mask computed for later.
            // NOTE(review): `eligible_mask` is discarded on the next line and not read again in
            // this function; the routing step below recomputes its own masks.
1050            let _ = eligible_mask;
1051        }
1052
1053        // Attempt at least one route.
        // `first_routed` is true when a message was routed; the counters (and, with credits,
        // `credit_stats`) are updated here for that first message.
1054        let first_routed = if let Some(credit) = cfg.credit {
1055            let shared = router.as_ptr();
1056            let (eligible, no_credit, full, depths) =
1057                compute_credit_masks(shared, credit.credit_max.max(1), &credit_stats.detached);
1058            credit_stats.depth = depths;
1059
1060            let step = router
1061                .route_once_with_credit_masks(cfg.source, cfg.mode, eligible, no_credit, full);
1062            if !step.routed {
1063                false
1064            } else {
1065                c.routed += 1;
1066                c.delivered += step.delivered as u64;
1067                c.dropped += (step.dropped_full + step.dropped_no_credit) as u64;
                // Full-queue drops are booked under a mode-specific counter.
1068                match cfg.mode {
1069                    RouterMode::Broadcast => c.dropped_queue_full += step.dropped_full as u64,
1070                    RouterMode::WorkQueue => c.dropped_all_full += step.dropped_full as u64,
1071                }
1072                c.dropped_no_credit += step.dropped_no_credit as u64;
1073
                // Broadcast uses the masks reported by the step itself.
1074                if cfg.mode == RouterMode::Broadcast {
1075                    apply_masks_to_stats(
1076                        credit_stats,
1077                        step.delivered_mask,
1078                        step.dropped_full_mask,
1079                        step.dropped_no_credit_mask,
1080                    );
1081                } else {
1082                    // Work-queue attribution is best-effort.
1083                    apply_masks_to_stats(credit_stats, step.delivered_mask, 0, 0);
1084
1085                    // If we dropped due to no-credit/full, attribute using the precomputed masks.
1086                    if step.dropped_no_credit != 0 {
1087                        apply_masks_to_stats(credit_stats, 0, 0, no_credit);
1088                    }
1089                    if step.dropped_full != 0 {
1090                        apply_masks_to_stats(credit_stats, 0, full, 0);
1091                    }
1092                }
1093
1094                true
1095            }
1096        } else {
            // No credits: the step reports a single `dropped` count, which is booked both as
            // the total and under the mode-specific full counter.
1097            let first = router.route_once_with_stats(cfg.source, cfg.mode);
1098            if !first.routed {
1099                false
1100            } else {
1101                c.routed += 1;
1102                c.delivered += first.delivered as u64;
1103                c.dropped += first.dropped as u64;
1104                if first.dropped != 0 {
1105                    match cfg.mode {
1106                        RouterMode::Broadcast => c.dropped_queue_full += first.dropped as u64,
1107                        RouterMode::WorkQueue => c.dropped_all_full += first.dropped as u64,
1108                    }
1109                }
1110                true
1111            }
1112        };
1113
        // Idle path: nothing was routed on the first attempt.
1114        if !first_routed {
1115            c.idle_waits += 1;
1116
1117            // Bounded idle spin window before invoking a heavier wait.
1118            if cfg.idle_spin_limit != 0 {
1119                let shared = router.as_ptr();
1120                for _ in 0..cfg.idle_spin_limit {
                    // Stop requested while idle: return the counters collected so far.
1121                    if should_stop() {
1122                        return c;
1123                    }
1124                    if has_pending_message(shared, cfg.source) {
1125                        break;
1126                    }
1127                    spin_loop();
1128                }
                // Re-check after the window: restart the iteration without waiting if a message
                // showed up.
1129                if has_pending_message(shared, cfg.source) {
1130                    continue;
1131                }
1132            }
1133
            // Heavier wait. With the `Block` policy and a wake handle, wait on the producer
            // sequence for at most `IDLE_WAIT_TIMEOUT`; otherwise use the generic strategy.
1134            if cfg.policy == BackpressurePolicy::Block {
1135                if let Some(br) = &blocking {
1136                    let expected = br.producer_seq();
1137                    c.wake_waits += 1;
1138                    match br.wait_producer_seq_ne(expected, Some(IDLE_WAIT_TIMEOUT)) {
1139                        Ok(true) => {}
1140                        Ok(false) => c.wake_timeouts += 1,
1141                        Err(_) => wait.wait(),
1142                    }
1143                } else {
1144                    wait.wait();
1145                }
1146            } else {
1147                wait.wait();
1148            }
1149
1150            continue;
1151        }
1152
1153        // Start a batch.
1154        let mut batch_size: u32 = 0;
1155
        // The batch clock starts after the first message has already been routed.
1156        #[cfg(feature = "std")]
1157        let batch_start = std::time::Instant::now();
1158
        // Account for the message routed by the first attempt above.
1159        batch_size += 1;
1160
1161        while batch_size < batch_max {
            // `batch_time_us` is only enforced in `std` builds.
1162            #[cfg(feature = "std")]
1163            if let Some(us) = cfg.batch_time_us {
1164                if batch_start.elapsed() >= Duration::from_micros(us as u64) {
1165                    break;
1166                }
1167            }
1168
1169            // If we are about to hit pressure, end the batch and let the outer loop apply the
1170            // configured wait policy.
1171            if cfg.mode == RouterMode::WorkQueue
1172                && (cfg.policy == BackpressurePolicy::SpinThenBlock
1173                    || cfg.policy == BackpressurePolicy::Block)
1174            {
1175                let shared = router.as_ptr();
1176                let blocked = if let Some(credit) = cfg.credit {
1177                    has_pending_message(shared, cfg.source)
1178                        && !any_eligible_consumer_has_space_and_credit(
1179                            shared,
1180                            credit.credit_max.max(1),
1181                            &credit_stats.detached,
1182                        )
1183                } else {
1184                    has_pending_message(shared, cfg.source) && !any_consumer_has_space(shared)
1185                };
1186
1187                if blocked {
1188                    break;
1189                }
1190            }
1191
            // Route one more message; the accounting mirrors the first attempt above. The batch
            // ends as soon as a step reports that nothing was routed.
1192            if let Some(credit) = cfg.credit {
1193                let shared = router.as_ptr();
1194                let (eligible, no_credit, full, depths) =
1195                    compute_credit_masks(shared, credit.credit_max.max(1), &credit_stats.detached);
1196                credit_stats.depth = depths;
1197
1198                let step = router
1199                    .route_once_with_credit_masks(cfg.source, cfg.mode, eligible, no_credit, full);
1200                if !step.routed {
1201                    break;
1202                }
1203
1204                c.routed += 1;
1205                c.delivered += step.delivered as u64;
1206                c.dropped += (step.dropped_full + step.dropped_no_credit) as u64;
1207                match cfg.mode {
1208                    RouterMode::Broadcast => c.dropped_queue_full += step.dropped_full as u64,
1209                    RouterMode::WorkQueue => c.dropped_all_full += step.dropped_full as u64,
1210                }
1211                c.dropped_no_credit += step.dropped_no_credit as u64;
1212
1213                if cfg.mode == RouterMode::Broadcast {
1214                    apply_masks_to_stats(
1215                        credit_stats,
1216                        step.delivered_mask,
1217                        step.dropped_full_mask,
1218                        step.dropped_no_credit_mask,
1219                    );
1220                } else {
                    // Work-queue: drops are attributed with the masks computed before the step.
1221                    apply_masks_to_stats(credit_stats, step.delivered_mask, 0, 0);
1222                    if step.dropped_no_credit != 0 {
1223                        apply_masks_to_stats(credit_stats, 0, 0, no_credit);
1224                    }
1225                    if step.dropped_full != 0 {
1226                        apply_masks_to_stats(credit_stats, 0, full, 0);
1227                    }
1228                }
1229            } else {
1230                let step = router.route_once_with_stats(cfg.source, cfg.mode);
1231                if !step.routed {
1232                    break;
1233                }
1234                c.routed += 1;
1235                c.delivered += step.delivered as u64;
1236                c.dropped += step.dropped as u64;
1237                if step.dropped != 0 {
1238                    match cfg.mode {
1239                        RouterMode::Broadcast => c.dropped_queue_full += step.dropped as u64,
1240                        RouterMode::WorkQueue => c.dropped_all_full += step.dropped as u64,
1241                    }
1242                }
1243            }
1244            batch_size += 1;
1245        }
1246
        // Batch statistics: count, total size, and the largest batch seen so far.
1247        c.batches += 1;
1248        c.batch_sum += batch_size as u64;
1249        c.batch_max = c.batch_max.max(batch_size);
        // Work was done this iteration; what `reset()` clears depends on `W` (presumably any
        // accumulated backoff — confirm against the `WaitStrategy` implementation).
1250        wait.reset();
1251    }
1252
1253    c
1254}
1255
1256/// Run the router for a fixed duration.
1257///
1258/// Requires `std` because it uses `std::time::Instant`.
///
/// The clock starts when this function is entered. The call is forwarded to `route_until` with
/// a stop predicate that becomes true once at least `dur` has elapsed since that moment; how
/// often the predicate is polled is decided by `route_until`.
///
/// ## Parameters
///
/// - `router`, `cfg`, `wait`: passed through to `route_until` unchanged.
/// - `dur`: how long to keep running, measured from entry to this function.
///
/// ## Returns
///
/// The [`RouterCounters`] returned by `route_until`.
1259#[cfg(feature = "std")]
1260#[inline]
1261pub fn route_for<const N: usize, W: WaitStrategy>(
1262    router: &FanoutRouter<N>,
1263    cfg: RouterLoopConfig,
1264    wait: &mut W,
1265    dur: Duration,
1266) -> RouterCounters {
    // `Instant` is monotonic, so wall-clock adjustments do not shorten or extend the run.
1267    let start = std::time::Instant::now();
1268    route_until(router, cfg, wait, || start.elapsed() >= dur)
1269}