indexbus_transport_shm/
regions.rs

1use indexbus_abi::layouts::{JournalLayout4, SequencerLayout, SharedFanoutLayout, SharedLayout};
2use indexbus_core::{
3    fanout_handles, split_mpsc, split_spsc, FanoutConsumer, FanoutProducer, FanoutRouter,
4    MpscConsumer, MpscProducer, SpscReceiver, SpscSender,
5};
6use indexbus_log::{
7    JournalPublisher, JournalPublisherConfig, JournalSubscriber, JournalSubscriberConfig,
8};
9use indexbus_seq::{
10    SequencerBarrier, SequencerConsumer, SequencerProducer, SequencerProducerConfig,
11};
12
13use std::path::{Path, PathBuf};
14
15use crate::internal;
16use crate::Error;
17
18/// Options for opening a mapped region.
19///
20/// These options must be consistent across all processes that open the same backing file.
21/// For example, if a creator initializes without `blocking` and a joiner requests `blocking`,
22/// opening fails with a layout compatibility error.
23#[derive(Debug, Clone, Default)]
24pub struct OpenOptions {
25    path: Option<PathBuf>,
26    blocking: bool,
27}
28
29/// Options for opening a mapped journal region.
30///
31/// These options must be consistent across all processes that open the same backing file.
32#[derive(Debug, Clone, Default)]
33pub struct JournalOpenOptions {
34    path: Option<PathBuf>,
35    blocking: bool,
36    stats: bool,
37}
38
39impl JournalOpenOptions {
40    #[inline]
41    /// Create default journal open options.
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    #[inline]
47    /// Override the backing file path for the mapping.
48    pub fn path(mut self, path: impl Into<PathBuf>) -> Self {
49        self.path = Some(path.into());
50        self
51    }
52
53    /// Enable an appended wake section and set `INDEXBUS_CAP_SUPPORTS_BLOCKING`.
54    ///
55    /// All participants that open the same backing file should agree on this flag.
56    #[inline]
57    pub fn blocking(mut self, enabled: bool) -> Self {
58        self.blocking = enabled;
59        self
60    }
61
62    /// Enable an appended stats section and set `INDEXBUS_CAP_SUPPORTS_JOURNAL_STATS`.
63    ///
64    /// All participants that open the same backing file should agree on this flag.
65    #[inline]
66    pub fn stats(mut self, enabled: bool) -> Self {
67        self.stats = enabled;
68        self
69    }
70
71    #[inline]
72    fn resolve_path(&self, name: &str) -> PathBuf {
73        self.path
74            .clone()
75            .unwrap_or_else(|| internal::paths::default_region_path(name))
76    }
77}
78
79impl OpenOptions {
80    #[inline]
81    /// Create default open options.
82    pub fn new() -> Self {
83        Self::default()
84    }
85
86    /// Use a specific file path for the mapping.
87    #[inline]
88    pub fn path(mut self, path: impl Into<PathBuf>) -> Self {
89        self.path = Some(path.into());
90        self
91    }
92
93    /// Enable appended wake sections and set `INDEXBUS_CAP_SUPPORTS_BLOCKING`.
94    ///
95    /// All participants that open the same backing file should agree on this flag.
96    #[inline]
97    pub fn blocking(mut self, enabled: bool) -> Self {
98        self.blocking = enabled;
99        self
100    }
101
102    #[inline]
103    fn resolve_path(&self, name: &str) -> PathBuf {
104        self.path
105            .clone()
106            .unwrap_or_else(|| internal::paths::default_region_path(name))
107    }
108}
109
110/// File-backed shared events region (SPSC + MPSC).
111///
112/// Creator/joiner behavior:
113///
114/// - If the backing file is new/empty, the first opener initializes the layout.
115/// - If another process is initializing, openers wait until `initialized == 2`.
116/// - Joiners validate the v1 header, claimed layout size, and requested capabilities.
117pub struct EventsRegion {
118    _mmap: memmap2::MmapMut,
119    ptr: *mut SharedLayout,
120}
121
122unsafe impl Send for EventsRegion {}
123unsafe impl Sync for EventsRegion {}
124
125impl EventsRegion {
126    /// Open (or create) an events region using the default path for `name`.
127    pub fn open(name: &str) -> Result<Self, Error> {
128        Self::open_with(name, OpenOptions::default())
129    }
130
131    /// Open (or create) an events region using explicit `options`.
132    pub fn open_with(name: &str, options: OpenOptions) -> Result<Self, Error> {
133        let path = options.resolve_path(name);
134        Self::open_path_with(path, options)
135    }
136
137    /// Open (or create) an events region backed by `path`.
138    pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
139        Self::open_path_with(path, OpenOptions::default())
140    }
141
142    /// Open (or create) an events region backed by `path` with explicit `options`.
143    ///
144    /// # Errors
145    ///
146    /// Returns:
147    /// - [`Error::Io`] on filesystem/mapping failures
148    /// - [`Error::Misaligned`] if the mapping base address does not meet the ABI alignment
149    /// - [`Error::RegionTooSmall`] if the mapped file is smaller than the layout claims
150    /// - [`Error::Layout`] if the region is uninitialized or incompatible (wrong options/caps)
151    pub fn open_path_with(path: impl AsRef<Path>, options: OpenOptions) -> Result<Self, Error> {
152        // Initialization handshake (cross-process safe):
153        // - If the file is new/empty, this opener becomes the creator and initializes the layout.
154        // - If another process is initializing, we wait for `initialized == 2`, then validate.
155        // - Joiners validate the header and fail fast on capability/layout mismatches.
156        let bytes = internal::init::required_bytes_shared(options.blocking);
157        let mapped = internal::region::MappedRegion::open_file_backed(path.as_ref(), bytes)?;
158        let mut mmap = mapped.mmap;
159
160        // Defense-in-depth: ensure the mapping is aligned for the layout.
161        let base = mmap.as_mut_ptr() as usize;
162        let align = core::mem::align_of::<SharedLayout>();
163        if !base.is_multiple_of(align) {
164            return Err(Error::Misaligned { align });
165        }
166        let ptr = mmap.as_mut_ptr() as *mut SharedLayout;
167
168        unsafe {
169            internal::init::init_or_wait_shared(ptr, bytes, options.blocking)?;
170        }
171
172        // If blocking was requested, require the blocking capability (wake sections present).
173        if options.blocking {
174            let caps = unsafe { (*ptr).header.capabilities };
175            if (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
176                return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
177            }
178        }
179
180        // Ensure the mapping is large enough to cover the claimed layout bytes.
181        let claimed = unsafe { (*ptr).header.layout_bytes as usize };
182        if mapped.mapped_bytes < claimed {
183            return Err(Error::RegionTooSmall {
184                needed: claimed,
185                found: mapped.mapped_bytes,
186            });
187        }
188
189        Ok(Self { _mmap: mmap, ptr })
190    }
191
192    #[inline]
193    /// Get the raw layout pointer for this mapping.
194    ///
195    /// The pointer is valid as long as the `EventsRegion` is alive.
196    pub fn as_ptr(&self) -> *mut SharedLayout {
197        self.ptr
198    }
199
200    #[inline]
201    /// Split the region into an SPSC sender/receiver pair.
202    ///
203    /// This validates the mapping header before constructing the handles.
204    pub fn split_spsc(&mut self) -> Result<(SpscSender, SpscReceiver), indexbus_core::Error> {
205        let cell = unsafe { indexbus_core::SharedLayoutCell::from_ptr(self.ptr) };
206        split_spsc(cell)
207    }
208
209    #[inline]
210    /// Split the region into an MPSC producer/consumer pair.
211    ///
212    /// This validates the mapping header before constructing the handles.
213    pub fn split_mpsc(&mut self) -> Result<(MpscProducer, MpscConsumer), indexbus_core::Error> {
214        let cell = unsafe { indexbus_core::SharedLayoutCell::from_ptr(self.ptr) };
215        split_mpsc(cell)
216    }
217}
218
219/// File-backed shared fanout region.
220///
221/// See [`EventsRegion`] for the creator/joiner initialization model.
222pub struct FanoutRegion<const N: usize> {
223    _mmap: memmap2::MmapMut,
224    ptr: *mut SharedFanoutLayout<N>,
225}
226
227/// File-backed shared sequencer region.
228///
229/// See [`EventsRegion`] for the creator/joiner initialization model.
230pub struct SequencerRegion<const N: usize> {
231    _mmap: memmap2::MmapMut,
232    ptr: *mut SequencerLayout<N>,
233}
234
235unsafe impl<const N: usize> Send for SequencerRegion<N> {}
236unsafe impl<const N: usize> Sync for SequencerRegion<N> {}
237
238impl<const N: usize> SequencerRegion<N> {
239    /// Open (or create) a sequencer region using the default path for `name`.
240    pub fn open(name: &str) -> Result<Self, Error> {
241        Self::open_with(name, OpenOptions::default())
242    }
243
244    /// Open (or create) a sequencer region using explicit `options`.
245    pub fn open_with(name: &str, options: OpenOptions) -> Result<Self, Error> {
246        let path = options.resolve_path(name);
247        Self::open_path_with(path, options)
248    }
249
250    /// Open (or create) a sequencer region backed by `path`.
251    pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
252        Self::open_path_with(path, OpenOptions::default())
253    }
254
255    /// Open (or create) a sequencer region backed by `path` with explicit `options`.
256    ///
257    /// See [`EventsRegion::open_path_with`] for error semantics.
258    pub fn open_path_with(path: impl AsRef<Path>, options: OpenOptions) -> Result<Self, Error> {
259        let bytes = internal::init::required_bytes_sequencer::<N>(options.blocking);
260        let mapped = internal::region::MappedRegion::open_file_backed(path.as_ref(), bytes)?;
261        let mut mmap = mapped.mmap;
262
263        let base = mmap.as_mut_ptr() as usize;
264        let align = core::mem::align_of::<SequencerLayout<N>>();
265        if !base.is_multiple_of(align) {
266            return Err(Error::Misaligned { align });
267        }
268        let ptr = mmap.as_mut_ptr() as *mut SequencerLayout<N>;
269
270        unsafe {
271            internal::init::init_or_wait_sequencer::<N>(ptr, bytes, options.blocking)?;
272        }
273
274        if options.blocking {
275            let caps = unsafe { (*ptr).header.capabilities };
276            if (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
277                return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
278            }
279        }
280
281        let claimed = unsafe { (*ptr).header.layout_bytes as usize };
282        if mapped.mapped_bytes < claimed {
283            return Err(Error::RegionTooSmall {
284                needed: claimed,
285                found: mapped.mapped_bytes,
286            });
287        }
288
289        Ok(Self { _mmap: mmap, ptr })
290    }
291
292    #[inline]
293    /// Get the raw layout pointer for this mapping.
294    ///
295    /// The pointer is valid as long as the `SequencerRegion` is alive.
296    pub fn as_ptr(&self) -> *mut SequencerLayout<N> {
297        self.ptr
298    }
299
300    #[inline]
301    /// Create a sequencer producer with the provided configuration.
302    pub fn producer(
303        &self,
304        cfg: SequencerProducerConfig,
305    ) -> Result<SequencerProducer<'_, N>, indexbus_seq::Error> {
306        unsafe { SequencerProducer::try_new(&*self.ptr, cfg) }
307    }
308
309    #[inline]
310    /// Create a sequencer consumer for `consumer`.
311    pub fn consumer(
312        &self,
313        consumer: usize,
314    ) -> Result<SequencerConsumer<'_, N>, indexbus_seq::Error> {
315        unsafe { SequencerConsumer::try_new(&*self.ptr, consumer) }
316    }
317
318    #[inline]
319    /// Create a barrier that tracks the producer cursor.
320    pub fn barrier_for_cursor(&self) -> Result<SequencerBarrier<'_, N>, indexbus_seq::Error> {
321        unsafe { SequencerBarrier::for_cursor(&*self.ptr) }
322    }
323}
324
325/// File-backed journal region (concrete v1 layout).
326///
327/// See [`EventsRegion`] for the creator/joiner initialization model.
328pub struct JournalRegion4 {
329    _mmap: memmap2::MmapMut,
330    ptr: *mut JournalLayout4,
331}
332
333unsafe impl Send for JournalRegion4 {}
334unsafe impl Sync for JournalRegion4 {}
335
336impl JournalRegion4 {
337    /// Open (or create) a journal region using the default path for `name`.
338    pub fn open(name: &str) -> Result<Self, Error> {
339        Self::open_with(name, JournalOpenOptions::default())
340    }
341
342    /// Open (or create) a journal region using explicit `options`.
343    pub fn open_with(name: &str, options: JournalOpenOptions) -> Result<Self, Error> {
344        let path = options.resolve_path(name);
345        Self::open_path_with(path, options)
346    }
347
348    /// Open (or create) a journal region backed by `path`.
349    pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
350        Self::open_path_with(path, JournalOpenOptions::default())
351    }
352
353    /// Open (or create) a journal region backed by `path` with explicit `options`.
354    ///
355    /// See [`EventsRegion::open_path_with`] for error semantics.
356    pub fn open_path_with(
357        path: impl AsRef<Path>,
358        options: JournalOpenOptions,
359    ) -> Result<Self, Error> {
360        let bytes = internal::init::required_bytes_journal(options.blocking, options.stats);
361        let mapped = internal::region::MappedRegion::open_file_backed(path.as_ref(), bytes)?;
362        let mut mmap = mapped.mmap;
363
364        let base = mmap.as_mut_ptr() as usize;
365        let align = core::mem::align_of::<JournalLayout4>();
366        if !base.is_multiple_of(align) {
367            return Err(Error::Misaligned { align });
368        }
369        let ptr = mmap.as_mut_ptr() as *mut JournalLayout4;
370
371        unsafe {
372            internal::init::init_or_wait_journal(ptr, bytes, options.blocking, options.stats)?;
373        }
374
375        let caps = unsafe { (*ptr).header.capabilities };
376        if options.blocking && (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
377            return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
378        }
379        if options.stats && (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_JOURNAL_STATS) == 0 {
380            return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
381        }
382
383        let claimed = unsafe { (*ptr).header.layout_bytes as usize };
384        if mapped.mapped_bytes < claimed {
385            return Err(Error::RegionTooSmall {
386                needed: claimed,
387                found: mapped.mapped_bytes,
388            });
389        }
390
391        Ok(Self { _mmap: mmap, ptr })
392    }
393
394    /// Flush modified pages to the underlying file (best-effort durability).
395    #[cfg(feature = "std")]
396    pub fn flush(&self) -> std::io::Result<()> {
397        self._mmap.flush()
398    }
399
400    /// Flush modified pages asynchronously.
401    #[cfg(feature = "std")]
402    pub fn flush_async(&self) -> std::io::Result<()> {
403        self._mmap.flush_async()
404    }
405
406    #[inline]
407    /// Get the raw layout pointer for this mapping.
408    ///
409    /// The pointer is valid as long as the `JournalRegion4` is alive.
410    pub fn as_ptr(&self) -> *mut JournalLayout4 {
411        self.ptr
412    }
413
414    #[inline]
415    /// Create a journal publisher.
416    pub fn publisher(
417        &mut self,
418        cfg: JournalPublisherConfig,
419    ) -> Result<JournalPublisher, indexbus_log::Error> {
420        unsafe { JournalPublisher::new(&mut *self.ptr, cfg) }
421    }
422
423    #[inline]
424    /// Create a journal subscriber at `idx` starting from `start_pos`.
425    pub fn subscriber(
426        &mut self,
427        idx: usize,
428        start_pos: u64,
429    ) -> Result<JournalSubscriber, indexbus_log::Error> {
430        unsafe { JournalSubscriber::new(&mut *self.ptr, idx, start_pos) }
431    }
432
433    #[inline]
434    /// Create a journal subscriber with explicit configuration.
435    pub fn subscriber_with_config(
436        &mut self,
437        idx: usize,
438        start_pos: u64,
439        cfg: JournalSubscriberConfig,
440    ) -> Result<JournalSubscriber, indexbus_log::Error> {
441        unsafe { JournalSubscriber::new_with_config(&mut *self.ptr, idx, start_pos, cfg) }
442    }
443}
444
445unsafe impl<const N: usize> Send for FanoutRegion<N> {}
446unsafe impl<const N: usize> Sync for FanoutRegion<N> {}
447
448impl<const N: usize> FanoutRegion<N> {
449    /// Open (or create) a fanout region using the default path for `name`.
450    pub fn open(name: &str) -> Result<Self, Error> {
451        Self::open_with(name, OpenOptions::default())
452    }
453
454    /// Open (or create) a fanout region using explicit `options`.
455    pub fn open_with(name: &str, options: OpenOptions) -> Result<Self, Error> {
456        let path = options.resolve_path(name);
457        Self::open_path_with(path, options)
458    }
459
460    /// Open (or create) a fanout region backed by `path`.
461    pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
462        Self::open_path_with(path, OpenOptions::default())
463    }
464
465    /// Open (or create) a fanout region backed by `path` with explicit `options`.
466    ///
467    /// See [`EventsRegion::open_path_with`] for error semantics.
468    pub fn open_path_with(path: impl AsRef<Path>, options: OpenOptions) -> Result<Self, Error> {
469        // See `EventsRegion::open_path_with` for creator/joiner handshake behavior.
470        let bytes = internal::init::required_bytes_fanout::<N>(options.blocking);
471        let mapped = internal::region::MappedRegion::open_file_backed(path.as_ref(), bytes)?;
472        let mut mmap = mapped.mmap;
473
474        let base = mmap.as_mut_ptr() as usize;
475        let align = core::mem::align_of::<SharedFanoutLayout<N>>();
476        if !base.is_multiple_of(align) {
477            return Err(Error::Misaligned { align });
478        }
479        let ptr = mmap.as_mut_ptr() as *mut SharedFanoutLayout<N>;
480
481        unsafe {
482            internal::init::init_or_wait_fanout::<N>(ptr, bytes, options.blocking)?;
483        }
484
485        if options.blocking {
486            let caps = unsafe { (*ptr).header.capabilities };
487            if (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
488                return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
489            }
490        }
491
492        let claimed = unsafe { (*ptr).header.layout_bytes as usize };
493        if mapped.mapped_bytes < claimed {
494            return Err(Error::RegionTooSmall {
495                needed: claimed,
496                found: mapped.mapped_bytes,
497            });
498        }
499
500        Ok(Self { _mmap: mmap, ptr })
501    }
502
503    #[inline]
504    /// Get the raw layout pointer for this mapping.
505    ///
506    /// The pointer is valid as long as the `FanoutRegion` is alive.
507    pub fn as_ptr(&self) -> *mut SharedFanoutLayout<N> {
508        self.ptr
509    }
510
511    #[inline]
512    /// Create producer/router/consumer handles for `consumer`.
513    pub fn handles(
514        &mut self,
515        consumer: usize,
516    ) -> Result<(FanoutProducer<N>, FanoutRouter<N>, FanoutConsumer<N>), indexbus_core::Error> {
517        let cell = unsafe { indexbus_core::SharedFanoutLayoutCell::from_ptr(self.ptr) };
518        fanout_handles(cell, consumer)
519    }
520}
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525    use indexbus_core::{RouterMode, RouterSource, WaitStrategy};
526    use std::time::{SystemTime, UNIX_EPOCH};
527
528    fn unique_temp_path(prefix: &str) -> std::path::PathBuf {
529        let mut p = std::env::temp_dir();
530        let nonce = SystemTime::now()
531            .duration_since(UNIX_EPOCH)
532            .unwrap()
533            .as_nanos();
534        p.push(format!("{}_{}_{}.mmap", prefix, std::process::id(), nonce));
535        p
536    }
537
538    fn run_child(test_name: &str, path: &std::path::Path, kind: &str) {
539        run_child_blocking(test_name, path, kind, false);
540    }
541
542    fn run_child_blocking(test_name: &str, path: &std::path::Path, kind: &str, blocking: bool) {
543        let exe = std::env::current_exe().unwrap();
544        let status = std::process::Command::new(exe)
545            .arg("--exact")
546            .arg(test_name)
547            .arg("--nocapture")
548            .env("INDEXBUS_SHM_CHILD_KIND", kind)
549            .env("INDEXBUS_SHM_CHILD_PATH", path)
550            .env(
551                "INDEXBUS_SHM_CHILD_BLOCKING",
552                if blocking { "1" } else { "0" },
553            )
554            .status()
555            .unwrap();
556        assert!(status.success(), "child test failed: {status:?}");
557    }
558
559    #[test]
560    fn blocking_mismatch_fails_fast() {
561        let path = unique_temp_path("indexbus_transport_shm_blocking_mismatch");
562
563        // Create a file large enough for a blocking region, but initialize it *without*
564        // the blocking capability. This simulates a creator/joiner option mismatch.
565        let bytes = crate::internal::init::required_bytes_shared(true);
566        let mapped = crate::internal::region::MappedRegion::open_file_backed(&path, bytes).unwrap();
567        let mut mmap = mapped.mmap;
568        let ptr = mmap.as_mut_ptr() as *mut SharedLayout;
569        unsafe {
570            crate::internal::init::init_or_wait_shared(ptr, bytes, false).unwrap();
571        }
572        drop(mmap);
573
574        // Now request blocking; must fail because capability is missing.
575        let err = EventsRegion::open_path_with(&path, OpenOptions::new().blocking(true))
576            .err()
577            .expect("expected error");
578        match err {
579            Error::Layout(indexbus_core::Error::IncompatibleLayout) => {}
580            other => panic!("unexpected error: {other:?}"),
581        }
582
583        let _ = std::fs::remove_file(&path);
584    }
585
586    #[test]
587    fn too_small_region_errors() {
588        let path = unique_temp_path("indexbus_transport_shm_too_small");
589
590        // Create a too-small backing file.
591        std::fs::write(&path, [0u8; 16]).unwrap();
592
593        let err = EventsRegion::open_path_with(&path, OpenOptions::new().blocking(false))
594            .err()
595            .expect("expected error");
596        match err {
597            Error::RegionTooSmall { .. } => {}
598            other => panic!("unexpected error: {other:?}"),
599        }
600
601        let _ = std::fs::remove_file(&path);
602    }
603
604    #[test]
605    #[cfg(unix)]
606    fn new_region_file_permissions_are_restrictive() {
607        use std::os::unix::fs::PermissionsExt;
608
609        let path = unique_temp_path("indexbus_transport_shm_perms");
610
611        let _region = EventsRegion::open_path_with(&path, OpenOptions::new().blocking(false))
612            .expect("open_path_with");
613
614        let meta = std::fs::metadata(&path).expect("metadata");
615        let mode = meta.permissions().mode() & 0o777;
616
617        // Owner read/write required.
618        assert_eq!(mode & 0o600, 0o600);
619        // No group/other permissions by default.
620        assert_eq!(mode & 0o077, 0);
621
622        let _ = std::fs::remove_file(&path);
623    }
624
625    #[test]
626    fn events_region_cross_process_spsc_roundtrip() {
627        let path = unique_temp_path("indexbus_transport_shm_events_spsc_xproc");
628
629        let mut region =
630            EventsRegion::open_path_with(&path, OpenOptions::new().blocking(false)).unwrap();
631        let (tx, _rx) = region.split_spsc().unwrap();
632        tx.publish(b"ping").unwrap();
633
634        run_child_blocking(
635            "events_region_cross_process_spsc_child",
636            &path,
637            "events_spsc",
638            false,
639        );
640
641        let _ = std::fs::remove_file(&path);
642    }
643
644    #[test]
645    fn events_region_cross_process_spsc_roundtrip_blocking() {
646        let path = unique_temp_path("indexbus_transport_shm_events_spsc_xproc_blocking");
647
648        let mut region =
649            EventsRegion::open_path_with(&path, OpenOptions::new().blocking(true)).unwrap();
650        let (tx, _rx) = region.split_spsc().unwrap();
651        tx.publish(b"ping").unwrap();
652
653        run_child_blocking(
654            "events_region_cross_process_spsc_child",
655            &path,
656            "events_spsc",
657            true,
658        );
659
660        let _ = std::fs::remove_file(&path);
661    }
662
663    #[test]
664    fn events_region_cross_process_spsc_child() {
665        let kind = std::env::var("INDEXBUS_SHM_CHILD_KIND").ok();
666        if kind.as_deref() != Some("events_spsc") {
667            return;
668        }
669        let path = std::env::var("INDEXBUS_SHM_CHILD_PATH").unwrap();
670        let blocking = std::env::var("INDEXBUS_SHM_CHILD_BLOCKING").ok().as_deref() == Some("1");
671
672        let mut region =
673            EventsRegion::open_path_with(&path, OpenOptions::new().blocking(blocking)).unwrap();
674        let (_tx, rx) = region.split_spsc().unwrap();
675
676        let mut wait = indexbus_core::SpinWait::default();
677        let mut out = [0u8; 32];
678        loop {
679            if let Some(n) = rx.try_recv_into(&mut out).unwrap() {
680                assert_eq!(&out[..n], b"ping");
681                break;
682            }
683            wait.wait();
684        }
685    }
686
687    #[test]
688    fn fanout_region_cross_process_broadcast_roundtrip() {
689        let path = unique_temp_path("indexbus_transport_shm_fanout_xproc");
690
691        let mut region =
692            FanoutRegion::<4>::open_path_with(&path, OpenOptions::new().blocking(false)).unwrap();
693        let (prod, _router, _c0) = region.handles(0).unwrap();
694        prod.publish(b"hi").unwrap();
695
696        run_child_blocking(
697            "fanout_region_cross_process_broadcast_child",
698            &path,
699            "fanout",
700            false,
701        );
702
703        let _ = std::fs::remove_file(&path);
704    }
705
706    #[test]
707    fn fanout_region_cross_process_broadcast_roundtrip_blocking() {
708        let path = unique_temp_path("indexbus_transport_shm_fanout_xproc_blocking");
709
710        let mut region =
711            FanoutRegion::<4>::open_path_with(&path, OpenOptions::new().blocking(true)).unwrap();
712        let (prod, _router, _c0) = region.handles(0).unwrap();
713        prod.publish(b"hi").unwrap();
714
715        run_child_blocking(
716            "fanout_region_cross_process_broadcast_child",
717            &path,
718            "fanout",
719            true,
720        );
721
722        let _ = std::fs::remove_file(&path);
723    }
724
725    #[test]
726    fn fanout_region_cross_process_broadcast_child() {
727        let kind = std::env::var("INDEXBUS_SHM_CHILD_KIND").ok();
728        if kind.as_deref() != Some("fanout") {
729            return;
730        }
731        let path = std::env::var("INDEXBUS_SHM_CHILD_PATH").unwrap();
732        let blocking = std::env::var("INDEXBUS_SHM_CHILD_BLOCKING").ok().as_deref() == Some("1");
733
734        let mut region =
735            FanoutRegion::<4>::open_path_with(&path, OpenOptions::new().blocking(blocking))
736                .unwrap();
737        let (_prod, router, cons0) = region.handles(0).unwrap();
738
739        assert!(router.route_once_with(RouterSource::Spsc, RouterMode::Broadcast));
740
741        let mut wait = indexbus_core::SpinWait::default();
742        let mut out = [0u8; 32];
743        loop {
744            if let Some(n) = cons0.try_recv_into(&mut out).unwrap() {
745                assert_eq!(&out[..n], b"hi");
746                break;
747            }
748            wait.wait();
749        }
750    }
751
752    #[test]
753    fn sequencer_region_cross_process_roundtrip() {
754        let path = unique_temp_path("indexbus_transport_shm_sequencer_xproc");
755
756        // Parent creates region + publishes one sequence.
757        let region = SequencerRegion::<2>::open_path_with(&path, OpenOptions::new()).unwrap();
758        let mut prod = region.producer(SequencerProducerConfig::new(8)).unwrap();
759        let s1 = prod.try_claim_next().unwrap();
760        prod.publish(s1).unwrap();
761
762        // Child opens mapping and advances gating.
763        run_child("sequencer_region_cross_process_child", &path, "sequencer");
764
765        // Child already verified it can observe the published cursor across processes.
766
767        let _ = std::fs::remove_file(&path);
768    }
769
770    #[test]
771    fn sequencer_region_cross_process_child() {
772        let kind = std::env::var("INDEXBUS_SHM_CHILD_KIND").ok();
773        if kind.as_deref() != Some("sequencer") {
774            return;
775        }
776        let path = std::env::var("INDEXBUS_SHM_CHILD_PATH").unwrap();
777        let blocking = std::env::var("INDEXBUS_SHM_CHILD_BLOCKING").ok().as_deref() == Some("1");
778        let region =
779            SequencerRegion::<2>::open_path_with(&path, OpenOptions::new().blocking(blocking))
780                .unwrap();
781
782        let barrier = region.barrier_for_cursor().unwrap();
783        let mut wait = indexbus_core::SpinWait::default();
784        assert!(barrier.wait_for(1, &mut wait) >= 1);
785
786        let mut c0 = region.consumer(0).unwrap();
787        c0.advance(1).unwrap();
788    }
789
790    #[test]
791    fn journal_region_cross_process_roundtrip() {
792        let path = unique_temp_path("indexbus_transport_shm_journal_xproc");
793
794        let mut region = JournalRegion4::open_path(&path).unwrap();
795        let mut pubr = region.publisher(JournalPublisherConfig::default()).unwrap();
796        pubr.try_append(b"hello").unwrap();
797
798        run_child("journal_region_cross_process_child", &path, "journal");
799
800        let _ = std::fs::remove_file(&path);
801    }
802
803    #[test]
804    fn journal_region_cross_process_child() {
805        let kind = std::env::var("INDEXBUS_SHM_CHILD_KIND").ok();
806        if kind.as_deref() != Some("journal") {
807            return;
808        }
809        let path = std::env::var("INDEXBUS_SHM_CHILD_PATH").unwrap();
810        let blocking = std::env::var("INDEXBUS_SHM_CHILD_BLOCKING").ok().as_deref() == Some("1");
811        let mut region =
812            JournalRegion4::open_path_with(&path, JournalOpenOptions::new().blocking(blocking))
813                .unwrap();
814        let mut sub = region.subscriber(0, 0).unwrap();
815
816        let mut wait = indexbus_core::SpinWait::default();
817        let mut out = [0u8; 32];
818        loop {
819            if let Some(msg) = sub.poll_next(&mut out).unwrap() {
820                assert_eq!(msg, b"hello");
821                break;
822            }
823            wait.wait();
824        }
825    }
826
827    #[test]
828    fn sequencer_region_cross_process_roundtrip_blocking() {
829        let path = unique_temp_path("indexbus_transport_shm_sequencer_xproc_blocking");
830
831        let region =
832            SequencerRegion::<2>::open_path_with(&path, OpenOptions::new().blocking(true)).unwrap();
833        let mut prod = region.producer(SequencerProducerConfig::new(8)).unwrap();
834        let s1 = prod.try_claim_next().unwrap();
835        prod.publish(s1).unwrap();
836
837        run_child_blocking(
838            "sequencer_region_cross_process_child",
839            &path,
840            "sequencer",
841            true,
842        );
843
844        let _ = std::fs::remove_file(&path);
845    }
846
847    #[test]
848    fn journal_region_cross_process_roundtrip_blocking() {
849        let path = unique_temp_path("indexbus_transport_shm_journal_xproc_blocking");
850
851        let mut region =
852            JournalRegion4::open_path_with(&path, JournalOpenOptions::new().blocking(true))
853                .unwrap();
854        let mut pubr = region.publisher(JournalPublisherConfig::default()).unwrap();
855        pubr.try_append(b"hello").unwrap();
856
857        run_child_blocking("journal_region_cross_process_child", &path, "journal", true);
858
859        let _ = std::fs::remove_file(&path);
860    }
861
862    #[test]
863    fn journal_region_restart_reattach_roundtrip() {
864        let path = unique_temp_path("indexbus_transport_shm_journal_restart_reattach");
865
866        // "First process": create + append.
867        {
868            let mut region = JournalRegion4::open_path(&path).unwrap();
869            let mut pubr = region.publisher(JournalPublisherConfig::default()).unwrap();
870            pubr.try_append(b"hello").unwrap();
871            pubr.try_append(b"world").unwrap();
872            region.flush().unwrap();
873        }
874
875        // "Restart": reopen mapping and tail from the beginning.
876        {
877            let mut region = JournalRegion4::open_path(&path).unwrap();
878            let mut sub = region.subscriber(0, 0).unwrap();
879
880            let mut wait = indexbus_core::SpinWait::default();
881            let mut out = [0u8; 32];
882
883            let mut got = Vec::new();
884            while got.len() < 2 {
885                if let Some(msg) = sub.poll_next(&mut out).unwrap() {
886                    got.push(msg.to_vec());
887                } else {
888                    wait.wait();
889                }
890            }
891
892            assert_eq!(got[0].as_slice(), b"hello");
893            assert_eq!(got[1].as_slice(), b"world");
894        }
895
896        let _ = std::fs::remove_file(&path);
897    }
898}