byteor_runtime/
snapshot.rs

1use std::path::{Path, PathBuf};
2use std::sync::atomic::Ordering;
3use std::sync::atomic::{AtomicU32, AtomicU64};
4
5use byteor_pipeline_exec::LaneGraphRoleActivitySnapshot;
6
7#[derive(Clone, Copy, Default)]
8struct LaneUsage {
9    producers: usize,
10    consumers: usize,
11}
12
13fn lane_graph_usage(
14    lg: &byteor_pipeline_spec::LaneGraphV1,
15) -> std::collections::BTreeMap<String, LaneUsage> {
16    use byteor_pipeline_spec::EndpointKindV1;
17
18    let mut stats = std::collections::BTreeMap::<String, LaneUsage>::new();
19    for lane in &lg.lanes {
20        stats.insert(lane.name.clone(), LaneUsage::default());
21    }
22
23    for ep in &lg.endpoints {
24        if let Some(entry) = stats.get_mut(ep.lane.as_str()) {
25            match ep.kind {
26                EndpointKindV1::Ingress => entry.producers = entry.producers.saturating_add(1),
27                EndpointKindV1::Egress => entry.consumers = entry.consumers.saturating_add(1),
28            }
29        }
30    }
31
32    for role in &lg.roles {
33        match role {
34            byteor_pipeline_spec::RoleCfgV1::Stage(cfg) => {
35                if let Some(entry) = stats.get_mut(cfg.rx.as_str()) {
36                    entry.consumers = entry.consumers.saturating_add(1);
37                }
38                if let Some(entry) = stats.get_mut(cfg.tx.as_str()) {
39                    entry.producers = entry.producers.saturating_add(1);
40                }
41            }
42            byteor_pipeline_spec::RoleCfgV1::Bridge(cfg) => {
43                if let Some(entry) = stats.get_mut(cfg.rx.as_str()) {
44                    entry.consumers = entry.consumers.saturating_add(1);
45                }
46                if let Some(entry) = stats.get_mut(cfg.tx.as_str()) {
47                    entry.producers = entry.producers.saturating_add(1);
48                }
49            }
50            byteor_pipeline_spec::RoleCfgV1::Router(cfg) => {
51                if let Some(entry) = stats.get_mut(cfg.rx.as_str()) {
52                    entry.consumers = entry.consumers.saturating_add(1);
53                }
54                for tx in &cfg.tx {
55                    if let Some(entry) = stats.get_mut(tx.as_str()) {
56                        entry.producers = entry.producers.saturating_add(1);
57                    }
58                }
59            }
60            byteor_pipeline_spec::RoleCfgV1::Merge(cfg) => {
61                for rx in &cfg.rx {
62                    if let Some(entry) = stats.get_mut(rx.as_str()) {
63                        entry.consumers = entry.consumers.saturating_add(1);
64                    }
65                }
66                if let Some(entry) = stats.get_mut(cfg.tx.as_str()) {
67                    entry.producers = entry.producers.saturating_add(1);
68                }
69            }
70        }
71    }
72
73    stats
74}
75
76fn sanitize_lane_name(lane: &str) -> String {
77    lane.chars()
78        .map(|c| {
79            if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
80                c
81            } else {
82                '_'
83            }
84        })
85        .collect()
86}
87
88fn default_lane_graph_shm_dir_for_spec(spec_path: &Path) -> PathBuf {
89    let parent = spec_path.parent().unwrap_or_else(|| Path::new("."));
90    let stem = spec_path
91        .file_stem()
92        .and_then(|value| value.to_str())
93        .unwrap_or("lane_graph");
94    parent.join(format!("{stem}.lanes"))
95}
96
97fn role_kind_label(role: &byteor_pipeline_spec::RoleCfgV1) -> &'static str {
98    match role {
99        byteor_pipeline_spec::RoleCfgV1::Stage(_) => "stage",
100        byteor_pipeline_spec::RoleCfgV1::Bridge(_) => "bridge",
101        byteor_pipeline_spec::RoleCfgV1::Router(_) => "router",
102        byteor_pipeline_spec::RoleCfgV1::Merge(_) => "merge",
103    }
104}
105
106fn role_lane_refs(role: &byteor_pipeline_spec::RoleCfgV1) -> (Vec<String>, Vec<String>) {
107    match role {
108        byteor_pipeline_spec::RoleCfgV1::Stage(cfg) => (vec![cfg.rx.clone()], vec![cfg.tx.clone()]),
109        byteor_pipeline_spec::RoleCfgV1::Bridge(cfg) => {
110            (vec![cfg.rx.clone()], vec![cfg.tx.clone()])
111        }
112        byteor_pipeline_spec::RoleCfgV1::Router(cfg) => (vec![cfg.rx.clone()], cfg.tx.clone()),
113        byteor_pipeline_spec::RoleCfgV1::Merge(cfg) => (cfg.rx.clone(), vec![cfg.tx.clone()]),
114    }
115}
116
117fn snapshot_lane_graph_roles(
118    spec: &byteor_pipeline_spec::LaneGraphV1,
119    role_activity: &[LaneGraphRoleActivitySnapshot],
120) -> serde_json::Value {
121    let activity_by_name = role_activity
122        .iter()
123        .map(|role| (role.name.as_str(), role))
124        .collect::<std::collections::BTreeMap<_, _>>();
125
126    serde_json::json!(spec
127        .roles
128        .iter()
129        .map(|role| {
130            let activity = activity_by_name.get(role.name());
131            let (input_lanes, output_lanes) = role_lane_refs(role);
132            serde_json::json!({
133                "name": role.name(),
134                "kind": role_kind_label(role),
135                "status": activity.map(|entry| entry.status()).unwrap_or("unavailable"),
136                "active_work": activity.map(|entry| entry.active_work),
137                "processed_count": activity.map(|entry| entry.processed_count),
138                "progress_epoch": activity.map(|entry| entry.progress_epoch),
139                "input_lanes": input_lanes,
140                "output_lanes": output_lanes,
141            })
142        })
143        .collect::<Vec<_>>())
144}
145
146pub(crate) fn lane_graph_lane_path(root: &Path, lane: &str) -> PathBuf {
147    root.join(format!("{}.mmap", sanitize_lane_name(lane)))
148}
149
150fn load_abi_u32(v: &indexbus_abi::IndexbusAtomicU32) -> u32 {
151    unsafe { (&*(v as *const _ as *const AtomicU32)).load(Ordering::Acquire) }
152}
153
154fn load_abi_u64(v: &indexbus_abi::IndexbusAtomicU64) -> u64 {
155    unsafe { (&*(v as *const _ as *const AtomicU64)).load(Ordering::Acquire) }
156}
157
158fn snapshot_index_queue(q: &indexbus_abi::layouts::IndexQueue) -> serde_json::Value {
159    let head = load_abi_u64(&q.head);
160    let tail = load_abi_u64(&q.tail);
161    serde_json::json!({
162        "head": head,
163        "tail": tail,
164        "depth": head.wrapping_sub(tail),
165    })
166}
167
168fn snapshot_mpsc_queue(q: &indexbus_abi::layouts::MpscQueue) -> serde_json::Value {
169    let write = load_abi_u64(&q.write);
170    let read = load_abi_u64(&q.read);
171    serde_json::json!({
172        "write": write,
173        "read": read,
174        "depth": write.wrapping_sub(read),
175    })
176}
177
178fn snapshot_events_lane(path: &Path, producers: usize) -> Result<serde_json::Value, String> {
179    let region = byteor_transport_shm::EventsChainRegion::<4>::open_path_with(
180        path,
181        byteor_transport_shm::OpenOptions::new().blocking(false),
182    )
183    .map_err(|e| format!("attach failed: {e:?}"))?;
184
185    let layout = unsafe { &*region.as_ptr() };
186    let selected_queue_kind = if producers > 1 { "mpsc" } else { "spsc" };
187
188    Ok(serde_json::json!({
189        "selected_queue_kind": selected_queue_kind,
190        "spsc_queue": snapshot_index_queue(&layout.queues[0]),
191        "mpsc_queue": snapshot_mpsc_queue(&layout.mpsc_queues[0]),
192    }))
193}
194
195fn snapshot_fanout_lane(
196    path: &Path,
197    configured_consumers: usize,
198) -> Result<serde_json::Value, String> {
199    let region = indexbus_transport_shm::FanoutRegion::<4>::open_path_with(
200        path,
201        indexbus_transport_shm::OpenOptions::new().blocking(false),
202    )
203    .map_err(|e| format!("attach failed: {e:?}"))?;
204
205    let layout = unsafe { &*region.as_ptr() };
206    let consumers = configured_consumers.min(layout.consumer_queues.len());
207    let consumer_queues = layout.consumer_queues[..consumers]
208        .iter()
209        .enumerate()
210        .map(|(idx, q)| {
211            serde_json::json!({
212                "consumer": idx,
213                "queue": snapshot_index_queue(q),
214            })
215        })
216        .collect::<Vec<_>>();
217
218    Ok(serde_json::json!({
219        "router_rr": load_abi_u32(&layout.router_rr),
220        "producer_spsc": snapshot_index_queue(&layout.producer_queue),
221        "producer_mpsc": snapshot_mpsc_queue(&layout.producer_queue_mpsc),
222        "consumer_queues": consumer_queues,
223    }))
224}
225
226fn snapshot_journal_lane(
227    path: &Path,
228    active_subscribers: usize,
229) -> Result<serde_json::Value, String> {
230    let region = indexbus_transport_shm::JournalRegion4::open_path_with(
231        path,
232        indexbus_transport_shm::JournalOpenOptions::new().blocking(false),
233    )
234    .map_err(|e| format!("attach failed: {e:?}"))?;
235
236    let layout = unsafe { &*region.as_ptr() };
237    let pub_pos = load_abi_u64(&layout.pub_pos);
238    let subscribers = layout
239        .sub_pos
240        .iter()
241        .enumerate()
242        .map(|(idx, pos)| {
243            let pos = load_abi_u64(pos);
244            serde_json::json!({
245                "subscriber": idx,
246                "configured": idx < active_subscribers,
247                "pos": pos,
248                "lag_bytes": pub_pos.saturating_sub(pos),
249            })
250        })
251        .collect::<Vec<_>>();
252
253    let segments = layout
254        .segments
255        .iter()
256        .enumerate()
257        .map(|(idx, seg)| {
258            serde_json::json!({
259                "segment": idx,
260                "segment_id": load_abi_u32(&seg.segment_id),
261                "segment_len": load_abi_u32(&seg.segment_len),
262                "tail": load_abi_u64(&seg.tail),
263            })
264        })
265        .collect::<Vec<_>>();
266
267    Ok(serde_json::json!({
268        "pub_pos": pub_pos,
269        "active_segment": load_abi_u32(&layout.active_segment),
270        "subscriber_positions": subscribers,
271        "segments": segments,
272    }))
273}
274
275fn snapshot_sequenced_slots_lane(
276    path: &Path,
277    active_gating: usize,
278) -> Result<serde_json::Value, String> {
279    let opts = byteor_pipeline_backings_shm::AttachOptions {
280        blocking: false,
281        path: Some(path.to_path_buf()),
282        prefault: false,
283        queue: 0,
284    };
285    let shm = byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, "ignored")
286        .map_err(|e| format!("attach failed: {e}"))?;
287    let snap = shm.snapshot();
288    let active_gating = active_gating.clamp(1, 4);
289
290    Ok(serde_json::json!({
291        "cursor": snap.cursor,
292        "gating": snap.gating,
293        "min_gating": snap.min_gating(active_gating),
294        "min_lag": snap.min_lag(active_gating),
295        "wrap_pressure": snap.wrap_pressure(active_gating, shm.window_capacity()),
296        "headroom": snap.headroom(active_gating, shm.window_capacity()),
297        "window_capacity": shm.window_capacity(),
298    }))
299}
300
301pub(crate) fn snapshot_lane_graph_state(
302    spec: &byteor_pipeline_spec::LaneGraphV1,
303    lane_root: Option<&Path>,
304) -> serde_json::Value {
305    snapshot_lane_graph_state_with_role_activity(spec, lane_root, None)
306}
307
308pub(crate) fn snapshot_lane_graph_state_with_role_activity(
309    spec: &byteor_pipeline_spec::LaneGraphV1,
310    lane_root: Option<&Path>,
311    role_activity: Option<&[LaneGraphRoleActivitySnapshot]>,
312) -> serde_json::Value {
313    use byteor_pipeline_spec::LaneKindV1;
314
315    let usage = lane_graph_usage(spec);
316    let Some(lane_root) = lane_root else {
317        let mut snapshot = serde_json::json!({
318            "ok": false,
319            "error": "lane root not provided",
320            "lanes": [],
321        });
322        if let Some(role_activity) = role_activity {
323            snapshot["roles"] = snapshot_lane_graph_roles(spec, role_activity);
324        }
325        return snapshot;
326    };
327
328    let lanes = spec
329        .lanes
330        .iter()
331        .map(|lane| {
332            let usage = usage.get(lane.name.as_str()).copied().unwrap_or_default();
333            let lane_path = lane_graph_lane_path(lane_root, lane.name.as_str());
334            let mut value = serde_json::json!({
335                "name": lane.name,
336                "kind": match lane.kind {
337                    LaneKindV1::Events => "events",
338                    LaneKindV1::Journal => "journal",
339                    LaneKindV1::SequencedSlots { .. } => "sequenced_slots",
340                    LaneKindV1::FanoutBroadcast { .. } => "fanout_broadcast",
341                },
342                "path": lane_path.display().to_string(),
343                "usage": {
344                    "producers": usage.producers,
345                    "consumers": usage.consumers,
346                },
347            });
348
349            let snap = match lane.kind {
350                LaneKindV1::Events => snapshot_events_lane(&lane_path, usage.producers),
351                LaneKindV1::Journal => snapshot_journal_lane(&lane_path, usage.consumers),
352                LaneKindV1::SequencedSlots { .. } => {
353                    snapshot_sequenced_slots_lane(&lane_path, usage.consumers.max(1))
354                }
355                LaneKindV1::FanoutBroadcast { consumers } => {
356                    snapshot_fanout_lane(&lane_path, consumers as usize)
357                }
358            };
359
360            match snap {
361                Ok(state) => {
362                    value["ok"] = serde_json::json!(true);
363                    value["state"] = state;
364                }
365                Err(err) => {
366                    value["ok"] = serde_json::json!(false);
367                    value["error"] = serde_json::json!(err);
368                }
369            }
370
371            value
372        })
373        .collect::<Vec<_>>();
374
375    let ok = lanes
376        .iter()
377        .all(|lane| lane["ok"] == serde_json::json!(true));
378    let mut snapshot = serde_json::json!({
379        "ok": ok,
380        "lane_root": lane_root.display().to_string(),
381        "lanes": lanes,
382    });
383    if let Some(role_activity) = role_activity {
384        snapshot["roles"] = snapshot_lane_graph_roles(spec, role_activity);
385    }
386    snapshot
387}
388
389/// Return the default SHM directory associated with a lane-graph spec path.
390pub fn default_lane_graph_shm_dir(spec_path: &Path) -> PathBuf {
391    default_lane_graph_shm_dir_for_spec(spec_path)
392}
393
394/// Snapshot a LaneGraph runtime using the spec, lane root, and optional role activity.
395pub fn snapshot_lane_graph_json(
396    spec_path: &Path,
397    lane_root: Option<&Path>,
398    role_activity: Option<&[LaneGraphRoleActivitySnapshot]>,
399) -> Result<serde_json::Value, String> {
400    let spec = crate::spec::read_spec_kv_v1(spec_path)?;
401    let lane_graph = match spec {
402        byteor_pipeline_spec::PipelineSpecV1::LaneGraph(lane_graph) => lane_graph,
403        other => {
404            return Err(format!(
405                "spec model {:?} does not support lane-graph snapshots",
406                other.model()
407            ));
408        }
409    };
410    let default_lane_root = lane_root
411        .is_none()
412        .then(|| default_lane_graph_shm_dir_for_spec(spec_path));
413    Ok(snapshot_lane_graph_state_with_role_activity(
414        &lane_graph,
415        lane_root.or(default_lane_root.as_deref()),
416        role_activity,
417    ))
418}
419
420/// Snapshot a SingleRing backing and return a JSON object.
421pub fn snapshot_single_ring_json(
422    path: PathBuf,
423    active_gating: usize,
424) -> Result<serde_json::Value, String> {
425    let opts = byteor_pipeline_backings_shm::AttachOptions {
426        blocking: false,
427        path: Some(path),
428        prefault: false,
429        queue: 0,
430    };
431    let lane = "ignored";
432    let shm = byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane)
433        .map_err(|e| format!("attach failed: {e}"))?;
434
435    let snap = shm.snapshot();
436    Ok(serde_json::json!({
437        "cursor": snap.cursor,
438        "gating": snap.gating,
439        "min_gating": snap.min_gating(active_gating),
440        "min_lag": snap.min_lag(active_gating),
441        "wrap_pressure": snap.wrap_pressure(active_gating, shm.window_capacity()),
442    }))
443}