indexbus_inspect/internal/
inspect.rs

1use core::mem::size_of;
2use std::fs::File;
3use std::path::Path;
4
5use indexbus_abi::layouts::{
6    FanoutWakeSection4, JournalLayout4, JournalStatsSection4, JournalWakeSection4, SegmentMeta,
7    SequencerLayout4, SequencerWakeSection4, SharedFanoutLayout4, SharedLayout, SharedWakeSection,
8    StateLayout256, StateWakeSection,
9};
10use indexbus_abi::LayoutHeader;
11use indexbus_abi::{caps, flags};
12use memmap2::Mmap;
13
14use crate::Error;
15
/// Location of the optional wake section appended after a layout's base struct.
#[derive(Debug, Clone)]
pub(crate) struct WakeInfo {
    /// Byte offset of the wake section from the start of the mapping
    /// (computed as the base size rounded up to 64 bytes).
    pub offset: usize,
    /// Size in bytes of the kind-specific wake section struct.
    pub size: usize,
    /// Whether the mapping is actually large enough to contain the section.
    pub present: bool,
}
22
/// Best-effort inspection report for one mapped region file.
#[derive(Debug, Clone)]
pub(crate) struct Report {
    /// Path of the inspected file (lossily converted to `String`).
    pub path: String,
    /// Classified layout kind, e.g. "journal4", "sequencer4", "events", "state256".
    pub kind: &'static str,

    // Raw header fields copied verbatim from the mapped `LayoutHeader`.
    pub header_magic: u32,
    pub header_version: u16,
    pub header_flags: u16,

    /// Raw capability bitmask from the header.
    pub capabilities: u32,
    /// Human-readable names for the capability bits that are set.
    pub cap_names: Vec<&'static str>,

    /// Total layout size claimed by the header.
    pub layout_bytes: u32,
    /// Number of bytes actually mapped from the file.
    pub mapped_bytes: usize,

    /// Size of the kind's base layout struct.
    pub base_size: usize,
    /// Sampled `initialized` word, for kinds that have one.
    pub initialized: Option<u32>,
    /// Location of the optional wake section, when blocking is supported.
    pub wake: Option<WakeInfo>,

    /// Named field offsets within the base layout.
    pub offsets: Vec<(&'static str, usize)>,

    /// Kind-specific sampled values (best-effort).
    pub values: Vec<(&'static str, String)>,
}
47
/// Round `value` up to the next multiple of `align`.
///
/// `align` must be a power of two (checked in debug builds only).
#[inline]
const fn align_up(value: usize, align: usize) -> usize {
    debug_assert!(align.is_power_of_two());
    let mask = align - 1;
    (value + mask) & !mask
}
53
54fn cap_names(c: u32) -> Vec<&'static str> {
55    let mut out = Vec::new();
56    if (c & caps::INDEXBUS_CAP_SUPPORTS_EVENTS) != 0 {
57        out.push("EVENTS");
58    }
59    if (c & caps::INDEXBUS_CAP_SUPPORTS_STATE) != 0 {
60        out.push("STATE");
61    }
62    if (c & caps::INDEXBUS_CAP_SUPPORTS_FANOUT) != 0 {
63        out.push("FANOUT");
64    }
65    if (c & caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) != 0 {
66        out.push("BLOCKING");
67    }
68    if (c & caps::INDEXBUS_CAP_SUPPORTS_SEQUENCER) != 0 {
69        out.push("SEQUENCER");
70    }
71    if (c & caps::INDEXBUS_CAP_SUPPORTS_JOURNAL) != 0 {
72        out.push("JOURNAL");
73    }
74    if (c & caps::INDEXBUS_CAP_SUPPORTS_JOURNAL_STATS) != 0 {
75        out.push("JOURNAL_STATS");
76    }
77    out
78}
79
80fn read_header(m: &Mmap) -> Result<LayoutHeader, Error> {
81    if m.len() < size_of::<LayoutHeader>() {
82        return Err(Error::TooSmall {
83            needed: size_of::<LayoutHeader>(),
84            got: m.len(),
85        });
86    }
87
88    let ptr = m.as_ptr() as *const LayoutHeader;
89    let header = unsafe { core::ptr::read_unaligned(ptr) };
90    if !header.is_compatible_v1() {
91        return Err(Error::IncompatibleLayout);
92    }
93
94    Ok(header)
95}
96
97fn classify_kind(header: &LayoutHeader, mapped_bytes: usize) -> Option<(&'static str, usize)> {
98    let caps_bits = header.capabilities;
99    let lb = header.layout_bytes as usize;
100    let effective = mapped_bytes.min(lb.max(size_of::<LayoutHeader>()));
101
102    let kind = flags::region_kind(header.flags);
103    match kind {
104        flags::INDEXBUS_REGION_KIND_JOURNAL => {
105            let base = size_of::<JournalLayout4>();
106            (effective >= base).then_some(("journal4", base))
107        }
108        flags::INDEXBUS_REGION_KIND_SEQUENCER => {
109            let base = size_of::<SequencerLayout4>();
110            (effective >= base).then_some(("sequencer4", base))
111        }
112        flags::INDEXBUS_REGION_KIND_FANOUT => {
113            let base = size_of::<SharedFanoutLayout4>();
114            (effective >= base).then_some(("fanout4", base))
115        }
116        flags::INDEXBUS_REGION_KIND_EVENTS => {
117            let base = size_of::<SharedLayout>();
118            (effective >= base).then_some(("events", base))
119        }
120        flags::INDEXBUS_REGION_KIND_STATE => {
121            let base = size_of::<StateLayout256>();
122            if effective >= base {
123                Some(("state256", base))
124            } else {
125                // State layouts are const-parameterized in Rust; we only recognize the concrete 256B one.
126                Some(("state(unknown)", base))
127            }
128        }
129        _ => {
130            // Backward-compat fallback: older mappings may not set the region-kind discriminator
131            // in `flags` yet. Preserve the previous heuristic classification so golden fixtures
132            // and legacy files remain inspectable.
133
134            if (caps_bits & caps::INDEXBUS_CAP_SUPPORTS_JOURNAL) != 0 {
135                let base = size_of::<JournalLayout4>();
136                if effective >= base {
137                    return Some(("journal4", base));
138                }
139            }
140
141            if (caps_bits & caps::INDEXBUS_CAP_SUPPORTS_SEQUENCER) != 0 {
142                let base = size_of::<SequencerLayout4>();
143                if effective >= base {
144                    return Some(("sequencer4", base));
145                }
146            }
147
148            if (caps_bits & caps::INDEXBUS_CAP_SUPPORTS_FANOUT) != 0 {
149                let base = size_of::<SharedFanoutLayout4>();
150                if (caps_bits & caps::INDEXBUS_CAP_SUPPORTS_EVENTS) != 0 && effective >= base {
151                    return Some(("fanout4", base));
152                }
153            }
154
155            if (caps_bits & caps::INDEXBUS_CAP_SUPPORTS_EVENTS) != 0 {
156                let base = size_of::<SharedLayout>();
157                if effective >= base {
158                    return Some(("events", base));
159                }
160            }
161
162            if (caps_bits & caps::INDEXBUS_CAP_SUPPORTS_STATE) != 0 {
163                let base = size_of::<StateLayout256>();
164                if effective >= base {
165                    return Some(("state256", base));
166                }
167                return Some(("state(unknown)", base));
168            }
169
170            None
171        }
172    }
173}
174
/// Read a native-endian `u32` at `offset` bytes past `base`, without any
/// alignment requirement. Inspection-only raw read.
fn read_u32_at(base: *const u8, offset: usize) -> u32 {
    // SAFETY: callers guarantee that `base + offset .. base + offset + 4`
    // lies within the mapped region.
    unsafe { base.add(offset).cast::<u32>().read_unaligned() }
}
179
/// Read a native-endian `u64` at `offset` bytes past `base`, without any
/// alignment requirement. Inspection-only raw read.
fn read_u64_at(base: *const u8, offset: usize) -> u64 {
    // SAFETY: callers guarantee that `base + offset .. base + offset + 8`
    // lies within the mapped region.
    unsafe { base.add(offset).cast::<u64>().read_unaligned() }
}
183
184fn values_for_kind(kind: &str, base: *const u8) -> Vec<(&'static str, String)> {
185    let mut out = Vec::new();
186
187    match kind {
188        "journal4" => {
189            let pub_pos = read_u64_at(base, core::mem::offset_of!(JournalLayout4, pub_pos));
190            out.push(("pub_pos", pub_pos.to_string()));
191
192            let active_segment =
193                read_u32_at(base, core::mem::offset_of!(JournalLayout4, active_segment));
194            out.push(("active_segment", active_segment.to_string()));
195
196            // Sample segment metadata (best-effort).
197            let mut segment_ids = Vec::new();
198            let mut segment_tails = Vec::new();
199            for i in 0..3 {
200                let seg_base = core::mem::offset_of!(JournalLayout4, segments)
201                    + i * core::mem::size_of::<SegmentMeta>();
202                let id = read_u32_at(
203                    base,
204                    seg_base + core::mem::offset_of!(SegmentMeta, segment_id),
205                );
206                let tail = read_u64_at(base, seg_base + core::mem::offset_of!(SegmentMeta, tail));
207                segment_ids.push(id);
208                segment_tails.push(tail);
209            }
210            out.push(("segment_id", format!("{:?}", segment_ids)));
211            out.push(("segment_tail", format!("{:?}", segment_tails)));
212
213            // Sample subscriber positions (best-effort).
214            let mut sub_pos = Vec::new();
215            for i in 0..4 {
216                let off = core::mem::offset_of!(JournalLayout4, sub_pos)
217                    + i * core::mem::size_of::<u64>();
218                sub_pos.push(read_u64_at(base, off));
219            }
220            out.push(("sub_pos", format!("{:?}", sub_pos)));
221
222            if let Some(min) = sub_pos.iter().copied().min() {
223                out.push(("sub_pos_min", min.to_string()));
224            }
225            if let Some(max) = sub_pos.iter().copied().max() {
226                out.push(("sub_pos_max", max.to_string()));
227            }
228        }
229        "sequencer4" => {
230            let cursor = read_u64_at(base, core::mem::offset_of!(SequencerLayout4, cursor));
231            out.push(("cursor", cursor.to_string()));
232
233            // Best-effort: sample all gating sequences.
234            let mut gating_vals = Vec::new();
235            for i in 0..4 {
236                let off = core::mem::offset_of!(SequencerLayout4, gating)
237                    + i * core::mem::size_of::<indexbus_abi::layouts::SequencerGatingCell>();
238                let v = read_u64_at(base, off);
239                gating_vals.push(v);
240            }
241            out.push(("gating", format!("{:?}", gating_vals)));
242
243            if let Some(min) = gating_vals.iter().copied().min() {
244                out.push(("gating_min", min.to_string()));
245            }
246        }
247        _ => {}
248    }
249
250    out
251}
252
253fn read_initialized(kind: &str, base: *const u8) -> Option<u32> {
254    match kind {
255        "events" => Some(read_u32_at(
256            base,
257            core::mem::offset_of!(SharedLayout, initialized),
258        )),
259        "fanout4" => Some(read_u32_at(
260            base,
261            core::mem::offset_of!(SharedFanoutLayout4, initialized),
262        )),
263        "sequencer4" => Some(read_u32_at(
264            base,
265            core::mem::offset_of!(SequencerLayout4, initialized),
266        )),
267        "journal4" => Some(read_u32_at(
268            base,
269            core::mem::offset_of!(JournalLayout4, initialized),
270        )),
271        _ => None,
272    }
273}
274
275fn wake_info(
276    kind: &str,
277    header: &LayoutHeader,
278    mapped_bytes: usize,
279    base_size: usize,
280) -> Option<WakeInfo> {
281    if (header.capabilities & caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) == 0 {
282        return None;
283    }
284
285    let wake_size = match kind {
286        "events" => size_of::<SharedWakeSection>(),
287        "fanout4" => size_of::<FanoutWakeSection4>(),
288        "state256" | "state(unknown)" => size_of::<StateWakeSection>(),
289        "sequencer4" => size_of::<SequencerWakeSection4>(),
290        "journal4" => size_of::<JournalWakeSection4>(),
291        _ => return None,
292    };
293
294    let wake_offset = align_up(base_size, 64);
295
296    let claimed = header.layout_bytes as usize;
297    let effective = mapped_bytes.min(claimed.max(size_of::<LayoutHeader>()));
298
299    let present = effective >= wake_offset.saturating_add(wake_size);
300
301    Some(WakeInfo {
302        offset: wake_offset,
303        size: wake_size,
304        present,
305    })
306}
307
308fn offsets_for_kind(kind: &str) -> Vec<(&'static str, usize)> {
309    let mut out = Vec::new();
310
311    match kind {
312        "events" => {
313            out.push((
314                "SharedLayout.slot_pool",
315                core::mem::offset_of!(SharedLayout, slot_pool),
316            ));
317            out.push((
318                "SharedLayout.queue",
319                core::mem::offset_of!(SharedLayout, queue),
320            ));
321            out.push((
322                "SharedLayout.mpsc_queue",
323                core::mem::offset_of!(SharedLayout, mpsc_queue),
324            ));
325        }
326        "fanout4" => {
327            out.push((
328                "SharedFanoutLayout4.slot_pool",
329                core::mem::offset_of!(SharedFanoutLayout4, slot_pool),
330            ));
331            out.push((
332                "SharedFanoutLayout4.producer_queue",
333                core::mem::offset_of!(SharedFanoutLayout4, producer_queue),
334            ));
335            out.push((
336                "SharedFanoutLayout4.consumer_queues",
337                core::mem::offset_of!(SharedFanoutLayout4, consumer_queues),
338            ));
339        }
340        "sequencer4" => {
341            out.push((
342                "SequencerLayout4.cursor",
343                core::mem::offset_of!(SequencerLayout4, cursor),
344            ));
345            out.push((
346                "SequencerLayout4.gating",
347                core::mem::offset_of!(SequencerLayout4, gating),
348            ));
349        }
350        "journal4" => {
351            out.push((
352                "JournalLayout4.pub_pos",
353                core::mem::offset_of!(JournalLayout4, pub_pos),
354            ));
355            out.push((
356                "JournalLayout4.active_segment",
357                core::mem::offset_of!(JournalLayout4, active_segment),
358            ));
359            out.push((
360                "JournalLayout4.segments",
361                core::mem::offset_of!(JournalLayout4, segments),
362            ));
363            out.push((
364                "JournalLayout4.sub_pos",
365                core::mem::offset_of!(JournalLayout4, sub_pos),
366            ));
367            out.push((
368                "JournalLayout4.segment_bufs",
369                core::mem::offset_of!(JournalLayout4, segment_bufs),
370            ));
371            out.push(("SegmentMeta.tail", core::mem::offset_of!(SegmentMeta, tail)));
372        }
373        "state256" => {
374            out.push((
375                "StateLayout256.seq",
376                core::mem::offset_of!(StateLayout256, seq),
377            ));
378            out.push((
379                "StateLayout256.len",
380                core::mem::offset_of!(StateLayout256, len),
381            ));
382            out.push((
383                "StateLayout256.data",
384                core::mem::offset_of!(StateLayout256, data),
385            ));
386        }
387        _ => {}
388    }
389
390    out
391}
392
393pub(crate) fn inspect_path(path: &Path) -> Result<Report, Error> {
394    let file = File::open(path)?;
395    let mapped = unsafe { Mmap::map(&file)? };
396
397    let header = read_header(&mapped)?;
398
399    let (kind, base_size) =
400        classify_kind(&header, mapped.len()).ok_or(Error::UnsupportedLayout {
401            capabilities: header.capabilities,
402            layout_bytes: header.layout_bytes,
403        })?;
404
405    let base = mapped.as_ptr();
406    let init = read_initialized(kind, base);
407
408    let wake = wake_info(kind, &header, mapped.len(), base_size);
409
410    let offsets = offsets_for_kind(kind);
411    let mut values = values_for_kind(kind, base);
412
413    // Best-effort: if a journal stats section is appended, sample its counters.
414    if kind == "journal4" && (header.capabilities & caps::INDEXBUS_CAP_SUPPORTS_JOURNAL_STATS) != 0
415    {
416        let stats_size = size_of::<JournalStatsSection4>();
417        let mut stats_offset = align_up(base_size, 64);
418        if (header.capabilities & caps::INDEXBUS_CAP_SUPPORTS_BLOCKING) != 0 {
419            stats_offset = align_up(stats_offset + size_of::<JournalWakeSection4>(), 64);
420        }
421
422        let claimed = header.layout_bytes as usize;
423        let effective = mapped.len().min(claimed.max(size_of::<LayoutHeader>()));
424        let present = effective >= stats_offset.saturating_add(stats_size);
425
426        if present {
427            let b = mapped.as_ptr();
428            let off = stats_offset;
429
430            let pub_payload_bytes = read_u64_at(
431                b,
432                off + core::mem::offset_of!(JournalStatsSection4, pub_payload_bytes),
433            );
434            let pub_records = read_u64_at(
435                b,
436                off + core::mem::offset_of!(JournalStatsSection4, pub_records),
437            );
438            let pad_records = read_u64_at(
439                b,
440                off + core::mem::offset_of!(JournalStatsSection4, pad_records),
441            );
442            let rotations = read_u64_at(
443                b,
444                off + core::mem::offset_of!(JournalStatsSection4, rotations),
445            );
446            let pub_would_overrun = read_u64_at(
447                b,
448                off + core::mem::offset_of!(JournalStatsSection4, pub_would_overrun),
449            );
450            let pub_forced_catchup = read_u64_at(
451                b,
452                off + core::mem::offset_of!(JournalStatsSection4, pub_forced_catchup),
453            );
454
455            values.push(("stats.offset", stats_offset.to_string()));
456            values.push(("stats.pub_payload_bytes", pub_payload_bytes.to_string()));
457            values.push(("stats.pub_records", pub_records.to_string()));
458            values.push(("stats.pad_records", pad_records.to_string()));
459            values.push(("stats.rotations", rotations.to_string()));
460            values.push(("stats.pub_would_overrun", pub_would_overrun.to_string()));
461            values.push(("stats.pub_forced_catchup", pub_forced_catchup.to_string()));
462
463            let mut sub_overruns = Vec::new();
464            for i in 0..4 {
465                let o = core::mem::offset_of!(JournalStatsSection4, sub_overruns)
466                    + i * core::mem::size_of::<u64>();
467                sub_overruns.push(read_u64_at(b, off + o));
468            }
469            values.push(("stats.sub_overruns", format!("{:?}", sub_overruns)));
470        }
471    }
472
473    Ok(Report {
474        path: path.to_string_lossy().into_owned(),
475        kind,
476
477        header_magic: header.magic,
478        header_version: header.version,
479        header_flags: header.flags,
480
481        capabilities: header.capabilities,
482        cap_names: cap_names(header.capabilities),
483
484        layout_bytes: header.layout_bytes,
485        mapped_bytes: mapped.len(),
486
487        base_size,
488        initialized: init,
489        wake,
490
491        offsets,
492        values,
493    })
494}