indexbus_inspect/internal/
journal_dump.rs1use core::mem::size_of;
2use std::fs::File;
3use std::path::Path;
4use std::time::Duration;
5
6use indexbus_abi::layouts::JournalLayout4;
7use indexbus_abi::LayoutHeader;
8use memmap2::Mmap;
9
10use crate::Error;
11
/// Size in bytes of a record header: two consecutive `u32` fields
/// (payload length at offset 0, flags at offset 4).
const HDR_BYTES: usize = 2 * size_of::<u32>();
/// Flag bit reported as `committed` when set in a record's flags word.
const FLAG_COMMITTED: u32 = 1 << 0;
/// Flag bit reported as `pad` when set in a record's flags word.
const FLAG_PAD: u32 = 1 << 1;
15
/// Rounds `n` up to the nearest multiple of 8 (values already on an 8-byte
/// boundary are returned unchanged).
#[inline]
fn align8(n: u32) -> u32 {
    n.next_multiple_of(8)
}
20
21#[inline]
22fn record_bytes(payload_len_bytes: u32) -> u32 {
23 align8((HDR_BYTES as u32) + payload_len_bytes)
24}
25
/// Loads a native-endian `u32` from `p` without requiring `p` to be aligned.
///
/// The caller must pass a pointer to at least four readable bytes; this
/// function performs no validation of its own.
#[inline]
fn read_u32(p: *const u8) -> u32 {
    let word = p.cast::<u32>();
    // SAFETY: relies on the caller supplying a pointer to four readable bytes;
    // `read_unaligned` removes any alignment requirement.
    unsafe { word.read_unaligned() }
}
30
31fn read_header(base: *const u8, offset: usize) -> (u32, u32) {
32 let payload_len_bytes = read_u32(unsafe { base.add(offset) });
33 let flags = read_u32(unsafe { base.add(offset + 4) });
34 (payload_len_bytes, flags)
35}
36
37fn read_layout_header(m: &Mmap) -> Result<LayoutHeader, Error> {
38 if m.len() < size_of::<LayoutHeader>() {
39 return Err(Error::TooSmall {
40 needed: size_of::<LayoutHeader>(),
41 got: m.len(),
42 });
43 }
44 let ptr = m.as_ptr() as *const LayoutHeader;
45 let header = unsafe { core::ptr::read_unaligned(ptr) };
46 if !header.is_compatible_v1() {
47 return Err(Error::IncompatibleLayout);
48 }
49 Ok(header)
50}
51
52pub(crate) fn dump_journal_path(
53 path: &Path,
54 start_pos: u64,
55 max_records: usize,
56) -> Result<(), Error> {
57 let file = File::open(path)?;
58 let mapped = unsafe { Mmap::map(&file)? };
59
60 let header = read_layout_header(&mapped)?;
61 if (header.capabilities & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_JOURNAL) == 0 {
62 return Err(Error::UnsupportedLayout {
63 capabilities: header.capabilities,
64 layout_bytes: header.layout_bytes,
65 });
66 }
67
68 if mapped.len() < size_of::<JournalLayout4>() {
69 return Err(Error::TooSmall {
70 needed: size_of::<JournalLayout4>(),
71 got: mapped.len(),
72 });
73 }
74
75 let layout = mapped.as_ptr() as *const JournalLayout4;
76 let segment_bytes = unsafe { (*layout).segment_bufs[0].len() as u32 };
77
78 println!("journal dump: start_pos={start_pos} max_records={max_records}");
79 println!(
80 "segment_bytes={segment_bytes} segments={} subs={}",
81 indexbus_abi::INDEXBUS_JOURNAL_SEGMENTS,
82 indexbus_abi::INDEXBUS_JOURNAL_SUBSCRIBERS_DEFAULT
83 );
84
85 let mut pos = start_pos;
86 for n in 0..max_records {
87 let off = (pos as u32) % segment_bytes;
88 let segment_idx =
89 ((pos / segment_bytes as u64) as usize) % indexbus_abi::INDEXBUS_JOURNAL_SEGMENTS;
90
91 let segment_base = unsafe { (*layout).segment_bufs[segment_idx].as_ptr() };
92 let (payload_len, flags) = read_header(segment_base, off as usize);
93
94 if payload_len == 0 {
95 println!("{n}: pos={pos} segment={segment_idx} off={off} <empty>");
96 break;
97 }
98
99 let rbytes = record_bytes(payload_len);
100 let committed = (flags & FLAG_COMMITTED) != 0;
101 let pad = (flags & FLAG_PAD) != 0;
102
103 println!(
104 "{n}: pos={pos} segment={segment_idx} off={off} payload_len={payload_len} record_bytes={rbytes} flags=0x{flags:08x} committed={committed} pad={pad}"
105 );
106
107 if !committed {
108 break;
109 }
110
111 pos += rbytes as u64;
112 }
113
114 Ok(())
115}
116
117pub(crate) fn watch_journal_path(path: &Path, interval: Duration) -> Result<(), Error> {
118 let file = File::open(path)?;
119 let mapped = unsafe { Mmap::map(&file)? };
120
121 let header = read_layout_header(&mapped)?;
122 if (header.capabilities & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_JOURNAL) == 0 {
123 return Err(Error::UnsupportedLayout {
124 capabilities: header.capabilities,
125 layout_bytes: header.layout_bytes,
126 });
127 }
128
129 if mapped.len() < size_of::<JournalLayout4>() {
130 return Err(Error::TooSmall {
131 needed: size_of::<JournalLayout4>(),
132 got: mapped.len(),
133 });
134 }
135
136 let layout = mapped.as_ptr() as *const JournalLayout4;
137 let base = mapped.as_ptr();
138
139 let stats_present =
140 (header.capabilities & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_JOURNAL_STATS) != 0;
141
142 let mut stats_offset = size_of::<JournalLayout4>().div_ceil(64) * 64;
144 if (header.capabilities & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) != 0 {
145 stats_offset = (stats_offset + size_of::<indexbus_abi::layouts::JournalWakeSection4>())
146 .div_ceil(64)
147 * 64;
148 }
149
150 let mut last_pub_pos: u64 =
151 unsafe { core::ptr::read_unaligned(core::ptr::addr_of!((*layout).pub_pos) as *const u64) };
152 let mut last_records: u64 = 0;
153
154 println!("watching journal (interval={:?})", interval);
155 println!(
156 "caps=0x{:08x} stats_present={stats_present}",
157 header.capabilities
158 );
159
160 loop {
161 std::thread::sleep(interval);
162
163 let pub_pos: u64 = unsafe {
164 core::ptr::read_unaligned(core::ptr::addr_of!((*layout).pub_pos) as *const u64)
165 };
166 let delta_bytes = pub_pos.saturating_sub(last_pub_pos);
167 last_pub_pos = pub_pos;
168
169 let bytes_per_sec = (delta_bytes as f64) / interval.as_secs_f64();
170
171 if stats_present {
172 let needed = stats_offset + size_of::<indexbus_abi::layouts::JournalStatsSection4>();
174 if mapped.len() >= needed {
175 let off = stats_offset;
176 let payload_bytes = unsafe {
177 core::ptr::read_unaligned(base.add(
178 off + core::mem::offset_of!(
179 indexbus_abi::layouts::JournalStatsSection4,
180 pub_payload_bytes
181 ),
182 ) as *const u64)
183 };
184 let records = unsafe {
185 core::ptr::read_unaligned(base.add(
186 off + core::mem::offset_of!(
187 indexbus_abi::layouts::JournalStatsSection4,
188 pub_records
189 ),
190 ) as *const u64)
191 };
192
193 let d_records = records.saturating_sub(last_records);
194 last_records = records;
195
196 let recs_per_sec = (d_records as f64) / interval.as_secs_f64();
197
198 println!(
199 "pub_pos={pub_pos} bytes/sec={bytes_per_sec:.0} records/sec={recs_per_sec:.0} total_payload_bytes={payload_bytes} total_records={records}",
200 );
201 continue;
202 }
203 }
204
205 println!("pub_pos={pub_pos} bytes/sec={:.0}", bytes_per_sec);
206 }
207}