1use std::path::{Path, PathBuf};
2use std::sync::atomic::Ordering;
3use std::sync::atomic::{AtomicU32, AtomicU64};
4
5use byteor_pipeline_exec::LaneGraphRoleActivitySnapshot;
6
/// Producer/consumer reference counts for a single lane of a lane graph.
#[derive(Clone, Copy, Default)]
struct LaneUsage {
    /// Number of ingress endpoints and role `tx` references that name the lane.
    producers: usize,
    /// Number of egress endpoints and role `rx` references that name the lane.
    consumers: usize,
}
12
13fn lane_graph_usage(
14 lg: &byteor_pipeline_spec::LaneGraphV1,
15) -> std::collections::BTreeMap<String, LaneUsage> {
16 use byteor_pipeline_spec::EndpointKindV1;
17
18 let mut stats = std::collections::BTreeMap::<String, LaneUsage>::new();
19 for lane in &lg.lanes {
20 stats.insert(lane.name.clone(), LaneUsage::default());
21 }
22
23 for ep in &lg.endpoints {
24 if let Some(entry) = stats.get_mut(ep.lane.as_str()) {
25 match ep.kind {
26 EndpointKindV1::Ingress => entry.producers = entry.producers.saturating_add(1),
27 EndpointKindV1::Egress => entry.consumers = entry.consumers.saturating_add(1),
28 }
29 }
30 }
31
32 for role in &lg.roles {
33 match role {
34 byteor_pipeline_spec::RoleCfgV1::Stage(cfg) => {
35 if let Some(entry) = stats.get_mut(cfg.rx.as_str()) {
36 entry.consumers = entry.consumers.saturating_add(1);
37 }
38 if let Some(entry) = stats.get_mut(cfg.tx.as_str()) {
39 entry.producers = entry.producers.saturating_add(1);
40 }
41 }
42 byteor_pipeline_spec::RoleCfgV1::Bridge(cfg) => {
43 if let Some(entry) = stats.get_mut(cfg.rx.as_str()) {
44 entry.consumers = entry.consumers.saturating_add(1);
45 }
46 if let Some(entry) = stats.get_mut(cfg.tx.as_str()) {
47 entry.producers = entry.producers.saturating_add(1);
48 }
49 }
50 byteor_pipeline_spec::RoleCfgV1::Router(cfg) => {
51 if let Some(entry) = stats.get_mut(cfg.rx.as_str()) {
52 entry.consumers = entry.consumers.saturating_add(1);
53 }
54 for tx in &cfg.tx {
55 if let Some(entry) = stats.get_mut(tx.as_str()) {
56 entry.producers = entry.producers.saturating_add(1);
57 }
58 }
59 }
60 byteor_pipeline_spec::RoleCfgV1::Merge(cfg) => {
61 for rx in &cfg.rx {
62 if let Some(entry) = stats.get_mut(rx.as_str()) {
63 entry.consumers = entry.consumers.saturating_add(1);
64 }
65 }
66 if let Some(entry) = stats.get_mut(cfg.tx.as_str()) {
67 entry.producers = entry.producers.saturating_add(1);
68 }
69 }
70 }
71 }
72
73 stats
74}
75
/// Maps a lane name onto a conservative file-name alphabet.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept as-is; every other character
/// (including each non-ASCII character) is replaced by a single `_`.
fn sanitize_lane_name(lane: &str) -> String {
    let mut sanitized = String::with_capacity(lane.len());
    for ch in lane.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
87
/// Derives the default lane directory for a spec file: `<parent>/<stem>.lanes`.
///
/// Falls back to `.` when `spec_path` has no parent, and to the stem `lane_graph` when the
/// path has no file stem or the stem is not valid UTF-8.
fn default_lane_graph_shm_dir_for_spec(spec_path: &Path) -> PathBuf {
    let stem = spec_path
        .file_stem()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("lane_graph");

    let mut lanes_dir = spec_path.parent().unwrap_or(Path::new(".")).to_path_buf();
    lanes_dir.push(format!("{stem}.lanes"));
    lanes_dir
}
96
97fn role_kind_label(role: &byteor_pipeline_spec::RoleCfgV1) -> &'static str {
98 match role {
99 byteor_pipeline_spec::RoleCfgV1::Stage(_) => "stage",
100 byteor_pipeline_spec::RoleCfgV1::Bridge(_) => "bridge",
101 byteor_pipeline_spec::RoleCfgV1::Router(_) => "router",
102 byteor_pipeline_spec::RoleCfgV1::Merge(_) => "merge",
103 }
104}
105
106fn role_lane_refs(role: &byteor_pipeline_spec::RoleCfgV1) -> (Vec<String>, Vec<String>) {
107 match role {
108 byteor_pipeline_spec::RoleCfgV1::Stage(cfg) => (vec![cfg.rx.clone()], vec![cfg.tx.clone()]),
109 byteor_pipeline_spec::RoleCfgV1::Bridge(cfg) => {
110 (vec![cfg.rx.clone()], vec![cfg.tx.clone()])
111 }
112 byteor_pipeline_spec::RoleCfgV1::Router(cfg) => (vec![cfg.rx.clone()], cfg.tx.clone()),
113 byteor_pipeline_spec::RoleCfgV1::Merge(cfg) => (cfg.rx.clone(), vec![cfg.tx.clone()]),
114 }
115}
116
117fn snapshot_lane_graph_roles(
118 spec: &byteor_pipeline_spec::LaneGraphV1,
119 role_activity: &[LaneGraphRoleActivitySnapshot],
120) -> serde_json::Value {
121 let activity_by_name = role_activity
122 .iter()
123 .map(|role| (role.name.as_str(), role))
124 .collect::<std::collections::BTreeMap<_, _>>();
125
126 serde_json::json!(spec
127 .roles
128 .iter()
129 .map(|role| {
130 let activity = activity_by_name.get(role.name());
131 let (input_lanes, output_lanes) = role_lane_refs(role);
132 serde_json::json!({
133 "name": role.name(),
134 "kind": role_kind_label(role),
135 "status": activity.map(|entry| entry.status()).unwrap_or("unavailable"),
136 "active_work": activity.map(|entry| entry.active_work),
137 "processed_count": activity.map(|entry| entry.processed_count),
138 "progress_epoch": activity.map(|entry| entry.progress_epoch),
139 "input_lanes": input_lanes,
140 "output_lanes": output_lanes,
141 })
142 })
143 .collect::<Vec<_>>())
144}
145
146pub(crate) fn lane_graph_lane_path(root: &Path, lane: &str) -> PathBuf {
147 root.join(format!("{}.mmap", sanitize_lane_name(lane)))
148}
149
150fn load_abi_u32(v: &indexbus_abi::IndexbusAtomicU32) -> u32 {
151 unsafe { (&*(v as *const _ as *const AtomicU32)).load(Ordering::Acquire) }
152}
153
154fn load_abi_u64(v: &indexbus_abi::IndexbusAtomicU64) -> u64 {
155 unsafe { (&*(v as *const _ as *const AtomicU64)).load(Ordering::Acquire) }
156}
157
158fn snapshot_index_queue(q: &indexbus_abi::layouts::IndexQueue) -> serde_json::Value {
159 let head = load_abi_u64(&q.head);
160 let tail = load_abi_u64(&q.tail);
161 serde_json::json!({
162 "head": head,
163 "tail": tail,
164 "depth": head.wrapping_sub(tail),
165 })
166}
167
168fn snapshot_mpsc_queue(q: &indexbus_abi::layouts::MpscQueue) -> serde_json::Value {
169 let write = load_abi_u64(&q.write);
170 let read = load_abi_u64(&q.read);
171 serde_json::json!({
172 "write": write,
173 "read": read,
174 "depth": write.wrapping_sub(read),
175 })
176}
177
178fn snapshot_events_lane(path: &Path, producers: usize) -> Result<serde_json::Value, String> {
179 let region = byteor_transport_shm::EventsChainRegion::<4>::open_path_with(
180 path,
181 byteor_transport_shm::OpenOptions::new().blocking(false),
182 )
183 .map_err(|e| format!("attach failed: {e:?}"))?;
184
185 let layout = unsafe { &*region.as_ptr() };
186 let selected_queue_kind = if producers > 1 { "mpsc" } else { "spsc" };
187
188 Ok(serde_json::json!({
189 "selected_queue_kind": selected_queue_kind,
190 "spsc_queue": snapshot_index_queue(&layout.queues[0]),
191 "mpsc_queue": snapshot_mpsc_queue(&layout.mpsc_queues[0]),
192 }))
193}
194
195fn snapshot_fanout_lane(
196 path: &Path,
197 configured_consumers: usize,
198) -> Result<serde_json::Value, String> {
199 let region = indexbus_transport_shm::FanoutRegion::<4>::open_path_with(
200 path,
201 indexbus_transport_shm::OpenOptions::new().blocking(false),
202 )
203 .map_err(|e| format!("attach failed: {e:?}"))?;
204
205 let layout = unsafe { &*region.as_ptr() };
206 let consumers = configured_consumers.min(layout.consumer_queues.len());
207 let consumer_queues = layout.consumer_queues[..consumers]
208 .iter()
209 .enumerate()
210 .map(|(idx, q)| {
211 serde_json::json!({
212 "consumer": idx,
213 "queue": snapshot_index_queue(q),
214 })
215 })
216 .collect::<Vec<_>>();
217
218 Ok(serde_json::json!({
219 "router_rr": load_abi_u32(&layout.router_rr),
220 "producer_spsc": snapshot_index_queue(&layout.producer_queue),
221 "producer_mpsc": snapshot_mpsc_queue(&layout.producer_queue_mpsc),
222 "consumer_queues": consumer_queues,
223 }))
224}
225
226fn snapshot_journal_lane(
227 path: &Path,
228 active_subscribers: usize,
229) -> Result<serde_json::Value, String> {
230 let region = indexbus_transport_shm::JournalRegion4::open_path_with(
231 path,
232 indexbus_transport_shm::JournalOpenOptions::new().blocking(false),
233 )
234 .map_err(|e| format!("attach failed: {e:?}"))?;
235
236 let layout = unsafe { &*region.as_ptr() };
237 let pub_pos = load_abi_u64(&layout.pub_pos);
238 let subscribers = layout
239 .sub_pos
240 .iter()
241 .enumerate()
242 .map(|(idx, pos)| {
243 let pos = load_abi_u64(pos);
244 serde_json::json!({
245 "subscriber": idx,
246 "configured": idx < active_subscribers,
247 "pos": pos,
248 "lag_bytes": pub_pos.saturating_sub(pos),
249 })
250 })
251 .collect::<Vec<_>>();
252
253 let segments = layout
254 .segments
255 .iter()
256 .enumerate()
257 .map(|(idx, seg)| {
258 serde_json::json!({
259 "segment": idx,
260 "segment_id": load_abi_u32(&seg.segment_id),
261 "segment_len": load_abi_u32(&seg.segment_len),
262 "tail": load_abi_u64(&seg.tail),
263 })
264 })
265 .collect::<Vec<_>>();
266
267 Ok(serde_json::json!({
268 "pub_pos": pub_pos,
269 "active_segment": load_abi_u32(&layout.active_segment),
270 "subscriber_positions": subscribers,
271 "segments": segments,
272 }))
273}
274
275fn snapshot_sequenced_slots_lane(
276 path: &Path,
277 active_gating: usize,
278) -> Result<serde_json::Value, String> {
279 let opts = byteor_pipeline_backings_shm::AttachOptions {
280 blocking: false,
281 path: Some(path.to_path_buf()),
282 prefault: false,
283 queue: 0,
284 };
285 let shm = byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, "ignored")
286 .map_err(|e| format!("attach failed: {e}"))?;
287 let snap = shm.snapshot();
288 let active_gating = active_gating.clamp(1, 4);
289
290 Ok(serde_json::json!({
291 "cursor": snap.cursor,
292 "gating": snap.gating,
293 "min_gating": snap.min_gating(active_gating),
294 "min_lag": snap.min_lag(active_gating),
295 "wrap_pressure": snap.wrap_pressure(active_gating, shm.window_capacity()),
296 "headroom": snap.headroom(active_gating, shm.window_capacity()),
297 "window_capacity": shm.window_capacity(),
298 }))
299}
300
301pub(crate) fn snapshot_lane_graph_state(
302 spec: &byteor_pipeline_spec::LaneGraphV1,
303 lane_root: Option<&Path>,
304) -> serde_json::Value {
305 snapshot_lane_graph_state_with_role_activity(spec, lane_root, None)
306}
307
308pub(crate) fn snapshot_lane_graph_state_with_role_activity(
309 spec: &byteor_pipeline_spec::LaneGraphV1,
310 lane_root: Option<&Path>,
311 role_activity: Option<&[LaneGraphRoleActivitySnapshot]>,
312) -> serde_json::Value {
313 use byteor_pipeline_spec::LaneKindV1;
314
315 let usage = lane_graph_usage(spec);
316 let Some(lane_root) = lane_root else {
317 let mut snapshot = serde_json::json!({
318 "ok": false,
319 "error": "lane root not provided",
320 "lanes": [],
321 });
322 if let Some(role_activity) = role_activity {
323 snapshot["roles"] = snapshot_lane_graph_roles(spec, role_activity);
324 }
325 return snapshot;
326 };
327
328 let lanes = spec
329 .lanes
330 .iter()
331 .map(|lane| {
332 let usage = usage.get(lane.name.as_str()).copied().unwrap_or_default();
333 let lane_path = lane_graph_lane_path(lane_root, lane.name.as_str());
334 let mut value = serde_json::json!({
335 "name": lane.name,
336 "kind": match lane.kind {
337 LaneKindV1::Events => "events",
338 LaneKindV1::Journal => "journal",
339 LaneKindV1::SequencedSlots { .. } => "sequenced_slots",
340 LaneKindV1::FanoutBroadcast { .. } => "fanout_broadcast",
341 },
342 "path": lane_path.display().to_string(),
343 "usage": {
344 "producers": usage.producers,
345 "consumers": usage.consumers,
346 },
347 });
348
349 let snap = match lane.kind {
350 LaneKindV1::Events => snapshot_events_lane(&lane_path, usage.producers),
351 LaneKindV1::Journal => snapshot_journal_lane(&lane_path, usage.consumers),
352 LaneKindV1::SequencedSlots { .. } => {
353 snapshot_sequenced_slots_lane(&lane_path, usage.consumers.max(1))
354 }
355 LaneKindV1::FanoutBroadcast { consumers } => {
356 snapshot_fanout_lane(&lane_path, consumers as usize)
357 }
358 };
359
360 match snap {
361 Ok(state) => {
362 value["ok"] = serde_json::json!(true);
363 value["state"] = state;
364 }
365 Err(err) => {
366 value["ok"] = serde_json::json!(false);
367 value["error"] = serde_json::json!(err);
368 }
369 }
370
371 value
372 })
373 .collect::<Vec<_>>();
374
375 let ok = lanes
376 .iter()
377 .all(|lane| lane["ok"] == serde_json::json!(true));
378 let mut snapshot = serde_json::json!({
379 "ok": ok,
380 "lane_root": lane_root.display().to_string(),
381 "lanes": lanes,
382 });
383 if let Some(role_activity) = role_activity {
384 snapshot["roles"] = snapshot_lane_graph_roles(spec, role_activity);
385 }
386 snapshot
387}
388
/// Returns the default lane directory for the spec file at `spec_path`.
///
/// Public wrapper around `default_lane_graph_shm_dir_for_spec`, which yields
/// `<parent>/<stem>.lanes`. This is the directory `snapshot_lane_graph_json` falls back to
/// when it is called without a lane root.
pub fn default_lane_graph_shm_dir(spec_path: &Path) -> PathBuf {
    default_lane_graph_shm_dir_for_spec(spec_path)
}
393
394pub fn snapshot_lane_graph_json(
396 spec_path: &Path,
397 lane_root: Option<&Path>,
398 role_activity: Option<&[LaneGraphRoleActivitySnapshot]>,
399) -> Result<serde_json::Value, String> {
400 let spec = crate::spec::read_spec_kv_v1(spec_path)?;
401 let lane_graph = match spec {
402 byteor_pipeline_spec::PipelineSpecV1::LaneGraph(lane_graph) => lane_graph,
403 other => {
404 return Err(format!(
405 "spec model {:?} does not support lane-graph snapshots",
406 other.model()
407 ));
408 }
409 };
410 let default_lane_root = lane_root
411 .is_none()
412 .then(|| default_lane_graph_shm_dir_for_spec(spec_path));
413 Ok(snapshot_lane_graph_state_with_role_activity(
414 &lane_graph,
415 lane_root.or(default_lane_root.as_deref()),
416 role_activity,
417 ))
418}
419
420pub fn snapshot_single_ring_json(
422 path: PathBuf,
423 active_gating: usize,
424) -> Result<serde_json::Value, String> {
425 let opts = byteor_pipeline_backings_shm::AttachOptions {
426 blocking: false,
427 path: Some(path),
428 prefault: false,
429 queue: 0,
430 };
431 let lane = "ignored";
432 let shm = byteor_pipeline_backings_shm::ShmSequencedSlots::attach(&opts, lane)
433 .map_err(|e| format!("attach failed: {e}"))?;
434
435 let snap = shm.snapshot();
436 Ok(serde_json::json!({
437 "cursor": snap.cursor,
438 "gating": snap.gating,
439 "min_gating": snap.min_gating(active_gating),
440 "min_lag": snap.min_lag(active_gating),
441 "wrap_pressure": snap.wrap_pressure(active_gating, shm.window_capacity()),
442 }))
443}