1use core::hint::spin_loop;
2use core::time::Duration;
3
4use core::sync::atomic::{AtomicU64, Ordering};
5
6use indexbus_abi::IndexbusAtomicU64;
7
8use indexbus_abi::layouts::{IndexQueue, MpscQueue, SharedFanoutLayout};
9use indexbus_abi::INDEXBUS_QUEUE_CAPACITY;
10use indexbus_blocking::fanout_router_ref_blocking;
11use indexbus_core::{FanoutRouter, RouterMode, RouterSource, WaitStrategy};
12
/// What the router loop does when consumers cannot currently accept a pending message.
///
/// The routing loops in this file only consult this policy for their "blocked" pre-check in
/// `RouterMode::WorkQueue`, plus (for `Block`) when choosing how to wait.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressurePolicy {
    /// Never pause for consumers: the loop always attempts to route, and whatever the router
    /// reports as undeliverable is counted as dropped.
    Drop,

    /// In work-queue mode, when a message is pending and no consumer can take it, call the
    /// caller-supplied `WaitStrategy` and retry instead of routing.
    SpinThenBlock,

    /// Same gate as `SpinThenBlock`, but the loop first tries the blocking wake primitives
    /// obtained from `fanout_router_ref_blocking` (with bounded timeouts) and only falls back
    /// to the `WaitStrategy` when they are unavailable or fail. The same primitives are used
    /// to wait for the producer while the loop is idle.
    Block,
}
51
/// What the router loop does when no consumer has credit left.
///
/// A consumer is considered out of credit once its queue depth reaches
/// `CreditConfig::credit_max`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CreditPolicy {
    /// The loop never parks for credit; it routes with the computed masks and records whatever
    /// the router reports as dropped for lack of credit.
    Drop,

    /// Parks like `Park` in work-queue mode. Additionally (only when built with the `std`
    /// feature) a consumer that stays at or above `credit_max` for `detach_after_ms` is marked
    /// detached and left out of the credit masks until its depth falls to at most half of
    /// `credit_max`.
    Detach,

    /// In work-queue mode, wait instead of routing while a message is pending and no attached
    /// consumer has both queue space and credit.
    Park,
}
78
/// Credit-based flow-control settings for the router loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CreditConfig {
    /// Queue depth at which a consumer is treated as out of credit. The loops clamp this to at
    /// least 1 before use.
    pub credit_max: u32,

    /// Reaction when consumers run out of credit.
    pub policy: CreditPolicy,

    /// With `CreditPolicy::Detach`: how long (milliseconds) a consumer must stay at or above
    /// `credit_max` before it is detached. `None` means consumers are never detached.
    pub detach_after_ms: Option<u32>,
}
103
/// Configuration for the router loops (`route_until` and friends).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RouterLoopConfig {
    /// Which producer queue the router reads from.
    pub source: RouterSource,
    /// Routing mode passed to the router on every step.
    pub mode: RouterMode,
    /// Reaction to consumers that cannot accept a pending message.
    pub policy: BackpressurePolicy,

    /// Credit-based flow control; `None` disables all credit handling.
    pub credit: Option<CreditConfig>,

    /// Maximum number of messages routed per batch. Clamped to at least 1 by the loops.
    pub batch_max: u32,

    /// Optional time budget for a batch, in microseconds. Only enforced when built with the
    /// `std` feature.
    pub batch_time_us: Option<u32>,

    /// Yield the thread every this many loop iterations; `0` disables yielding. Yielding only
    /// happens when built with the `std` feature.
    pub yield_every: u32,

    /// When nothing was routed, poll for a pending message up to this many times (spinning)
    /// before falling back to waiting; `0` disables the spin phase.
    pub idle_spin_limit: u32,
}
150
151impl Default for RouterLoopConfig {
152 fn default() -> Self {
153 Self {
154 source: RouterSource::Spsc,
155 mode: RouterMode::Broadcast,
156 policy: BackpressurePolicy::Drop,
157
158 credit: None,
159
160 batch_max: 1,
161 batch_time_us: None,
162 yield_every: 0,
163 idle_spin_limit: 0,
164 }
165 }
166}
167
/// Aggregate counters accumulated by a router loop run.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RouterCounters {
    /// Number of route steps that reported a message as routed.
    pub routed: u64,
    /// Sum of the per-step `delivered` counts.
    pub delivered: u64,
    /// Sum of all per-step drop counts (full and no-credit).
    pub dropped: u64,

    /// Full-queue drops recorded while routing in `RouterMode::Broadcast`.
    pub dropped_queue_full: u64,

    /// Full-queue drops recorded while routing in `RouterMode::WorkQueue`.
    pub dropped_all_full: u64,

    /// Drops the router attributed to missing credit.
    pub dropped_no_credit: u64,

    /// Times the loop waited because a message was pending but no attached consumer had both
    /// queue space and credit (credit configured).
    pub credit_waits: u64,

    /// Number of consumer detach events.
    pub detaches: u64,
    /// Times the loop waited because a message was pending but no consumer queue had space
    /// (no credit configured).
    pub pressure_waits: u64,
    /// Loop iterations whose first route attempt moved nothing.
    pub idle_waits: u64,

    /// Number of completed batches.
    pub batches: u64,

    /// Total number of messages across all batches.
    pub batch_sum: u64,

    /// Largest batch observed.
    pub batch_max: u32,

    /// Number of waits performed through the blocking wake primitives.
    pub wake_waits: u64,

    /// Number of those waits that returned `Ok(false)` (counted as a timeout).
    pub wake_timeouts: u64,
}
216
/// Per-consumer credit bookkeeping maintained by the router loops.
///
/// Every array is indexed by consumer slot (`0..N`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreditStats<const N: usize> {
    /// Messages delivered to each consumer.
    pub delivered: [u64; N],
    /// Messages not delivered to each consumer because it had no credit.
    pub dropped_no_credit: [u64; N],
    /// Messages not delivered to each consumer because its queue was full.
    pub dropped_full: [u64; N],
    /// Whether each consumer is currently detached.
    pub detached: [bool; N],
    /// How many times each consumer has been detached.
    pub detach_count: [u64; N],
    /// Most recently sampled queue depth of each consumer.
    pub depth: [u64; N],

    /// When each consumer was first seen at or above its credit limit (`None` while it has
    /// credit). Internal state for the detach timer.
    #[cfg(feature = "std")]
    #[doc(hidden)]
    pub(crate) exhausted_since: [Option<std::time::Instant>; N],
}

impl<const N: usize> Default for CreditStats<N> {
    /// Starts with every counter at zero and every consumer attached.
    fn default() -> Self {
        let zeroed = [0u64; N];
        CreditStats {
            delivered: zeroed,
            dropped_no_credit: zeroed,
            dropped_full: zeroed,
            detached: [false; N],
            detach_count: zeroed,
            depth: zeroed,

            #[cfg(feature = "std")]
            exhausted_since: [None; N],
        }
    }
}
261
/// Upper bound on one blocking wait for a consumer sequence change while back-pressured.
const PRESSURE_WAIT_TIMEOUT: Duration = Duration::from_millis(1);
/// Upper bound on one blocking wait for a producer sequence change while idle.
const IDLE_WAIT_TIMEOUT: Duration = Duration::from_millis(10);
264
265#[inline]
266fn load_u64(addr: *const IndexbusAtomicU64) -> u64 {
267 unsafe { (&*(addr as *const AtomicU64)).load(Ordering::Acquire) }
268}
269
270#[inline]
271fn index_queue_depth(q: &IndexQueue) -> u64 {
272 let head = load_u64(core::ptr::addr_of!(q.head));
273 let tail = load_u64(core::ptr::addr_of!(q.tail));
274 tail.wrapping_sub(head)
275}
276
277#[inline]
278fn mpsc_queue_depth(q: &MpscQueue) -> u64 {
279 let w = load_u64(core::ptr::addr_of!(q.write));
280 let r = load_u64(core::ptr::addr_of!(q.read));
281 w.wrapping_sub(r)
282}
283
284#[inline]
285fn has_pending_message<const N: usize>(
286 shared: *const SharedFanoutLayout<N>,
287 source: RouterSource,
288) -> bool {
289 let s = unsafe { &*shared };
290 match source {
291 RouterSource::Spsc => index_queue_depth(&s.producer_queue) != 0,
292 RouterSource::Mpsc => mpsc_queue_depth(&s.producer_queue_mpsc) != 0,
293 }
294}
295
296#[inline]
297fn any_consumer_has_space<const N: usize>(shared: *const SharedFanoutLayout<N>) -> bool {
298 if N == 0 {
299 return true;
300 }
301
302 let s = unsafe { &*shared };
303 let cap = INDEXBUS_QUEUE_CAPACITY as u64;
304 for q in &s.consumer_queues {
305 if index_queue_depth(q) < cap {
306 return true;
307 }
308 }
309 false
310}
311
312#[inline]
313fn consumer_depths<const N: usize>(shared: *const SharedFanoutLayout<N>) -> [u64; N] {
314 let s = unsafe { &*shared };
315 let mut out = [0u64; N];
316 for (i, q) in s.consumer_queues.iter().enumerate() {
317 out[i] = index_queue_depth(q);
318 }
319 out
320}
321
322#[inline]
323fn compute_credit_masks<const N: usize>(
324 shared: *const SharedFanoutLayout<N>,
325 credit_max: u32,
326 detached: &[bool; N],
327) -> (u64, u64, u64, [u64; N]) {
328 let depths = consumer_depths(shared);
329 if N == 0 {
330 return (0, 0, 0, depths);
331 }
332
333 let cap = INDEXBUS_QUEUE_CAPACITY as u64;
334 let mut eligible_mask: u64 = 0;
335 let mut no_credit_mask: u64 = 0;
336 let mut full_mask: u64 = 0;
337
338 if N > 64 {
339 return (0, 0, 0, depths);
341 }
342
343 let credit_max = credit_max as u64;
344 for (i, &is_detached) in detached.iter().enumerate() {
345 if is_detached {
346 continue;
347 }
348 let d = depths[i];
349 let bit = 1u64 << (i as u32);
350 if d >= cap {
351 full_mask |= bit;
352 } else if d >= credit_max {
353 no_credit_mask |= bit;
354 } else {
355 eligible_mask |= bit;
356 }
357 }
358
359 (eligible_mask, no_credit_mask, full_mask, depths)
360}
361
362#[inline]
363fn any_eligible_consumer_has_space_and_credit<const N: usize>(
364 shared: *const SharedFanoutLayout<N>,
365 credit_max: u32,
366 detached: &[bool; N],
367) -> bool {
368 if N == 0 {
369 return true;
370 }
371 let cap = INDEXBUS_QUEUE_CAPACITY as u64;
372 let credit_max = credit_max as u64;
373
374 let s = unsafe { &*shared };
375 for (i, &is_detached) in detached.iter().enumerate() {
376 if is_detached {
377 continue;
378 }
379 let d = index_queue_depth(&s.consumer_queues[i]);
380 if d < cap && d < credit_max {
381 return true;
382 }
383 }
384 false
385}
386
387#[inline]
388fn apply_masks_to_stats<const N: usize>(
389 stats: &mut CreditStats<N>,
390 delivered_mask: u64,
391 dropped_full_mask: u64,
392 dropped_no_credit_mask: u64,
393) {
394 if N > 64 {
395 return;
396 }
397 for i in 0..N {
398 let bit = 1u64 << (i as u32);
399 if (delivered_mask & bit) != 0 {
400 stats.delivered[i] = stats.delivered[i].wrapping_add(1);
401 }
402 if (dropped_full_mask & bit) != 0 {
403 stats.dropped_full[i] = stats.dropped_full[i].wrapping_add(1);
404 }
405 if (dropped_no_credit_mask & bit) != 0 {
406 stats.dropped_no_credit[i] = stats.dropped_no_credit[i].wrapping_add(1);
407 }
408 }
409}
410
411#[inline]
419pub fn route_until<const N: usize, W: WaitStrategy>(
420 router: &FanoutRouter<N>,
421 cfg: RouterLoopConfig,
422 wait: &mut W,
423 mut should_stop: impl FnMut() -> bool,
424) -> RouterCounters {
425 let mut credit_stats = CreditStats::<N>::default();
426 route_until_with_credit_stats(router, cfg, wait, &mut credit_stats, &mut should_stop)
427}
428
429#[cfg(feature = "std")]
435#[inline]
436pub fn route_until_reporting<const N: usize, W: WaitStrategy>(
437 router: &FanoutRouter<N>,
438 cfg: RouterLoopConfig,
439 wait: &mut W,
440 mut should_stop: impl FnMut() -> bool,
441 report_period: Duration,
442 mut report: impl FnMut(RouterCounters),
443) -> RouterCounters {
444 let mut credit_stats = CreditStats::<N>::default();
445 let mut last_report = std::time::Instant::now();
446
447 let mut c = RouterCounters::default();
448
449 let batch_max = cfg.batch_max.max(1);
450 let mut loop_iters: u64 = 0;
451
452 let blocking = if cfg.policy == BackpressurePolicy::Block {
453 fanout_router_ref_blocking(router).ok()
454 } else {
455 None
456 };
457
458 let mut pressure_cursor: usize = 0;
459
460 while !should_stop() {
461 loop_iters = loop_iters.wrapping_add(1);
462 if cfg.yield_every != 0 && loop_iters.is_multiple_of(cfg.yield_every as u64) {
463 std::thread::yield_now();
464 }
465
466 if report_period.as_nanos() != 0 && last_report.elapsed() >= report_period {
467 report(c);
468 last_report = std::time::Instant::now();
469 }
470
471 if cfg.mode == RouterMode::WorkQueue && cfg.policy == BackpressurePolicy::SpinThenBlock {
472 let shared = router.as_ptr();
473 let blocked = if let Some(credit) = cfg.credit {
474 has_pending_message(shared, cfg.source)
475 && !any_eligible_consumer_has_space_and_credit(
476 shared,
477 credit.credit_max.max(1),
478 &credit_stats.detached,
479 )
480 } else {
481 has_pending_message(shared, cfg.source) && !any_consumer_has_space(shared)
482 };
483
484 if blocked {
485 if cfg.credit.is_some() {
486 c.credit_waits += 1;
487 } else {
488 c.pressure_waits += 1;
489 }
490 wait.wait();
491 continue;
492 }
493 }
494
495 if cfg.mode == RouterMode::WorkQueue && cfg.policy == BackpressurePolicy::Block {
496 let shared = router.as_ptr();
497 let blocked = if let Some(credit) = cfg.credit {
498 has_pending_message(shared, cfg.source)
499 && !any_eligible_consumer_has_space_and_credit(
500 shared,
501 credit.credit_max.max(1),
502 &credit_stats.detached,
503 )
504 } else {
505 has_pending_message(shared, cfg.source) && !any_consumer_has_space(shared)
506 };
507
508 if blocked {
509 if cfg.credit.is_some() {
510 c.credit_waits += 1;
511 } else {
512 c.pressure_waits += 1;
513 }
514
515 if let Some(br) = &blocking {
516 if N != 0 {
517 let i = pressure_cursor % N;
518 pressure_cursor = pressure_cursor.wrapping_add(1);
519
520 if let Some(expected) = br.consumer_seq(i) {
521 c.wake_waits += 1;
522 match br.wait_consumer_seq_ne(i, expected, Some(PRESSURE_WAIT_TIMEOUT))
523 {
524 Ok(true) => {}
525 Ok(false) => c.wake_timeouts += 1,
526 Err(_) => {
527 wait.wait();
528 }
529 }
530 } else {
531 wait.wait();
532 }
533 } else {
534 wait.wait();
535 }
536 } else {
537 wait.wait();
538 }
539
540 continue;
541 }
542 }
543
544 if cfg.mode == RouterMode::WorkQueue {
546 if let Some(credit) = cfg.credit {
547 if credit.policy == CreditPolicy::Park || credit.policy == CreditPolicy::Detach {
548 let shared = router.as_ptr();
549 if has_pending_message(shared, cfg.source)
550 && !any_eligible_consumer_has_space_and_credit(
551 shared,
552 credit.credit_max.max(1),
553 &credit_stats.detached,
554 )
555 {
556 c.credit_waits += 1;
557
558 if cfg.policy == BackpressurePolicy::Block {
559 if let Some(br) = &blocking {
560 if N != 0 {
561 let i = pressure_cursor % N;
562 pressure_cursor = pressure_cursor.wrapping_add(1);
563 if let Some(expected) = br.consumer_seq(i) {
564 c.wake_waits += 1;
565 match br.wait_consumer_seq_ne(
566 i,
567 expected,
568 Some(PRESSURE_WAIT_TIMEOUT),
569 ) {
570 Ok(true) => {}
571 Ok(false) => c.wake_timeouts += 1,
572 Err(_) => wait.wait(),
573 }
574 } else {
575 wait.wait();
576 }
577 } else {
578 wait.wait();
579 }
580 } else {
581 wait.wait();
582 }
583 } else {
584 wait.wait();
585 }
586
587 continue;
588 }
589 }
590 }
591 }
592
593 if let Some(credit) = cfg.credit {
595 let shared = router.as_ptr();
596 let (eligible_mask, _no_credit_mask, _full_mask, depths) =
597 compute_credit_masks(shared, credit.credit_max.max(1), &credit_stats.detached);
598 credit_stats.depth = depths;
599
600 if credit.policy == CreditPolicy::Detach {
601 let now = std::time::Instant::now();
602 let detach_after = credit
603 .detach_after_ms
604 .map(|ms| std::time::Duration::from_millis(ms as u64));
605 let credit_max = credit.credit_max.max(1) as u64;
606 let reattach_at = (credit_max / 2).min(credit_max.saturating_sub(1));
607
608 for i in 0..N {
609 if credit_stats.detached[i] {
610 if credit_stats.depth[i] <= reattach_at {
611 credit_stats.detached[i] = false;
612 credit_stats.exhausted_since[i] = None;
613 }
614 continue;
615 }
616
617 if credit_stats.depth[i] >= credit_max {
618 if credit_stats.exhausted_since[i].is_none() {
619 credit_stats.exhausted_since[i] = Some(now);
620 }
621
622 if let (Some(since), Some(threshold)) =
623 (credit_stats.exhausted_since[i], detach_after)
624 {
625 if now.duration_since(since) >= threshold {
626 credit_stats.detached[i] = true;
627 credit_stats.detach_count[i] =
628 credit_stats.detach_count[i].wrapping_add(1);
629 c.detaches += 1;
630 credit_stats.exhausted_since[i] = None;
631 }
632 }
633 } else {
634 credit_stats.exhausted_since[i] = None;
635 }
636 }
637 }
638
639 let _ = eligible_mask;
640 }
641
642 let first_routed = if let Some(credit) = cfg.credit {
644 let shared = router.as_ptr();
645 let (eligible, no_credit, full, depths) =
646 compute_credit_masks(shared, credit.credit_max.max(1), &credit_stats.detached);
647 credit_stats.depth = depths;
648
649 let step = router
650 .route_once_with_credit_masks(cfg.source, cfg.mode, eligible, no_credit, full);
651 if !step.routed {
652 false
653 } else {
654 c.routed += 1;
655 c.delivered += step.delivered as u64;
656 c.dropped += (step.dropped_full + step.dropped_no_credit) as u64;
657 match cfg.mode {
658 RouterMode::Broadcast => c.dropped_queue_full += step.dropped_full as u64,
659 RouterMode::WorkQueue => c.dropped_all_full += step.dropped_full as u64,
660 }
661 c.dropped_no_credit += step.dropped_no_credit as u64;
662
663 if cfg.mode == RouterMode::Broadcast {
664 apply_masks_to_stats(
665 &mut credit_stats,
666 step.delivered_mask,
667 step.dropped_full_mask,
668 step.dropped_no_credit_mask,
669 );
670 } else {
671 apply_masks_to_stats(&mut credit_stats, step.delivered_mask, 0, 0);
672
673 if step.dropped_no_credit != 0 {
674 apply_masks_to_stats(&mut credit_stats, 0, 0, no_credit);
675 }
676 if step.dropped_full != 0 {
677 apply_masks_to_stats(&mut credit_stats, 0, full, 0);
678 }
679 }
680
681 true
682 }
683 } else {
684 let first = router.route_once_with_stats(cfg.source, cfg.mode);
685 if !first.routed {
686 false
687 } else {
688 c.routed += 1;
689 c.delivered += first.delivered as u64;
690 c.dropped += first.dropped as u64;
691 if first.dropped != 0 {
692 match cfg.mode {
693 RouterMode::Broadcast => c.dropped_queue_full += first.dropped as u64,
694 RouterMode::WorkQueue => c.dropped_all_full += first.dropped as u64,
695 }
696 }
697 true
698 }
699 };
700
701 if !first_routed {
702 c.idle_waits += 1;
703
704 if cfg.idle_spin_limit != 0 {
705 let shared = router.as_ptr();
706 for _ in 0..cfg.idle_spin_limit {
707 if should_stop() {
708 report(c);
709 return c;
710 }
711 if has_pending_message(shared, cfg.source) {
712 break;
713 }
714 spin_loop();
715 }
716 if has_pending_message(shared, cfg.source) {
717 continue;
718 }
719 }
720
721 if cfg.policy == BackpressurePolicy::Block {
722 if let Some(br) = &blocking {
723 let expected = br.producer_seq();
724 c.wake_waits += 1;
725 match br.wait_producer_seq_ne(expected, Some(IDLE_WAIT_TIMEOUT)) {
726 Ok(true) => {}
727 Ok(false) => c.wake_timeouts += 1,
728 Err(_) => wait.wait(),
729 }
730 } else {
731 wait.wait();
732 }
733 } else {
734 wait.wait();
735 }
736
737 continue;
738 }
739
740 let mut batch_size: u32 = 0;
741 let batch_start = std::time::Instant::now();
742 batch_size += 1;
743
744 while batch_size < batch_max {
745 if let Some(us) = cfg.batch_time_us {
746 if batch_start.elapsed() >= std::time::Duration::from_micros(us as u64) {
747 break;
748 }
749 }
750
751 if cfg.mode == RouterMode::WorkQueue
752 && (cfg.policy == BackpressurePolicy::SpinThenBlock
753 || cfg.policy == BackpressurePolicy::Block)
754 {
755 let shared = router.as_ptr();
756 let blocked = if let Some(credit) = cfg.credit {
757 has_pending_message(shared, cfg.source)
758 && !any_eligible_consumer_has_space_and_credit(
759 shared,
760 credit.credit_max.max(1),
761 &credit_stats.detached,
762 )
763 } else {
764 has_pending_message(shared, cfg.source) && !any_consumer_has_space(shared)
765 };
766
767 if blocked {
768 break;
769 }
770 }
771
772 if let Some(credit) = cfg.credit {
773 let shared = router.as_ptr();
774 let (eligible, no_credit, full, depths) =
775 compute_credit_masks(shared, credit.credit_max.max(1), &credit_stats.detached);
776 credit_stats.depth = depths;
777
778 let step = router
779 .route_once_with_credit_masks(cfg.source, cfg.mode, eligible, no_credit, full);
780 if !step.routed {
781 break;
782 }
783
784 c.routed += 1;
785 c.delivered += step.delivered as u64;
786 c.dropped += (step.dropped_full + step.dropped_no_credit) as u64;
787 match cfg.mode {
788 RouterMode::Broadcast => c.dropped_queue_full += step.dropped_full as u64,
789 RouterMode::WorkQueue => c.dropped_all_full += step.dropped_full as u64,
790 }
791 c.dropped_no_credit += step.dropped_no_credit as u64;
792
793 if cfg.mode == RouterMode::Broadcast {
794 apply_masks_to_stats(
795 &mut credit_stats,
796 step.delivered_mask,
797 step.dropped_full_mask,
798 step.dropped_no_credit_mask,
799 );
800 } else {
801 apply_masks_to_stats(&mut credit_stats, step.delivered_mask, 0, 0);
802 if step.dropped_no_credit != 0 {
803 apply_masks_to_stats(&mut credit_stats, 0, 0, no_credit);
804 }
805 if step.dropped_full != 0 {
806 apply_masks_to_stats(&mut credit_stats, 0, full, 0);
807 }
808 }
809 } else {
810 let step = router.route_once_with_stats(cfg.source, cfg.mode);
811 if !step.routed {
812 break;
813 }
814 c.routed += 1;
815 c.delivered += step.delivered as u64;
816 c.dropped += step.dropped as u64;
817 if step.dropped != 0 {
818 match cfg.mode {
819 RouterMode::Broadcast => c.dropped_queue_full += step.dropped as u64,
820 RouterMode::WorkQueue => c.dropped_all_full += step.dropped as u64,
821 }
822 }
823 }
824 batch_size += 1;
825 }
826
827 c.batches += 1;
828 c.batch_sum += batch_size as u64;
829 c.batch_max = c.batch_max.max(batch_size);
830 wait.reset();
831 }
832
833 report(c);
834 c
835}
836
837#[inline]
849pub fn route_until_with_credit_stats<const N: usize, W: WaitStrategy>(
850 router: &FanoutRouter<N>,
851 cfg: RouterLoopConfig,
852 wait: &mut W,
853 credit_stats: &mut CreditStats<N>,
854 mut should_stop: impl FnMut() -> bool,
855) -> RouterCounters {
856 let mut c = RouterCounters::default();
857
858 let batch_max = cfg.batch_max.max(1);
859 let mut loop_iters: u64 = 0;
860
861 let blocking = if cfg.policy == BackpressurePolicy::Block {
862 fanout_router_ref_blocking(router).ok()
863 } else {
864 None
865 };
866
867 let mut pressure_cursor: usize = 0;
868
869 while !should_stop() {
870 loop_iters = loop_iters.wrapping_add(1);
871 if cfg.yield_every != 0 && loop_iters.is_multiple_of(cfg.yield_every as u64) {
872 #[cfg(feature = "std")]
873 std::thread::yield_now();
874 }
875
876 if cfg.mode == RouterMode::WorkQueue && cfg.policy == BackpressurePolicy::SpinThenBlock {
877 let shared = router.as_ptr();
878 let blocked = if let Some(credit) = cfg.credit {
879 has_pending_message(shared, cfg.source)
880 && !any_eligible_consumer_has_space_and_credit(
881 shared,
882 credit.credit_max.max(1),
883 &credit_stats.detached,
884 )
885 } else {
886 has_pending_message(shared, cfg.source) && !any_consumer_has_space(shared)
887 };
888
889 if blocked {
890 if cfg.credit.is_some() {
891 c.credit_waits += 1;
892 } else {
893 c.pressure_waits += 1;
894 }
895 wait.wait();
896 continue;
897 }
898 }
899
900 if cfg.mode == RouterMode::WorkQueue && cfg.policy == BackpressurePolicy::Block {
901 let shared = router.as_ptr();
902 let blocked = if let Some(credit) = cfg.credit {
903 has_pending_message(shared, cfg.source)
904 && !any_eligible_consumer_has_space_and_credit(
905 shared,
906 credit.credit_max.max(1),
907 &credit_stats.detached,
908 )
909 } else {
910 has_pending_message(shared, cfg.source) && !any_consumer_has_space(shared)
911 };
912
913 if blocked {
914 if cfg.credit.is_some() {
915 c.credit_waits += 1;
916 } else {
917 c.pressure_waits += 1;
918 }
919
920 if let Some(br) = &blocking {
921 if N != 0 {
922 let i = pressure_cursor % N;
923 pressure_cursor = pressure_cursor.wrapping_add(1);
924
925 if let Some(expected) = br.consumer_seq(i) {
926 c.wake_waits += 1;
927 match br.wait_consumer_seq_ne(i, expected, Some(PRESSURE_WAIT_TIMEOUT))
928 {
929 Ok(true) => {}
930 Ok(false) => c.wake_timeouts += 1,
931 Err(_) => {
932 wait.wait();
934 }
935 }
936 } else {
937 wait.wait();
938 }
939 } else {
940 wait.wait();
941 }
942 } else {
943 wait.wait();
944 }
945
946 continue;
947 }
948 }
949
950 if cfg.mode == RouterMode::WorkQueue {
952 if let Some(credit) = cfg.credit {
953 if credit.policy == CreditPolicy::Park || credit.policy == CreditPolicy::Detach {
954 let shared = router.as_ptr();
955 if has_pending_message(shared, cfg.source)
956 && !any_eligible_consumer_has_space_and_credit(
957 shared,
958 credit.credit_max.max(1),
959 &credit_stats.detached,
960 )
961 {
962 c.credit_waits += 1;
963
964 if cfg.policy == BackpressurePolicy::Block {
966 if let Some(br) = &blocking {
967 if N != 0 {
968 let i = pressure_cursor % N;
969 pressure_cursor = pressure_cursor.wrapping_add(1);
970 if let Some(expected) = br.consumer_seq(i) {
971 c.wake_waits += 1;
972 match br.wait_consumer_seq_ne(
973 i,
974 expected,
975 Some(PRESSURE_WAIT_TIMEOUT),
976 ) {
977 Ok(true) => {}
978 Ok(false) => c.wake_timeouts += 1,
979 Err(_) => wait.wait(),
980 }
981 } else {
982 wait.wait();
983 }
984 } else {
985 wait.wait();
986 }
987 } else {
988 wait.wait();
989 }
990 } else {
991 wait.wait();
992 }
993
994 continue;
995 }
996 }
997 }
998 }
999
1000 if let Some(credit) = cfg.credit {
1002 let shared = router.as_ptr();
1003 let (eligible_mask, _no_credit_mask, _full_mask, depths) =
1004 compute_credit_masks(shared, credit.credit_max.max(1), &credit_stats.detached);
1005 credit_stats.depth = depths;
1006
1007 if credit.policy == CreditPolicy::Detach {
1008 #[cfg(feature = "std")]
1009 {
1010 let now = std::time::Instant::now();
1011 let detach_after = credit
1012 .detach_after_ms
1013 .map(|ms| std::time::Duration::from_millis(ms as u64));
1014 let credit_max = credit.credit_max.max(1) as u64;
1015 let reattach_at = (credit_max / 2).min(credit_max.saturating_sub(1));
1016
1017 for i in 0..N {
1018 if credit_stats.detached[i] {
1019 if credit_stats.depth[i] <= reattach_at {
1020 credit_stats.detached[i] = false;
1021 credit_stats.exhausted_since[i] = None;
1022 }
1023 continue;
1024 }
1025
1026 if credit_stats.depth[i] >= credit_max {
1027 if credit_stats.exhausted_since[i].is_none() {
1028 credit_stats.exhausted_since[i] = Some(now);
1029 }
1030
1031 if let (Some(since), Some(threshold)) =
1032 (credit_stats.exhausted_since[i], detach_after)
1033 {
1034 if now.duration_since(since) >= threshold {
1035 credit_stats.detached[i] = true;
1036 credit_stats.detach_count[i] =
1037 credit_stats.detach_count[i].wrapping_add(1);
1038 c.detaches += 1;
1039 credit_stats.exhausted_since[i] = None;
1040 }
1041 }
1042 } else {
1043 credit_stats.exhausted_since[i] = None;
1044 }
1045 }
1046 }
1047 }
1048
1049 let _ = eligible_mask;
1051 }
1052
1053 let first_routed = if let Some(credit) = cfg.credit {
1055 let shared = router.as_ptr();
1056 let (eligible, no_credit, full, depths) =
1057 compute_credit_masks(shared, credit.credit_max.max(1), &credit_stats.detached);
1058 credit_stats.depth = depths;
1059
1060 let step = router
1061 .route_once_with_credit_masks(cfg.source, cfg.mode, eligible, no_credit, full);
1062 if !step.routed {
1063 false
1064 } else {
1065 c.routed += 1;
1066 c.delivered += step.delivered as u64;
1067 c.dropped += (step.dropped_full + step.dropped_no_credit) as u64;
1068 match cfg.mode {
1069 RouterMode::Broadcast => c.dropped_queue_full += step.dropped_full as u64,
1070 RouterMode::WorkQueue => c.dropped_all_full += step.dropped_full as u64,
1071 }
1072 c.dropped_no_credit += step.dropped_no_credit as u64;
1073
1074 if cfg.mode == RouterMode::Broadcast {
1075 apply_masks_to_stats(
1076 credit_stats,
1077 step.delivered_mask,
1078 step.dropped_full_mask,
1079 step.dropped_no_credit_mask,
1080 );
1081 } else {
1082 apply_masks_to_stats(credit_stats, step.delivered_mask, 0, 0);
1084
1085 if step.dropped_no_credit != 0 {
1087 apply_masks_to_stats(credit_stats, 0, 0, no_credit);
1088 }
1089 if step.dropped_full != 0 {
1090 apply_masks_to_stats(credit_stats, 0, full, 0);
1091 }
1092 }
1093
1094 true
1095 }
1096 } else {
1097 let first = router.route_once_with_stats(cfg.source, cfg.mode);
1098 if !first.routed {
1099 false
1100 } else {
1101 c.routed += 1;
1102 c.delivered += first.delivered as u64;
1103 c.dropped += first.dropped as u64;
1104 if first.dropped != 0 {
1105 match cfg.mode {
1106 RouterMode::Broadcast => c.dropped_queue_full += first.dropped as u64,
1107 RouterMode::WorkQueue => c.dropped_all_full += first.dropped as u64,
1108 }
1109 }
1110 true
1111 }
1112 };
1113
1114 if !first_routed {
1115 c.idle_waits += 1;
1116
1117 if cfg.idle_spin_limit != 0 {
1119 let shared = router.as_ptr();
1120 for _ in 0..cfg.idle_spin_limit {
1121 if should_stop() {
1122 return c;
1123 }
1124 if has_pending_message(shared, cfg.source) {
1125 break;
1126 }
1127 spin_loop();
1128 }
1129 if has_pending_message(shared, cfg.source) {
1130 continue;
1131 }
1132 }
1133
1134 if cfg.policy == BackpressurePolicy::Block {
1135 if let Some(br) = &blocking {
1136 let expected = br.producer_seq();
1137 c.wake_waits += 1;
1138 match br.wait_producer_seq_ne(expected, Some(IDLE_WAIT_TIMEOUT)) {
1139 Ok(true) => {}
1140 Ok(false) => c.wake_timeouts += 1,
1141 Err(_) => wait.wait(),
1142 }
1143 } else {
1144 wait.wait();
1145 }
1146 } else {
1147 wait.wait();
1148 }
1149
1150 continue;
1151 }
1152
1153 let mut batch_size: u32 = 0;
1155
1156 #[cfg(feature = "std")]
1157 let batch_start = std::time::Instant::now();
1158
1159 batch_size += 1;
1160
1161 while batch_size < batch_max {
1162 #[cfg(feature = "std")]
1163 if let Some(us) = cfg.batch_time_us {
1164 if batch_start.elapsed() >= Duration::from_micros(us as u64) {
1165 break;
1166 }
1167 }
1168
1169 if cfg.mode == RouterMode::WorkQueue
1172 && (cfg.policy == BackpressurePolicy::SpinThenBlock
1173 || cfg.policy == BackpressurePolicy::Block)
1174 {
1175 let shared = router.as_ptr();
1176 let blocked = if let Some(credit) = cfg.credit {
1177 has_pending_message(shared, cfg.source)
1178 && !any_eligible_consumer_has_space_and_credit(
1179 shared,
1180 credit.credit_max.max(1),
1181 &credit_stats.detached,
1182 )
1183 } else {
1184 has_pending_message(shared, cfg.source) && !any_consumer_has_space(shared)
1185 };
1186
1187 if blocked {
1188 break;
1189 }
1190 }
1191
1192 if let Some(credit) = cfg.credit {
1193 let shared = router.as_ptr();
1194 let (eligible, no_credit, full, depths) =
1195 compute_credit_masks(shared, credit.credit_max.max(1), &credit_stats.detached);
1196 credit_stats.depth = depths;
1197
1198 let step = router
1199 .route_once_with_credit_masks(cfg.source, cfg.mode, eligible, no_credit, full);
1200 if !step.routed {
1201 break;
1202 }
1203
1204 c.routed += 1;
1205 c.delivered += step.delivered as u64;
1206 c.dropped += (step.dropped_full + step.dropped_no_credit) as u64;
1207 match cfg.mode {
1208 RouterMode::Broadcast => c.dropped_queue_full += step.dropped_full as u64,
1209 RouterMode::WorkQueue => c.dropped_all_full += step.dropped_full as u64,
1210 }
1211 c.dropped_no_credit += step.dropped_no_credit as u64;
1212
1213 if cfg.mode == RouterMode::Broadcast {
1214 apply_masks_to_stats(
1215 credit_stats,
1216 step.delivered_mask,
1217 step.dropped_full_mask,
1218 step.dropped_no_credit_mask,
1219 );
1220 } else {
1221 apply_masks_to_stats(credit_stats, step.delivered_mask, 0, 0);
1222 if step.dropped_no_credit != 0 {
1223 apply_masks_to_stats(credit_stats, 0, 0, no_credit);
1224 }
1225 if step.dropped_full != 0 {
1226 apply_masks_to_stats(credit_stats, 0, full, 0);
1227 }
1228 }
1229 } else {
1230 let step = router.route_once_with_stats(cfg.source, cfg.mode);
1231 if !step.routed {
1232 break;
1233 }
1234 c.routed += 1;
1235 c.delivered += step.delivered as u64;
1236 c.dropped += step.dropped as u64;
1237 if step.dropped != 0 {
1238 match cfg.mode {
1239 RouterMode::Broadcast => c.dropped_queue_full += step.dropped as u64,
1240 RouterMode::WorkQueue => c.dropped_all_full += step.dropped as u64,
1241 }
1242 }
1243 }
1244 batch_size += 1;
1245 }
1246
1247 c.batches += 1;
1248 c.batch_sum += batch_size as u64;
1249 c.batch_max = c.batch_max.max(batch_size);
1250 wait.reset();
1251 }
1252
1253 c
1254}
1255
/// Runs the router loop for at least `dur` of wall-clock time and returns the aggregate
/// counters.
///
/// The deadline is evaluated wherever the loop polls its stop condition, so the call can
/// overrun `dur` by the length of one wait or batch.
#[cfg(feature = "std")]
#[inline]
pub fn route_for<const N: usize, W: WaitStrategy>(
    router: &FanoutRouter<N>,
    cfg: RouterLoopConfig,
    wait: &mut W,
    dur: Duration,
) -> RouterCounters {
    let started = std::time::Instant::now();
    let deadline_passed = || started.elapsed() >= dur;
    route_until(router, cfg, wait, deadline_passed)
}