byteor_pipeline_backings_shm/
barriers.rs

1//! Cursor and gating barriers for SequencedSlots.
2
3use core::sync::atomic::Ordering;
4
5use byteor_pipeline_kernel::{SeqBarrier, WaitStrategy};
6
7use crate::sequenced_slots::ShmSequencedSlots;
8
/// Barrier over the published cursor of a [`ShmSequencedSlots`] region.
///
/// The barrier stores its own `ShmSequencedSlots` handle instead of borrowing the one it
/// was created from, so it carries no lifetime parameter.
pub struct CursorBarrier {
    // Handle used to reach the layout whose cursor is read.
    shm: ShmSequencedSlots,
}
13
14impl CursorBarrier {
15    /// Create a cursor barrier.
16    pub fn new(shm: &ShmSequencedSlots) -> Self {
17        Self {
18            shm: ShmSequencedSlots {
19                region: shm.region.clone(),
20                ptr: shm.ptr,
21            },
22        }
23    }
24
25    /// Currently available published sequence.
26    pub fn available(&self) -> u64 {
27        unsafe { crate::ss_internal::load_cursor(self.shm.layout_ptr()) }
28    }
29
30    /// Wait until `available() >= seq`.
31    pub fn wait_for<W: WaitStrategy>(&self, seq: u64, wait: &mut W) -> u64 {
32        loop {
33            let avail = self.available();
34            if avail >= seq {
35                return avail;
36            }
37            wait.wait();
38        }
39    }
40}
41
/// Gating barrier observing the sequence of one particular consumer/stage.
///
/// The barrier stores its own `ShmSequencedSlots` handle together with the index of the
/// gating entry it reads.
pub struct GatingBarrier {
    // Handle used to reach the layout that contains the `gating` entries.
    shm: ShmSequencedSlots,
    // Index into the layout's `gating` array selecting the observed consumer.
    consumer: usize,
}
47
48impl GatingBarrier {
49    /// Create a barrier observing `gating[consumer]`.
50    pub fn new(shm: &ShmSequencedSlots, consumer: usize) -> Self {
51        Self {
52            shm: ShmSequencedSlots {
53                region: shm.region.clone(),
54                ptr: shm.ptr,
55            },
56            consumer,
57        }
58    }
59
60    /// Currently available gating sequence.
61    pub fn available(&self) -> u64 {
62        let layout = self.shm.layout_ptr();
63        if self.consumer >= 4 {
64            return 0;
65        }
66        unsafe {
67            crate::ss_internal::atomic_u64(&(*layout).gating[self.consumer].seq)
68                .load(Ordering::Acquire)
69        }
70    }
71
72    /// Wait until `available() >= seq`.
73    pub fn wait_for<W: WaitStrategy>(&self, seq: u64, wait: &mut W) -> u64 {
74        loop {
75            let avail = self.available();
76            if avail >= seq {
77                return avail;
78            }
79            wait.wait();
80        }
81    }
82}
83
84impl SeqBarrier for CursorBarrier {
85    fn available(&self) -> u64 {
86        CursorBarrier::available(self)
87    }
88}
89
90impl SeqBarrier for GatingBarrier {
91    fn available(&self) -> u64 {
92        GatingBarrier::available(self)
93    }
94}