byteor_transport_shm/
regions.rs1use std::path::Path;
4
5use memmap2::MmapMut;
6
7use byteor_abi::layouts::{EventsChainLayout, SequencedSlotsLayout};
8
9use crate::init::{
10 init_or_wait_events_chain, init_or_wait_sequenced_slots, required_bytes_events_chain,
11 required_bytes_sequenced_slots,
12};
13use crate::mapping::{open_file_backed, MappingInfo};
14use crate::{Error, OpenOptions};
15
/// File-backed memory mapping viewed as a `SequencedSlotsLayout<N>`.
///
/// Instances are produced by the `open*` constructors. The region owns the
/// mapping and hands out a raw pointer to the layout located at its base.
pub struct SequencedSlotsRegion<const N: usize> {
    /// Owned mapping; kept only so the memory behind `ptr` stays mapped while
    /// this value exists (never read directly, hence the underscore).
    _mmap: MmapMut,
    /// Mapping metadata as returned by `open_file_backed`.
    info: MappingInfo,
    /// Base address of the mapping, typed as the layout.
    ptr: *mut SequencedSlotsLayout<N>,
}
22
// SAFETY: the struct stores a raw pointer, which suppresses the auto traits.
// That pointer targets the mapping owned by the `_mmap` field of the same
// value, so it travels together with its backing memory when the region is
// moved to another thread.
// NOTE(review): `Sync` additionally assumes that all concurrent access made
// through `as_ptr()` is synchronized by the layout's own protocol — confirm
// against the `SequencedSlotsLayout` contract.
unsafe impl<const N: usize> Send for SequencedSlotsRegion<N> {}
unsafe impl<const N: usize> Sync for SequencedSlotsRegion<N> {}
25
26impl<const N: usize> SequencedSlotsRegion<N> {
27 pub fn open(name: &str) -> Result<Self, Error> {
29 Self::open_with(name, OpenOptions::default())
30 }
31
32 pub fn open_with(name: &str, options: OpenOptions) -> Result<Self, Error> {
34 let path = options.resolve_path(name);
35 Self::open_path_with(path, options)
36 }
37
38 pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
40 Self::open_path_with(path, OpenOptions::default())
41 }
42
43 pub fn open_path_with(path: impl AsRef<Path>, options: OpenOptions) -> Result<Self, Error> {
45 let bytes = required_bytes_sequenced_slots::<N>(options.blocking);
46 let mapped = open_file_backed(path.as_ref(), bytes, &options)?;
47 let mut mmap = mapped.mmap;
48
49 let base = mmap.as_mut_ptr() as usize;
50 let align = core::mem::align_of::<SequencedSlotsLayout<N>>();
51 if !base.is_multiple_of(align) {
52 return Err(Error::Misaligned { align });
53 }
54 let ptr = mmap.as_mut_ptr() as *mut SequencedSlotsLayout<N>;
55
56 unsafe {
57 init_or_wait_sequenced_slots::<N>(ptr, bytes, options.blocking)?;
58 }
59
60 if options.blocking {
61 let caps = unsafe { (*ptr).header.capabilities };
62 if (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
63 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
64 }
65 }
66
67 let claimed = unsafe { (*ptr).header.layout_bytes as usize };
68 if mapped.info.mapped_bytes < claimed {
69 return Err(Error::RegionTooSmall {
70 needed: claimed,
71 found: mapped.info.mapped_bytes,
72 });
73 }
74
75 Ok(Self {
76 _mmap: mmap,
77 info: mapped.info,
78 ptr,
79 })
80 }
81
82 pub fn as_ptr(&self) -> *mut SequencedSlotsLayout<N> {
84 self.ptr
85 }
86
87 pub fn mapping_info(&self) -> MappingInfo {
89 self.info
90 }
91}
92
/// File-backed memory mapping viewed as an `EventsChainLayout<Q>`.
///
/// Instances are produced by the `open*` constructors. The region owns the
/// mapping and hands out a raw pointer to the layout located at its base, as
/// well as sender/receiver handles for the queues stored in the layout.
pub struct EventsChainRegion<const Q: usize> {
    /// Owned mapping; kept only so the memory behind `ptr` stays mapped while
    /// this value exists (never read directly, hence the underscore).
    _mmap: MmapMut,
    /// Mapping metadata as returned by `open_file_backed`.
    info: MappingInfo,
    /// Base address of the mapping, typed as the layout.
    ptr: *mut EventsChainLayout<Q>,
}
99
// SAFETY: the struct stores a raw pointer, which suppresses the auto traits.
// That pointer targets the mapping owned by the `_mmap` field of the same
// value, so it travels together with its backing memory when the region is
// moved to another thread.
// NOTE(review): `Sync` additionally assumes that all concurrent access made
// through `as_ptr()` and the queue handles is synchronized by the layout's
// own protocol — confirm against the `EventsChainLayout` contract.
unsafe impl<const Q: usize> Send for EventsChainRegion<Q> {}
unsafe impl<const Q: usize> Sync for EventsChainRegion<Q> {}
102
103impl<const Q: usize> EventsChainRegion<Q> {
104 pub fn open(name: &str) -> Result<Self, Error> {
106 Self::open_with(name, OpenOptions::default())
107 }
108
109 pub fn open_with(name: &str, options: OpenOptions) -> Result<Self, Error> {
111 if options.blocking {
112 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
114 }
115 let path = options.resolve_path(name);
116 Self::open_path_with(path, options)
117 }
118
119 pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
121 Self::open_path_with(path, OpenOptions::default())
122 }
123
124 pub fn open_path_with(path: impl AsRef<Path>, options: OpenOptions) -> Result<Self, Error> {
126 if options.blocking {
127 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
128 }
129
130 let bytes = required_bytes_events_chain::<Q>();
131 let mapped = open_file_backed(path.as_ref(), bytes, &options)?;
132 let mut mmap = mapped.mmap;
133
134 let base = mmap.as_mut_ptr() as usize;
135 let align = core::mem::align_of::<EventsChainLayout<Q>>();
136 if !base.is_multiple_of(align) {
137 return Err(Error::Misaligned { align });
138 }
139 let ptr = mmap.as_mut_ptr() as *mut EventsChainLayout<Q>;
140
141 unsafe {
142 init_or_wait_events_chain::<Q>(ptr, bytes)?;
143 }
144
145 let claimed = unsafe { (*ptr).header.layout_bytes as usize };
146 if mapped.info.mapped_bytes < claimed {
147 return Err(Error::RegionTooSmall {
148 needed: claimed,
149 found: mapped.info.mapped_bytes,
150 });
151 }
152
153 Ok(Self {
154 _mmap: mmap,
155 info: mapped.info,
156 ptr,
157 })
158 }
159
160 pub fn as_ptr(&self) -> *mut EventsChainLayout<Q> {
162 self.ptr
163 }
164
165 pub fn mapping_info(&self) -> MappingInfo {
167 self.info
168 }
169
170 pub fn queue_spsc(
174 &self,
175 idx: usize,
176 ) -> Result<
177 (
178 indexbus_core::ChainSpscSender,
179 indexbus_core::ChainSpscReceiver,
180 ),
181 Error,
182 > {
183 if idx >= Q {
184 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
185 }
186
187 let pool = unsafe { core::ptr::addr_of_mut!((*self.ptr).slot_pool) };
188 let queue = unsafe { core::ptr::addr_of_mut!((*self.ptr).queues[idx]) };
189 Ok(unsafe { indexbus_core::split_chain_spsc(pool, queue) })
190 }
191
192 pub fn queue_mpsc(
196 &self,
197 idx: usize,
198 ) -> Result<
199 (
200 indexbus_core::ChainMpscProducer,
201 indexbus_core::ChainMpscConsumer,
202 ),
203 Error,
204 > {
205 if idx >= Q {
206 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
207 }
208
209 let pool = unsafe { core::ptr::addr_of_mut!((*self.ptr).slot_pool) };
210 let queue = unsafe { core::ptr::addr_of_mut!((*self.ptr).mpsc_queues[idx]) };
211 Ok(unsafe { indexbus_core::split_chain_mpsc(pool, queue) })
212 }
213}