indexbus_inspect/internal/
journal_dump.rs

1use core::mem::size_of;
2use std::fs::File;
3use std::path::Path;
4use std::time::Duration;
5
6use indexbus_abi::layouts::JournalLayout4;
7use indexbus_abi::LayoutHeader;
8use memmap2::Mmap;
9
10use crate::Error;
11
/// Size in bytes of a record header: a `u32` payload length followed by a `u32` flags word.
const HDR_BYTES: usize = 2 * size_of::<u32>();
/// Flag bit that is tested to report a record as committed.
const FLAG_COMMITTED: u32 = 1 << 0;
/// Flag bit that is tested to report a record as padding.
const FLAG_PAD: u32 = 1 << 1;
15
/// Rounds `n` up to the next multiple of 8 (values that are already multiples are unchanged).
///
/// The intermediate `n + 7` uses plain `u32` addition, so inputs above `u32::MAX - 7` overflow.
#[inline]
fn align8(n: u32) -> u32 {
    let bumped = n + 7;
    // Dropping the remainder is the same as clearing the low three bits.
    bumped - bumped % 8
}
20
21#[inline]
22fn record_bytes(payload_len_bytes: u32) -> u32 {
23    align8((HDR_BYTES as u32) + payload_len_bytes)
24}
25
/// Loads a native-endian `u32` from `p`; `p` does not need to be 4-byte aligned.
///
/// The caller must make sure `p` points at four readable bytes.
#[inline]
fn read_u32(p: *const u8) -> u32 {
    let word = p.cast::<u32>();
    // SAFETY: relies on the caller's guarantee that `p` addresses four readable bytes; the
    // unaligned read places no alignment requirement on `word`.
    unsafe { word.read_unaligned() }
}
30
31fn read_header(base: *const u8, offset: usize) -> (u32, u32) {
32    let payload_len_bytes = read_u32(unsafe { base.add(offset) });
33    let flags = read_u32(unsafe { base.add(offset + 4) });
34    (payload_len_bytes, flags)
35}
36
37fn read_layout_header(m: &Mmap) -> Result<LayoutHeader, Error> {
38    if m.len() < size_of::<LayoutHeader>() {
39        return Err(Error::TooSmall {
40            needed: size_of::<LayoutHeader>(),
41            got: m.len(),
42        });
43    }
44    let ptr = m.as_ptr() as *const LayoutHeader;
45    let header = unsafe { core::ptr::read_unaligned(ptr) };
46    if !header.is_compatible_v1() {
47        return Err(Error::IncompatibleLayout);
48    }
49    Ok(header)
50}
51
52pub(crate) fn dump_journal_path(
53    path: &Path,
54    start_pos: u64,
55    max_records: usize,
56) -> Result<(), Error> {
57    let file = File::open(path)?;
58    let mapped = unsafe { Mmap::map(&file)? };
59
60    let header = read_layout_header(&mapped)?;
61    if (header.capabilities & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_JOURNAL) == 0 {
62        return Err(Error::UnsupportedLayout {
63            capabilities: header.capabilities,
64            layout_bytes: header.layout_bytes,
65        });
66    }
67
68    if mapped.len() < size_of::<JournalLayout4>() {
69        return Err(Error::TooSmall {
70            needed: size_of::<JournalLayout4>(),
71            got: mapped.len(),
72        });
73    }
74
75    let layout = mapped.as_ptr() as *const JournalLayout4;
76    let segment_bytes = unsafe { (*layout).segment_bufs[0].len() as u32 };
77
78    println!("journal dump: start_pos={start_pos} max_records={max_records}");
79    println!(
80        "segment_bytes={segment_bytes} segments={} subs={}",
81        indexbus_abi::INDEXBUS_JOURNAL_SEGMENTS,
82        indexbus_abi::INDEXBUS_JOURNAL_SUBSCRIBERS_DEFAULT
83    );
84
85    let mut pos = start_pos;
86    for n in 0..max_records {
87        let off = (pos as u32) % segment_bytes;
88        let segment_idx =
89            ((pos / segment_bytes as u64) as usize) % indexbus_abi::INDEXBUS_JOURNAL_SEGMENTS;
90
91        let segment_base = unsafe { (*layout).segment_bufs[segment_idx].as_ptr() };
92        let (payload_len, flags) = read_header(segment_base, off as usize);
93
94        if payload_len == 0 {
95            println!("{n}: pos={pos} segment={segment_idx} off={off} <empty>");
96            break;
97        }
98
99        let rbytes = record_bytes(payload_len);
100        let committed = (flags & FLAG_COMMITTED) != 0;
101        let pad = (flags & FLAG_PAD) != 0;
102
103        println!(
104            "{n}: pos={pos} segment={segment_idx} off={off} payload_len={payload_len} record_bytes={rbytes} flags=0x{flags:08x} committed={committed} pad={pad}"
105        );
106
107        if !committed {
108            break;
109        }
110
111        pos += rbytes as u64;
112    }
113
114    Ok(())
115}
116
117pub(crate) fn watch_journal_path(path: &Path, interval: Duration) -> Result<(), Error> {
118    let file = File::open(path)?;
119    let mapped = unsafe { Mmap::map(&file)? };
120
121    let header = read_layout_header(&mapped)?;
122    if (header.capabilities & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_JOURNAL) == 0 {
123        return Err(Error::UnsupportedLayout {
124            capabilities: header.capabilities,
125            layout_bytes: header.layout_bytes,
126        });
127    }
128
129    if mapped.len() < size_of::<JournalLayout4>() {
130        return Err(Error::TooSmall {
131            needed: size_of::<JournalLayout4>(),
132            got: mapped.len(),
133        });
134    }
135
136    let layout = mapped.as_ptr() as *const JournalLayout4;
137    let base = mapped.as_ptr();
138
139    let stats_present =
140        (header.capabilities & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_JOURNAL_STATS) != 0;
141
142    // Compute stats offset using the same ordering rule as validation: base -> optional wake -> stats.
143    let mut stats_offset = size_of::<JournalLayout4>().div_ceil(64) * 64;
144    if (header.capabilities & indexbus_abi::caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) != 0 {
145        stats_offset = (stats_offset + size_of::<indexbus_abi::layouts::JournalWakeSection4>())
146            .div_ceil(64)
147            * 64;
148    }
149
150    let mut last_pub_pos: u64 =
151        unsafe { core::ptr::read_unaligned(core::ptr::addr_of!((*layout).pub_pos) as *const u64) };
152    let mut last_records: u64 = 0;
153
154    println!("watching journal (interval={:?})", interval);
155    println!(
156        "caps=0x{:08x} stats_present={stats_present}",
157        header.capabilities
158    );
159
160    loop {
161        std::thread::sleep(interval);
162
163        let pub_pos: u64 = unsafe {
164            core::ptr::read_unaligned(core::ptr::addr_of!((*layout).pub_pos) as *const u64)
165        };
166        let delta_bytes = pub_pos.saturating_sub(last_pub_pos);
167        last_pub_pos = pub_pos;
168
169        let bytes_per_sec = (delta_bytes as f64) / interval.as_secs_f64();
170
171        if stats_present {
172            // Best-effort: only if the mapping is big enough.
173            let needed = stats_offset + size_of::<indexbus_abi::layouts::JournalStatsSection4>();
174            if mapped.len() >= needed {
175                let off = stats_offset;
176                let payload_bytes = unsafe {
177                    core::ptr::read_unaligned(base.add(
178                        off + core::mem::offset_of!(
179                            indexbus_abi::layouts::JournalStatsSection4,
180                            pub_payload_bytes
181                        ),
182                    ) as *const u64)
183                };
184                let records = unsafe {
185                    core::ptr::read_unaligned(base.add(
186                        off + core::mem::offset_of!(
187                            indexbus_abi::layouts::JournalStatsSection4,
188                            pub_records
189                        ),
190                    ) as *const u64)
191                };
192
193                let d_records = records.saturating_sub(last_records);
194                last_records = records;
195
196                let recs_per_sec = (d_records as f64) / interval.as_secs_f64();
197
198                println!(
199                    "pub_pos={pub_pos} bytes/sec={bytes_per_sec:.0} records/sec={recs_per_sec:.0} total_payload_bytes={payload_bytes} total_records={records}",
200                );
201                continue;
202            }
203        }
204
205        println!("pub_pos={pub_pos} bytes/sec={:.0}", bytes_per_sec);
206    }
207}