1use indexbus_abi::layouts::{JournalLayout4, SequencerLayout, SharedFanoutLayout, SharedLayout};
2use indexbus_core::{
3 fanout_handles, split_mpsc, split_spsc, FanoutConsumer, FanoutProducer, FanoutRouter,
4 MpscConsumer, MpscProducer, SpscReceiver, SpscSender,
5};
6use indexbus_log::{
7 JournalPublisher, JournalPublisherConfig, JournalSubscriber, JournalSubscriberConfig,
8};
9use indexbus_seq::{
10 SequencerBarrier, SequencerConsumer, SequencerProducer, SequencerProducerConfig,
11};
12
13use std::path::{Path, PathBuf};
14
15use crate::internal;
16use crate::Error;
17
18#[derive(Debug, Clone, Default)]
24pub struct OpenOptions {
25 path: Option<PathBuf>,
26 blocking: bool,
27}
28
29#[derive(Debug, Clone, Default)]
33pub struct JournalOpenOptions {
34 path: Option<PathBuf>,
35 blocking: bool,
36 stats: bool,
37}
38
39impl JournalOpenOptions {
40 #[inline]
41 pub fn new() -> Self {
43 Self::default()
44 }
45
46 #[inline]
47 pub fn path(mut self, path: impl Into<PathBuf>) -> Self {
49 self.path = Some(path.into());
50 self
51 }
52
53 #[inline]
57 pub fn blocking(mut self, enabled: bool) -> Self {
58 self.blocking = enabled;
59 self
60 }
61
62 #[inline]
66 pub fn stats(mut self, enabled: bool) -> Self {
67 self.stats = enabled;
68 self
69 }
70
71 #[inline]
72 fn resolve_path(&self, name: &str) -> PathBuf {
73 self.path
74 .clone()
75 .unwrap_or_else(|| internal::paths::default_region_path(name))
76 }
77}
78
79impl OpenOptions {
80 #[inline]
81 pub fn new() -> Self {
83 Self::default()
84 }
85
86 #[inline]
88 pub fn path(mut self, path: impl Into<PathBuf>) -> Self {
89 self.path = Some(path.into());
90 self
91 }
92
93 #[inline]
97 pub fn blocking(mut self, enabled: bool) -> Self {
98 self.blocking = enabled;
99 self
100 }
101
102 #[inline]
103 fn resolve_path(&self, name: &str) -> PathBuf {
104 self.path
105 .clone()
106 .unwrap_or_else(|| internal::paths::default_region_path(name))
107 }
108}
109
110pub struct EventsRegion {
118 _mmap: memmap2::MmapMut,
119 ptr: *mut SharedLayout,
120}
121
122unsafe impl Send for EventsRegion {}
123unsafe impl Sync for EventsRegion {}
124
125impl EventsRegion {
126 pub fn open(name: &str) -> Result<Self, Error> {
128 Self::open_with(name, OpenOptions::default())
129 }
130
131 pub fn open_with(name: &str, options: OpenOptions) -> Result<Self, Error> {
133 let path = options.resolve_path(name);
134 Self::open_path_with(path, options)
135 }
136
137 pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
139 Self::open_path_with(path, OpenOptions::default())
140 }
141
142 pub fn open_path_with(path: impl AsRef<Path>, options: OpenOptions) -> Result<Self, Error> {
152 let bytes = internal::init::required_bytes_shared(options.blocking);
157 let mapped = internal::region::MappedRegion::open_file_backed(path.as_ref(), bytes)?;
158 let mut mmap = mapped.mmap;
159
160 let base = mmap.as_mut_ptr() as usize;
162 let align = core::mem::align_of::<SharedLayout>();
163 if !base.is_multiple_of(align) {
164 return Err(Error::Misaligned { align });
165 }
166 let ptr = mmap.as_mut_ptr() as *mut SharedLayout;
167
168 unsafe {
169 internal::init::init_or_wait_shared(ptr, bytes, options.blocking)?;
170 }
171
172 if options.blocking {
174 let caps = unsafe { (*ptr).header.capabilities };
175 if (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
176 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
177 }
178 }
179
180 let claimed = unsafe { (*ptr).header.layout_bytes as usize };
182 if mapped.mapped_bytes < claimed {
183 return Err(Error::RegionTooSmall {
184 needed: claimed,
185 found: mapped.mapped_bytes,
186 });
187 }
188
189 Ok(Self { _mmap: mmap, ptr })
190 }
191
192 #[inline]
193 pub fn as_ptr(&self) -> *mut SharedLayout {
197 self.ptr
198 }
199
200 #[inline]
201 pub fn split_spsc(&mut self) -> Result<(SpscSender, SpscReceiver), indexbus_core::Error> {
205 let cell = unsafe { indexbus_core::SharedLayoutCell::from_ptr(self.ptr) };
206 split_spsc(cell)
207 }
208
209 #[inline]
210 pub fn split_mpsc(&mut self) -> Result<(MpscProducer, MpscConsumer), indexbus_core::Error> {
214 let cell = unsafe { indexbus_core::SharedLayoutCell::from_ptr(self.ptr) };
215 split_mpsc(cell)
216 }
217}
218
219pub struct FanoutRegion<const N: usize> {
223 _mmap: memmap2::MmapMut,
224 ptr: *mut SharedFanoutLayout<N>,
225}
226
227pub struct SequencerRegion<const N: usize> {
231 _mmap: memmap2::MmapMut,
232 ptr: *mut SequencerLayout<N>,
233}
234
235unsafe impl<const N: usize> Send for SequencerRegion<N> {}
236unsafe impl<const N: usize> Sync for SequencerRegion<N> {}
237
238impl<const N: usize> SequencerRegion<N> {
239 pub fn open(name: &str) -> Result<Self, Error> {
241 Self::open_with(name, OpenOptions::default())
242 }
243
244 pub fn open_with(name: &str, options: OpenOptions) -> Result<Self, Error> {
246 let path = options.resolve_path(name);
247 Self::open_path_with(path, options)
248 }
249
250 pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
252 Self::open_path_with(path, OpenOptions::default())
253 }
254
255 pub fn open_path_with(path: impl AsRef<Path>, options: OpenOptions) -> Result<Self, Error> {
259 let bytes = internal::init::required_bytes_sequencer::<N>(options.blocking);
260 let mapped = internal::region::MappedRegion::open_file_backed(path.as_ref(), bytes)?;
261 let mut mmap = mapped.mmap;
262
263 let base = mmap.as_mut_ptr() as usize;
264 let align = core::mem::align_of::<SequencerLayout<N>>();
265 if !base.is_multiple_of(align) {
266 return Err(Error::Misaligned { align });
267 }
268 let ptr = mmap.as_mut_ptr() as *mut SequencerLayout<N>;
269
270 unsafe {
271 internal::init::init_or_wait_sequencer::<N>(ptr, bytes, options.blocking)?;
272 }
273
274 if options.blocking {
275 let caps = unsafe { (*ptr).header.capabilities };
276 if (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
277 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
278 }
279 }
280
281 let claimed = unsafe { (*ptr).header.layout_bytes as usize };
282 if mapped.mapped_bytes < claimed {
283 return Err(Error::RegionTooSmall {
284 needed: claimed,
285 found: mapped.mapped_bytes,
286 });
287 }
288
289 Ok(Self { _mmap: mmap, ptr })
290 }
291
292 #[inline]
293 pub fn as_ptr(&self) -> *mut SequencerLayout<N> {
297 self.ptr
298 }
299
300 #[inline]
301 pub fn producer(
303 &self,
304 cfg: SequencerProducerConfig,
305 ) -> Result<SequencerProducer<'_, N>, indexbus_seq::Error> {
306 unsafe { SequencerProducer::try_new(&*self.ptr, cfg) }
307 }
308
309 #[inline]
310 pub fn consumer(
312 &self,
313 consumer: usize,
314 ) -> Result<SequencerConsumer<'_, N>, indexbus_seq::Error> {
315 unsafe { SequencerConsumer::try_new(&*self.ptr, consumer) }
316 }
317
318 #[inline]
319 pub fn barrier_for_cursor(&self) -> Result<SequencerBarrier<'_, N>, indexbus_seq::Error> {
321 unsafe { SequencerBarrier::for_cursor(&*self.ptr) }
322 }
323}
324
325pub struct JournalRegion4 {
329 _mmap: memmap2::MmapMut,
330 ptr: *mut JournalLayout4,
331}
332
333unsafe impl Send for JournalRegion4 {}
334unsafe impl Sync for JournalRegion4 {}
335
336impl JournalRegion4 {
337 pub fn open(name: &str) -> Result<Self, Error> {
339 Self::open_with(name, JournalOpenOptions::default())
340 }
341
342 pub fn open_with(name: &str, options: JournalOpenOptions) -> Result<Self, Error> {
344 let path = options.resolve_path(name);
345 Self::open_path_with(path, options)
346 }
347
348 pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
350 Self::open_path_with(path, JournalOpenOptions::default())
351 }
352
353 pub fn open_path_with(
357 path: impl AsRef<Path>,
358 options: JournalOpenOptions,
359 ) -> Result<Self, Error> {
360 let bytes = internal::init::required_bytes_journal(options.blocking, options.stats);
361 let mapped = internal::region::MappedRegion::open_file_backed(path.as_ref(), bytes)?;
362 let mut mmap = mapped.mmap;
363
364 let base = mmap.as_mut_ptr() as usize;
365 let align = core::mem::align_of::<JournalLayout4>();
366 if !base.is_multiple_of(align) {
367 return Err(Error::Misaligned { align });
368 }
369 let ptr = mmap.as_mut_ptr() as *mut JournalLayout4;
370
371 unsafe {
372 internal::init::init_or_wait_journal(ptr, bytes, options.blocking, options.stats)?;
373 }
374
375 let caps = unsafe { (*ptr).header.capabilities };
376 if options.blocking && (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
377 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
378 }
379 if options.stats && (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_JOURNAL_STATS) == 0 {
380 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
381 }
382
383 let claimed = unsafe { (*ptr).header.layout_bytes as usize };
384 if mapped.mapped_bytes < claimed {
385 return Err(Error::RegionTooSmall {
386 needed: claimed,
387 found: mapped.mapped_bytes,
388 });
389 }
390
391 Ok(Self { _mmap: mmap, ptr })
392 }
393
394 #[cfg(feature = "std")]
396 pub fn flush(&self) -> std::io::Result<()> {
397 self._mmap.flush()
398 }
399
400 #[cfg(feature = "std")]
402 pub fn flush_async(&self) -> std::io::Result<()> {
403 self._mmap.flush_async()
404 }
405
406 #[inline]
407 pub fn as_ptr(&self) -> *mut JournalLayout4 {
411 self.ptr
412 }
413
414 #[inline]
415 pub fn publisher(
417 &mut self,
418 cfg: JournalPublisherConfig,
419 ) -> Result<JournalPublisher, indexbus_log::Error> {
420 unsafe { JournalPublisher::new(&mut *self.ptr, cfg) }
421 }
422
423 #[inline]
424 pub fn subscriber(
426 &mut self,
427 idx: usize,
428 start_pos: u64,
429 ) -> Result<JournalSubscriber, indexbus_log::Error> {
430 unsafe { JournalSubscriber::new(&mut *self.ptr, idx, start_pos) }
431 }
432
433 #[inline]
434 pub fn subscriber_with_config(
436 &mut self,
437 idx: usize,
438 start_pos: u64,
439 cfg: JournalSubscriberConfig,
440 ) -> Result<JournalSubscriber, indexbus_log::Error> {
441 unsafe { JournalSubscriber::new_with_config(&mut *self.ptr, idx, start_pos, cfg) }
442 }
443}
444
445unsafe impl<const N: usize> Send for FanoutRegion<N> {}
446unsafe impl<const N: usize> Sync for FanoutRegion<N> {}
447
448impl<const N: usize> FanoutRegion<N> {
449 pub fn open(name: &str) -> Result<Self, Error> {
451 Self::open_with(name, OpenOptions::default())
452 }
453
454 pub fn open_with(name: &str, options: OpenOptions) -> Result<Self, Error> {
456 let path = options.resolve_path(name);
457 Self::open_path_with(path, options)
458 }
459
460 pub fn open_path(path: impl AsRef<Path>) -> Result<Self, Error> {
462 Self::open_path_with(path, OpenOptions::default())
463 }
464
465 pub fn open_path_with(path: impl AsRef<Path>, options: OpenOptions) -> Result<Self, Error> {
469 let bytes = internal::init::required_bytes_fanout::<N>(options.blocking);
471 let mapped = internal::region::MappedRegion::open_file_backed(path.as_ref(), bytes)?;
472 let mut mmap = mapped.mmap;
473
474 let base = mmap.as_mut_ptr() as usize;
475 let align = core::mem::align_of::<SharedFanoutLayout<N>>();
476 if !base.is_multiple_of(align) {
477 return Err(Error::Misaligned { align });
478 }
479 let ptr = mmap.as_mut_ptr() as *mut SharedFanoutLayout<N>;
480
481 unsafe {
482 internal::init::init_or_wait_fanout::<N>(ptr, bytes, options.blocking)?;
483 }
484
485 if options.blocking {
486 let caps = unsafe { (*ptr).header.capabilities };
487 if (caps & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
488 return Err(Error::Layout(indexbus_core::Error::IncompatibleLayout));
489 }
490 }
491
492 let claimed = unsafe { (*ptr).header.layout_bytes as usize };
493 if mapped.mapped_bytes < claimed {
494 return Err(Error::RegionTooSmall {
495 needed: claimed,
496 found: mapped.mapped_bytes,
497 });
498 }
499
500 Ok(Self { _mmap: mmap, ptr })
501 }
502
503 #[inline]
504 pub fn as_ptr(&self) -> *mut SharedFanoutLayout<N> {
508 self.ptr
509 }
510
511 #[inline]
512 pub fn handles(
514 &mut self,
515 consumer: usize,
516 ) -> Result<(FanoutProducer<N>, FanoutRouter<N>, FanoutConsumer<N>), indexbus_core::Error> {
517 let cell = unsafe { indexbus_core::SharedFanoutLayoutCell::from_ptr(self.ptr) };
518 fanout_handles(cell, consumer)
519 }
520}
521
522#[cfg(test)]
523mod tests {
524 use super::*;
525 use indexbus_core::{RouterMode, RouterSource, WaitStrategy};
526 use std::time::{SystemTime, UNIX_EPOCH};
527
528 fn unique_temp_path(prefix: &str) -> std::path::PathBuf {
529 let mut p = std::env::temp_dir();
530 let nonce = SystemTime::now()
531 .duration_since(UNIX_EPOCH)
532 .unwrap()
533 .as_nanos();
534 p.push(format!("{}_{}_{}.mmap", prefix, std::process::id(), nonce));
535 p
536 }
537
538 fn run_child(test_name: &str, path: &std::path::Path, kind: &str) {
539 run_child_blocking(test_name, path, kind, false);
540 }
541
542 fn run_child_blocking(test_name: &str, path: &std::path::Path, kind: &str, blocking: bool) {
543 let exe = std::env::current_exe().unwrap();
544 let status = std::process::Command::new(exe)
545 .arg("--exact")
546 .arg(test_name)
547 .arg("--nocapture")
548 .env("INDEXBUS_SHM_CHILD_KIND", kind)
549 .env("INDEXBUS_SHM_CHILD_PATH", path)
550 .env(
551 "INDEXBUS_SHM_CHILD_BLOCKING",
552 if blocking { "1" } else { "0" },
553 )
554 .status()
555 .unwrap();
556 assert!(status.success(), "child test failed: {status:?}");
557 }
558
559 #[test]
560 fn blocking_mismatch_fails_fast() {
561 let path = unique_temp_path("indexbus_transport_shm_blocking_mismatch");
562
563 let bytes = crate::internal::init::required_bytes_shared(true);
566 let mapped = crate::internal::region::MappedRegion::open_file_backed(&path, bytes).unwrap();
567 let mut mmap = mapped.mmap;
568 let ptr = mmap.as_mut_ptr() as *mut SharedLayout;
569 unsafe {
570 crate::internal::init::init_or_wait_shared(ptr, bytes, false).unwrap();
571 }
572 drop(mmap);
573
574 let err = EventsRegion::open_path_with(&path, OpenOptions::new().blocking(true))
576 .err()
577 .expect("expected error");
578 match err {
579 Error::Layout(indexbus_core::Error::IncompatibleLayout) => {}
580 other => panic!("unexpected error: {other:?}"),
581 }
582
583 let _ = std::fs::remove_file(&path);
584 }
585
586 #[test]
587 fn too_small_region_errors() {
588 let path = unique_temp_path("indexbus_transport_shm_too_small");
589
590 std::fs::write(&path, [0u8; 16]).unwrap();
592
593 let err = EventsRegion::open_path_with(&path, OpenOptions::new().blocking(false))
594 .err()
595 .expect("expected error");
596 match err {
597 Error::RegionTooSmall { .. } => {}
598 other => panic!("unexpected error: {other:?}"),
599 }
600
601 let _ = std::fs::remove_file(&path);
602 }
603
604 #[test]
605 #[cfg(unix)]
606 fn new_region_file_permissions_are_restrictive() {
607 use std::os::unix::fs::PermissionsExt;
608
609 let path = unique_temp_path("indexbus_transport_shm_perms");
610
611 let _region = EventsRegion::open_path_with(&path, OpenOptions::new().blocking(false))
612 .expect("open_path_with");
613
614 let meta = std::fs::metadata(&path).expect("metadata");
615 let mode = meta.permissions().mode() & 0o777;
616
617 assert_eq!(mode & 0o600, 0o600);
619 assert_eq!(mode & 0o077, 0);
621
622 let _ = std::fs::remove_file(&path);
623 }
624
625 #[test]
626 fn events_region_cross_process_spsc_roundtrip() {
627 let path = unique_temp_path("indexbus_transport_shm_events_spsc_xproc");
628
629 let mut region =
630 EventsRegion::open_path_with(&path, OpenOptions::new().blocking(false)).unwrap();
631 let (tx, _rx) = region.split_spsc().unwrap();
632 tx.publish(b"ping").unwrap();
633
634 run_child_blocking(
635 "events_region_cross_process_spsc_child",
636 &path,
637 "events_spsc",
638 false,
639 );
640
641 let _ = std::fs::remove_file(&path);
642 }
643
644 #[test]
645 fn events_region_cross_process_spsc_roundtrip_blocking() {
646 let path = unique_temp_path("indexbus_transport_shm_events_spsc_xproc_blocking");
647
648 let mut region =
649 EventsRegion::open_path_with(&path, OpenOptions::new().blocking(true)).unwrap();
650 let (tx, _rx) = region.split_spsc().unwrap();
651 tx.publish(b"ping").unwrap();
652
653 run_child_blocking(
654 "events_region_cross_process_spsc_child",
655 &path,
656 "events_spsc",
657 true,
658 );
659
660 let _ = std::fs::remove_file(&path);
661 }
662
663 #[test]
664 fn events_region_cross_process_spsc_child() {
665 let kind = std::env::var("INDEXBUS_SHM_CHILD_KIND").ok();
666 if kind.as_deref() != Some("events_spsc") {
667 return;
668 }
669 let path = std::env::var("INDEXBUS_SHM_CHILD_PATH").unwrap();
670 let blocking = std::env::var("INDEXBUS_SHM_CHILD_BLOCKING").ok().as_deref() == Some("1");
671
672 let mut region =
673 EventsRegion::open_path_with(&path, OpenOptions::new().blocking(blocking)).unwrap();
674 let (_tx, rx) = region.split_spsc().unwrap();
675
676 let mut wait = indexbus_core::SpinWait::default();
677 let mut out = [0u8; 32];
678 loop {
679 if let Some(n) = rx.try_recv_into(&mut out).unwrap() {
680 assert_eq!(&out[..n], b"ping");
681 break;
682 }
683 wait.wait();
684 }
685 }
686
687 #[test]
688 fn fanout_region_cross_process_broadcast_roundtrip() {
689 let path = unique_temp_path("indexbus_transport_shm_fanout_xproc");
690
691 let mut region =
692 FanoutRegion::<4>::open_path_with(&path, OpenOptions::new().blocking(false)).unwrap();
693 let (prod, _router, _c0) = region.handles(0).unwrap();
694 prod.publish(b"hi").unwrap();
695
696 run_child_blocking(
697 "fanout_region_cross_process_broadcast_child",
698 &path,
699 "fanout",
700 false,
701 );
702
703 let _ = std::fs::remove_file(&path);
704 }
705
706 #[test]
707 fn fanout_region_cross_process_broadcast_roundtrip_blocking() {
708 let path = unique_temp_path("indexbus_transport_shm_fanout_xproc_blocking");
709
710 let mut region =
711 FanoutRegion::<4>::open_path_with(&path, OpenOptions::new().blocking(true)).unwrap();
712 let (prod, _router, _c0) = region.handles(0).unwrap();
713 prod.publish(b"hi").unwrap();
714
715 run_child_blocking(
716 "fanout_region_cross_process_broadcast_child",
717 &path,
718 "fanout",
719 true,
720 );
721
722 let _ = std::fs::remove_file(&path);
723 }
724
725 #[test]
726 fn fanout_region_cross_process_broadcast_child() {
727 let kind = std::env::var("INDEXBUS_SHM_CHILD_KIND").ok();
728 if kind.as_deref() != Some("fanout") {
729 return;
730 }
731 let path = std::env::var("INDEXBUS_SHM_CHILD_PATH").unwrap();
732 let blocking = std::env::var("INDEXBUS_SHM_CHILD_BLOCKING").ok().as_deref() == Some("1");
733
734 let mut region =
735 FanoutRegion::<4>::open_path_with(&path, OpenOptions::new().blocking(blocking))
736 .unwrap();
737 let (_prod, router, cons0) = region.handles(0).unwrap();
738
739 assert!(router.route_once_with(RouterSource::Spsc, RouterMode::Broadcast));
740
741 let mut wait = indexbus_core::SpinWait::default();
742 let mut out = [0u8; 32];
743 loop {
744 if let Some(n) = cons0.try_recv_into(&mut out).unwrap() {
745 assert_eq!(&out[..n], b"hi");
746 break;
747 }
748 wait.wait();
749 }
750 }
751
752 #[test]
753 fn sequencer_region_cross_process_roundtrip() {
754 let path = unique_temp_path("indexbus_transport_shm_sequencer_xproc");
755
756 let region = SequencerRegion::<2>::open_path_with(&path, OpenOptions::new()).unwrap();
758 let mut prod = region.producer(SequencerProducerConfig::new(8)).unwrap();
759 let s1 = prod.try_claim_next().unwrap();
760 prod.publish(s1).unwrap();
761
762 run_child("sequencer_region_cross_process_child", &path, "sequencer");
764
765 let _ = std::fs::remove_file(&path);
768 }
769
770 #[test]
771 fn sequencer_region_cross_process_child() {
772 let kind = std::env::var("INDEXBUS_SHM_CHILD_KIND").ok();
773 if kind.as_deref() != Some("sequencer") {
774 return;
775 }
776 let path = std::env::var("INDEXBUS_SHM_CHILD_PATH").unwrap();
777 let blocking = std::env::var("INDEXBUS_SHM_CHILD_BLOCKING").ok().as_deref() == Some("1");
778 let region =
779 SequencerRegion::<2>::open_path_with(&path, OpenOptions::new().blocking(blocking))
780 .unwrap();
781
782 let barrier = region.barrier_for_cursor().unwrap();
783 let mut wait = indexbus_core::SpinWait::default();
784 assert!(barrier.wait_for(1, &mut wait) >= 1);
785
786 let mut c0 = region.consumer(0).unwrap();
787 c0.advance(1).unwrap();
788 }
789
790 #[test]
791 fn journal_region_cross_process_roundtrip() {
792 let path = unique_temp_path("indexbus_transport_shm_journal_xproc");
793
794 let mut region = JournalRegion4::open_path(&path).unwrap();
795 let mut pubr = region.publisher(JournalPublisherConfig::default()).unwrap();
796 pubr.try_append(b"hello").unwrap();
797
798 run_child("journal_region_cross_process_child", &path, "journal");
799
800 let _ = std::fs::remove_file(&path);
801 }
802
803 #[test]
804 fn journal_region_cross_process_child() {
805 let kind = std::env::var("INDEXBUS_SHM_CHILD_KIND").ok();
806 if kind.as_deref() != Some("journal") {
807 return;
808 }
809 let path = std::env::var("INDEXBUS_SHM_CHILD_PATH").unwrap();
810 let blocking = std::env::var("INDEXBUS_SHM_CHILD_BLOCKING").ok().as_deref() == Some("1");
811 let mut region =
812 JournalRegion4::open_path_with(&path, JournalOpenOptions::new().blocking(blocking))
813 .unwrap();
814 let mut sub = region.subscriber(0, 0).unwrap();
815
816 let mut wait = indexbus_core::SpinWait::default();
817 let mut out = [0u8; 32];
818 loop {
819 if let Some(msg) = sub.poll_next(&mut out).unwrap() {
820 assert_eq!(msg, b"hello");
821 break;
822 }
823 wait.wait();
824 }
825 }
826
827 #[test]
828 fn sequencer_region_cross_process_roundtrip_blocking() {
829 let path = unique_temp_path("indexbus_transport_shm_sequencer_xproc_blocking");
830
831 let region =
832 SequencerRegion::<2>::open_path_with(&path, OpenOptions::new().blocking(true)).unwrap();
833 let mut prod = region.producer(SequencerProducerConfig::new(8)).unwrap();
834 let s1 = prod.try_claim_next().unwrap();
835 prod.publish(s1).unwrap();
836
837 run_child_blocking(
838 "sequencer_region_cross_process_child",
839 &path,
840 "sequencer",
841 true,
842 );
843
844 let _ = std::fs::remove_file(&path);
845 }
846
847 #[test]
848 fn journal_region_cross_process_roundtrip_blocking() {
849 let path = unique_temp_path("indexbus_transport_shm_journal_xproc_blocking");
850
851 let mut region =
852 JournalRegion4::open_path_with(&path, JournalOpenOptions::new().blocking(true))
853 .unwrap();
854 let mut pubr = region.publisher(JournalPublisherConfig::default()).unwrap();
855 pubr.try_append(b"hello").unwrap();
856
857 run_child_blocking("journal_region_cross_process_child", &path, "journal", true);
858
859 let _ = std::fs::remove_file(&path);
860 }
861
862 #[test]
863 fn journal_region_restart_reattach_roundtrip() {
864 let path = unique_temp_path("indexbus_transport_shm_journal_restart_reattach");
865
866 {
868 let mut region = JournalRegion4::open_path(&path).unwrap();
869 let mut pubr = region.publisher(JournalPublisherConfig::default()).unwrap();
870 pubr.try_append(b"hello").unwrap();
871 pubr.try_append(b"world").unwrap();
872 region.flush().unwrap();
873 }
874
875 {
877 let mut region = JournalRegion4::open_path(&path).unwrap();
878 let mut sub = region.subscriber(0, 0).unwrap();
879
880 let mut wait = indexbus_core::SpinWait::default();
881 let mut out = [0u8; 32];
882
883 let mut got = Vec::new();
884 while got.len() < 2 {
885 if let Some(msg) = sub.poll_next(&mut out).unwrap() {
886 got.push(msg.to_vec());
887 } else {
888 wait.wait();
889 }
890 }
891
892 assert_eq!(got[0].as_slice(), b"hello");
893 assert_eq!(got[1].as_slice(), b"world");
894 }
895
896 let _ = std::fs::remove_file(&path);
897 }
898}