byteor_edgeplane/
main.rs

1use std::path::PathBuf;
2use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
3use std::thread::{self, JoinHandle};
4use std::time::Duration;
5
6fn report_adapter_snapshot(
7    reporter: Option<&byteor_app_kit::agent::RuntimeReporter>,
8    stats: &byteor_adapters::AdapterLoopStats,
9    error: Option<String>,
10) {
11    let Some(reporter) = reporter else {
12        return;
13    };
14
15    if let Err(report_error) = reporter.report_snapshot_now("single_ring", || {
16        Ok(adapter_snapshot_payload(stats, error.clone()))
17    }) {
18        eprintln!("agent snapshot failed: {report_error}");
19    }
20    if let Err(report_error) = reporter.report_heartbeat_now() {
21        eprintln!("agent heartbeat failed: {report_error}");
22    }
23}
24
25fn adapter_snapshot_payload(
26    stats: &byteor_adapters::AdapterLoopStats,
27    error: Option<String>,
28) -> serde_json::Value {
29    let mut snapshot = serde_json::json!({
30        "mode": "adapter_loop",
31        "ok": error.is_none(),
32        "ingress_messages": stats.ingress_messages,
33        "egress_messages": stats.egress_messages,
34    });
35    if let Some(error) = error {
36        snapshot["error"] = serde_json::json!(error);
37    }
38    stats.merge_into_snapshot(&mut snapshot);
39    snapshot
40}
41
42fn idle_adapter_snapshot_payload(
43    ingress_name: &str,
44    ingress_transport: &str,
45    egress_name: &str,
46    egress_transport: &str,
47) -> serde_json::Value {
48    serde_json::json!({
49        "mode": "adapter_loop",
50        "ok": true,
51        "ingress_messages": 0,
52        "egress_messages": 0,
53        "adapters": [
54            {
55                "name": ingress_name,
56                "role": "ingress",
57                "transport": ingress_transport,
58                "connection_state": "ready",
59                "message_count": 0,
60                "recent_failures": 0,
61                "last_error": null,
62                "lag": null,
63                "details": {}
64            },
65            {
66                "name": egress_name,
67                "role": "egress",
68                "transport": egress_transport,
69                "connection_state": "ready",
70                "message_count": 0,
71                "recent_failures": 0,
72                "last_error": null,
73                "lag": null,
74                "details": {}
75            }
76        ]
77    })
78}
79
80fn spawn_runtime_reporting_keepalive(
81    reporter: Option<&byteor_app_kit::agent::RuntimeReporter>,
82    idle_snapshot: Option<serde_json::Value>,
83) -> Option<(Arc<AtomicBool>, JoinHandle<()>)> {
84    let mut reporter = reporter?.clone();
85    let stop = Arc::new(AtomicBool::new(false));
86    let stop_signal = Arc::clone(&stop);
87    let handle = thread::spawn(move || {
88        while !stop_signal.load(Ordering::Relaxed) {
89            if let Err(error) = reporter.report_heartbeat_if_due() {
90                eprintln!("agent heartbeat failed: {error}");
91            }
92            if let Some(snapshot) = idle_snapshot.as_ref() {
93                if let Err(error) = reporter.report_snapshot_if_due("single_ring", || Ok(snapshot.clone())) {
94                    eprintln!("agent snapshot failed: {error}");
95                }
96            }
97            thread::sleep(Duration::from_millis(250));
98        }
99    });
100    Some((stop, handle))
101}
102
103fn stop_runtime_reporting_keepalive(
104    keepalive: Option<(Arc<AtomicBool>, JoinHandle<()>)>,
105) {
106    let Some((stop, handle)) = keepalive else {
107        return;
108    };
109    stop.store(true, Ordering::Relaxed);
110    let _ = handle.join();
111}
112
113fn main() {
114    // Minimal enterprise app scaffolding.
115    // In v1 we delegate operator tooling commands to `byteor-runtime` library helpers.
116
117    let mut args = std::env::args().skip(1);
118    let cmd = args.next().unwrap_or_else(|| "help".to_string());
119
120    match cmd.as_str() {
121        "help" | "-h" | "--help" => {
122            eprintln!(
123                "byteor-edgeplane\n\nUSAGE:\n  byteor-edgeplane validate --spec=/path\n  byteor-edgeplane describe --spec=/path\n  byteor-edgeplane dot --spec=/path\n  byteor-edgeplane run --spec=/path [--profile=default|low-latency|isolated-core] [--config=/path] [--lane-path=/path] [--shm-parent=/path] [--mlockall=on|off] [--wait=spin|backoff] [--pin=none|balanced|physical] [--sched=other|fifo|rr] [--rt-prio=N] [--dry-run] [--env=dev|staging|prod] [--approval=<credential>] [--approval-key-set=/path] [--adapter-max-messages=N]  # long-running (Ctrl-C to stop unless bounded)\n  byteor-edgeplane run --sp --spec=/path [--profile=default|low-latency|isolated-core] [--config=/path] [--lane-path=/path] [--shm-parent=/path] [--mlockall=on|off] [--wait=spin|backoff] [--pin=none|balanced|physical] [--sched=other|fifo|rr] [--rt-prio=N] [--input-hex=..|--input-ascii=..] [--dry-run] [--env=dev|staging|prod] [--approval=<credential>] [--approval-key-set=/path]  # one-shot SingleRing or long-running LaneGraph\n  byteor-edgeplane doctor [--profile=default|low-latency|isolated-core] [--mlockall=on|off] [--shm-parent=/path]\n  byteor-edgeplane snapshot --ring-path=/path [--active-gating=N]\n  byteor-edgeplane incident-bundle --out=/path --spec=/path [--config=/path] [--ring-path=/path]\n\nNOTES:\n  - ResolverKey stages supported:\n    - http_post:<url>\n    - exec:<program>|<arg>|...\n"
124            );
125            std::process::exit(2);
126        }
127        "validate" => {
128            let mut spec: Option<PathBuf> = None;
129            for a in args {
130                if let Some(v) = a.strip_prefix("--spec=") {
131                    spec = Some(PathBuf::from(v));
132                } else {
133                    eprintln!("unknown arg: {a}");
134                    std::process::exit(2);
135                }
136            }
137
138            let Some(spec) = spec else {
139                eprintln!("missing --spec");
140                std::process::exit(2);
141            };
142
143            match byteor_runtime::validate_spec_kv_v1(&spec) {
144                Ok(()) => {
145                    println!("{}", serde_json::json!({"ok": true}));
146                    std::process::exit(0);
147                }
148                Err(e) => {
149                    eprintln!("validate failed: {e}");
150                    println!("{}", serde_json::json!({"ok": false, "error": e}));
151                    std::process::exit(1);
152                }
153            }
154        }
155        "describe" => {
156            let mut spec: Option<PathBuf> = None;
157            for a in args {
158                if let Some(v) = a.strip_prefix("--spec=") {
159                    spec = Some(PathBuf::from(v));
160                } else {
161                    eprintln!("unknown arg: {a}");
162                    std::process::exit(2);
163                }
164            }
165
166            let Some(spec) = spec else {
167                eprintln!("missing --spec");
168                std::process::exit(2);
169            };
170
171            match byteor_runtime::describe_spec_kv_v1(&spec) {
172                Ok(s) => {
173                    print!("{s}");
174                    std::process::exit(0);
175                }
176                Err(e) => {
177                    eprintln!("describe failed: {e}");
178                    std::process::exit(1);
179                }
180            }
181        }
182        "dot" => {
183            let mut spec: Option<PathBuf> = None;
184            for a in args {
185                if let Some(v) = a.strip_prefix("--spec=") {
186                    spec = Some(PathBuf::from(v));
187                } else {
188                    eprintln!("unknown arg: {a}");
189                    std::process::exit(2);
190                }
191            }
192
193            let Some(spec) = spec else {
194                eprintln!("missing --spec");
195                std::process::exit(2);
196            };
197
198            match byteor_runtime::dot_spec_kv_v1(&spec) {
199                Ok(s) => {
200                    print!("{s}");
201                    std::process::exit(0);
202                }
203                Err(e) => {
204                    eprintln!("dot failed: {e}");
205                    std::process::exit(1);
206                }
207            }
208        }
209        "run" => {
210            let mut sp = false;
211            let mut spec: Option<PathBuf> = None;
212            let mut lane_path: Option<PathBuf> = None;
213            let mut rt_prio: Option<u8> = None;
214            let mut input_hex: Option<String> = None;
215            let mut input_ascii: Option<String> = None;
216            let mut dry_run = false;
217            let mut env = byteor_core::config::Environment::Dev;
218            let mut approval: Option<String> = None;
219            let mut approval_key_sets: Vec<PathBuf> = Vec::new();
220            let mut config_path: Option<PathBuf> = None;
221            let mut adapter_max_messages: Option<u64> = None;
222            let mut tuning = byteor_runtime::TuningCliState::new(byteor_core::paths::shm_parent_from_env());
223
224            for a in args {
225                if a == "--sp" {
226                    sp = true;
227                } else if a == "--dry-run" {
228                    dry_run = true;
229                } else if let Some(v) = a.strip_prefix("--dry-run=") {
230                    dry_run = matches!(v, "on" | "1" | "true" | "yes");
231                } else if let Some(v) = a.strip_prefix("--spec=") {
232                    spec = Some(PathBuf::from(v));
233                } else if let Some(v) = a.strip_prefix("--lane-path=") {
234                    lane_path = Some(PathBuf::from(v));
235                } else if let Some(v) = a.strip_prefix("--profile=") {
236                    let profile = byteor_runtime::TuningProfile::parse(v).unwrap_or_else(|| {
237                        eprintln!(
238                            "unknown --profile: {v} (expected default|low-latency|isolated-core)"
239                        );
240                        std::process::exit(2);
241                    });
242                    tuning.set_profile(profile);
243                } else if let Some(v) = a.strip_prefix("--shm-parent=") {
244                    tuning.set_shm_parent(PathBuf::from(v));
245                } else if let Some(v) = a.strip_prefix("--mlockall=") {
246                    let preset = match v {
247                        "on" | "1" | "true" | "yes" => byteor_runtime::MlockallPreset::On,
248                        "off" | "0" | "false" | "no" | "none" => {
249                            byteor_runtime::MlockallPreset::None
250                        }
251                        _ => {
252                            eprintln!("unknown --mlockall: {v} (expected on|off)");
253                            std::process::exit(2);
254                        }
255                    };
256                    tuning.set_mlockall(preset);
257                } else if let Some(v) = a.strip_prefix("--sched=") {
258                    let parsed_sched = match byteor_runtime::SchedPreset::parse_kind(v) {
259                        Some(x) => x,
260                        None => {
261                            eprintln!("unknown --sched: {v} (expected other|fifo|rr)");
262                            std::process::exit(2);
263                        }
264                    };
265                    tuning.set_sched(parsed_sched);
266                } else if let Some(v) = a.strip_prefix("--rt-prio=") {
267                    rt_prio = Some(v.parse().unwrap_or_else(|_| {
268                        eprintln!("invalid --rt-prio: {v}");
269                        std::process::exit(2);
270                    }));
271                } else if let Some(v) = a.strip_prefix("--env=") {
272                    env = match byteor_core::config::Environment::parse(v) {
273                        Some(x) => x,
274                        None => {
275                            eprintln!("unknown --env: {v} (expected dev|staging|prod)");
276                            std::process::exit(2);
277                        }
278                    };
279                } else if let Some(v) = a.strip_prefix("--approval=") {
280                    approval = Some(v.to_string());
281                } else if let Some(v) = a.strip_prefix("--approval-key-set=") {
282                    approval_key_sets.push(PathBuf::from(v));
283                } else if let Some(v) = a.strip_prefix("--config=") {
284                    config_path = Some(PathBuf::from(v));
285                } else if let Some(v) = a.strip_prefix("--adapter-max-messages=") {
286                    let parsed = v.parse().unwrap_or_else(|_| {
287                        eprintln!("invalid --adapter-max-messages: {v}");
288                        std::process::exit(2);
289                    });
290                    if parsed == 0 {
291                        eprintln!("invalid --adapter-max-messages: {v} (expected > 0)");
292                        std::process::exit(2);
293                    }
294                    adapter_max_messages = Some(parsed);
295                } else if let Some(v) = a.strip_prefix("--wait=") {
296                    let parsed_wait = byteor_runtime::parse_wait_preset(v).unwrap_or_else(|| {
297                        eprintln!("unknown --wait preset: {v}");
298                        std::process::exit(2);
299                    });
300                    tuning.set_wait(parsed_wait);
301                } else if let Some(v) = a.strip_prefix("--pin=") {
302                    let parsed_pinning = match byteor_runtime::PinningPreset::parse(v) {
303                        Some(p) => p,
304                        None => {
305                            eprintln!(
306                                "unknown --pin preset: {v} (expected none|balanced|physical)"
307                            );
308                            std::process::exit(2);
309                        }
310                    };
311                    tuning.set_pinning(parsed_pinning);
312                } else if let Some(v) = a.strip_prefix("--input-hex=") {
313                    input_hex = Some(v.to_string());
314                } else if let Some(v) = a.strip_prefix("--input-ascii=") {
315                    input_ascii = Some(v.to_string());
316                } else {
317                    eprintln!("unknown arg: {a}");
318                    std::process::exit(2);
319                }
320            }
321            let Some(spec) = spec else {
322                eprintln!("missing --spec");
323                std::process::exit(2);
324            };
325
326            let mut resolved_tuning = tuning.resolve();
327            if let Some(p) = rt_prio {
328                if resolved_tuning.sched == byteor_runtime::SchedPreset::Other {
329                    eprintln!("--rt-prio requires --sched=fifo|rr");
330                    std::process::exit(2);
331                }
332                if !(1..=99).contains(&p) {
333                    eprintln!("invalid --rt-prio: {p} (expected 1..=99)");
334                    std::process::exit(2);
335                }
336                resolved_tuning.sched = resolved_tuning.sched.with_rt_prio(p);
337            }
338            let profile = resolved_tuning.profile;
339            let wait = resolved_tuning.wait;
340            let pinning = resolved_tuning.pinning;
341            let mlockall = resolved_tuning.mlockall;
342            let sched = resolved_tuning.sched;
343            let shm_parent = resolved_tuning.shm_parent.clone();
344            let tuning_report = byteor_runtime::effective_tuning_report(profile, &resolved_tuning);
345
346            let lane_path = byteor_core::paths::default_lane_path_for_run(
347                byteor_core::Product::Edgeplane,
348                lane_path,
349                &shm_parent,
350            );
351
352            if dry_run {
353                match byteor_runtime::read_spec_kv_v1(&spec) {
354                    Ok(spec_v1) => {
355                        let stages = spec_v1.single_ring().map(|r| r.stages.len()).unwrap_or(0);
356                        let resolved_config_path = config_path
357                            .clone()
358                            .or_else(|| byteor_runtime::default_bundle_config_path(&spec));
359                        println!(
360                            "{}",
361                            serde_json::json!({
362                                "ok": true,
363                                "dry_run": true,
364                                "model": format!("{:?}", spec_v1.model()).to_lowercase(),
365                                "stages": stages,
366                                "lane_path": lane_path.display().to_string(),
367                                "config_path": resolved_config_path
368                                    .as_ref()
369                                    .map(|path| path.display().to_string()),
370                            })
371                        );
372                        std::process::exit(0);
373                    }
374                    Err(e) => {
375                        eprintln!("dry-run failed: {e}");
376                        println!("{}", serde_json::json!({"ok": false, "error": e}));
377                        std::process::exit(1);
378                    }
379                }
380            }
381
382            if let Err(e) = byteor_runtime::apply_mlockall_preset(mlockall) {
383                eprintln!(
384                    "{}",
385                    byteor_runtime::tuning_host_readiness_error(format!(
386                        "mlockall apply failed: preset={} err={e}",
387                        mlockall.as_str()
388                    ))
389                );
390                std::process::exit(1);
391            }
392
393            if let Err(e) = byteor_runtime::apply_sched_preset(sched) {
394                eprintln!(
395                    "{}",
396                    byteor_runtime::tuning_host_readiness_error(format!(
397                        "sched apply failed: sched={} prio={:?} err={e}",
398                        sched.kind_str(),
399                        sched.rt_prio()
400                    ))
401                );
402                std::process::exit(1);
403            }
404
405            let canonical_spec_kv = match byteor_runtime::canonical_encode_spec_kv_v1(&spec) {
406                Ok(spec_kv) => spec_kv,
407                Err(e) => {
408                    eprintln!("canonical spec failed: {e}");
409                    println!("{}", serde_json::json!({"ok": false, "error": e}));
410                    std::process::exit(1);
411                }
412            };
413
414            if approval.is_none() {
415                approval =
416                    match byteor_app_kit::approval::load_approval_credential_from_bundle(&spec) {
417                        Ok(credential) => credential,
418                        Err(e) => {
419                            eprintln!("approval credential load failed: {e}");
420                            println!("{}", serde_json::json!({"ok": false, "error": e}));
421                            std::process::exit(1);
422                        }
423                    };
424            }
425
426            if approval_key_sets.is_empty() {
427                approval_key_sets =
428                    byteor_app_kit::approval::discover_trusted_approval_key_paths(&spec);
429            }
430
431            let approval_refresh_bootstrap =
432                match byteor_app_kit::approval::approval_refresh_bootstrap_from_env() {
433                    Ok(bootstrap) => bootstrap,
434                    Err(e) => {
435                        eprintln!("approval key refresh bootstrap failed: {e}");
436                        println!("{}", serde_json::json!({"ok": false, "error": e}));
437                        std::process::exit(1);
438                    }
439                };
440
441            let trusted_approval_cache =
442                match byteor_app_kit::approval::load_runtime_trusted_approval_key_cache(
443                    &spec,
444                    &approval_key_sets,
445                    approval_refresh_bootstrap.as_ref(),
446                ) {
447                    Ok(cache) => cache,
448                    Err(e) => {
449                        eprintln!("approval key set load failed: {e}");
450                        println!("{}", serde_json::json!({"ok": false, "error": e}));
451                        std::process::exit(1);
452                    }
453                };
454
455            let approval_resolution = match byteor_app_kit::approval::resolve_runtime_approval(
456                &canonical_spec_kv,
457                env,
458                approval.as_deref(),
459                &trusted_approval_cache.keys,
460            ) {
461                Ok(resolution) => resolution,
462                Err(byteor_app_kit::approval::ApprovalVerificationError::UnknownKeyId(_))
463                    if approval_refresh_bootstrap.is_some() =>
464                {
465                    let refreshed_cache =
466                        match byteor_app_kit::approval::refresh_runtime_trusted_approval_key_cache(
467                            &spec,
468                            &approval_key_sets,
469                            approval_refresh_bootstrap
470                                .as_ref()
471                                .expect("refresh bootstrap present"),
472                        ) {
473                            Ok(cache) => cache,
474                            Err(refresh_error) => {
475                                let message = format!(
476                                    "approval verification failed: unknown approval signing key and refresh failed: {refresh_error}"
477                                );
478                                eprintln!("{message}");
479                                println!("{}", serde_json::json!({"ok": false, "error": message}));
480                                std::process::exit(1);
481                            }
482                        };
483
484                    match byteor_app_kit::approval::resolve_runtime_approval(
485                        &canonical_spec_kv,
486                        env,
487                        approval.as_deref(),
488                        &refreshed_cache.keys,
489                    ) {
490                        Ok(resolution) => resolution,
491                        Err(e) => {
492                            let message = format!("approval verification failed: {e}");
493                            eprintln!("{message}");
494                            println!("{}", serde_json::json!({"ok": false, "error": message}));
495                            std::process::exit(1);
496                        }
497                    }
498                }
499                Err(e) => {
500                    let message = format!("approval verification failed: {e}");
501                    eprintln!("{message}");
502                    println!("{}", serde_json::json!({"ok": false, "error": message}));
503                    std::process::exit(1);
504                }
505            };
506
507            let reporting_config = match byteor_app_kit::agent::reporting_config_from_bootstrap(
508                approval_refresh_bootstrap.as_ref(),
509            ) {
510                Ok(config) => config,
511                Err(e) => {
512                    eprintln!("agent reporting bootstrap failed: {e}");
513                    println!("{}", serde_json::json!({"ok": false, "error": e}));
514                    std::process::exit(1);
515                }
516            };
517
518            let approval_trust = match byteor_app_kit::agent::approval_trust_status(
519                &trusted_approval_cache,
520                approval.as_deref(),
521            ) {
522                Ok(status) => Some(status),
523                Err(e) => {
524                    eprintln!("agent approval trust export failed: {e}");
525                    println!("{}", serde_json::json!({"ok": false, "error": e}));
526                    std::process::exit(1);
527                }
528            };
529
530            let mut stage_catalog = byteor_runtime::default_stage_catalog_entries();
531            stage_catalog.extend(byteor_app_kit::actions::action_stage_catalog_entries());
532            let capability_document =
533                byteor_app_kit::capabilities::runtime_capability_document(
534                    "byteor-edgeplane",
535                    stage_catalog,
536                );
537
538            let mut runtime_reporter = reporting_config.map(|config| {
539                byteor_app_kit::agent::RuntimeReporter::new(
540                    config,
541                    env!("CARGO_PKG_VERSION").to_string(),
542                    approval_resolution.spec_hash.clone(),
543                    capability_document,
544                    approval_trust.clone(),
545                )
546            });
547
548            // Minimal Edgeplane v1 stage resolver: action stages (exec/http_post). Built-in
549            // `StageOpV1` stages (e.g. Identity/AddU8) do not require resolver wiring.
550            let resolver = byteor_app_kit::actions::build_action_stages_default_replay(
551                env,
552                approval_resolution.approval_token_present,
553                false, // is_replay
554                false, // dry_run (side effects)
555            );
556
557            let spec_v1 = match byteor_runtime::read_spec_kv_v1(&spec) {
558                Ok(spec_v1) => spec_v1,
559                Err(e) => {
560                    eprintln!("read spec failed: {e}");
561                    println!("{}", serde_json::json!({"ok": false, "error": e}));
562                    std::process::exit(1);
563                }
564            };
565
566            if spec_v1.single_ring().is_some()
567                && !sp
568                && input_hex.is_none()
569                && input_ascii.is_none()
570            {
571                let bundle_bindings = match byteor_runtime::load_single_ring_bundle_adapter_bindings(
572                    &spec,
573                    config_path.as_deref(),
574                ) {
575                    Ok(bindings) => bindings,
576                    Err(e) => {
577                        eprintln!("adapter bundle load failed: {e}");
578                        println!("{}", serde_json::json!({"ok": false, "error": e}));
579                        std::process::exit(1);
580                    }
581                };
582
583                if let Some(bindings) = bundle_bindings {
584                    let ingress_label = bindings.ingress.label();
585                    let egress_label = bindings.egress.label();
586                    let mut ingress = match byteor_runtime::open_ingress_endpoint(&bindings.ingress)
587                    {
588                        Ok(ingress) => ingress,
589                        Err(e) => {
590                            eprintln!("open ingress failed: {e}");
591                            println!("{}", serde_json::json!({"ok": false, "error": e}));
592                            std::process::exit(1);
593                        }
594                    };
595                    let mut egress = match byteor_runtime::open_egress_endpoint(&bindings.egress) {
596                        Ok(egress) => egress,
597                        Err(e) => {
598                            eprintln!("open egress failed: {e}");
599                            println!("{}", serde_json::json!({"ok": false, "error": e}));
600                            std::process::exit(1);
601                        }
602                    };
603
604                    println!(
605                        "{}",
606                        serde_json::json!({
607                            "ok": true,
608                            "running": true,
609                            "model": "single_ring",
610                            "mode": "adapter_loop",
611                            "config_path": bindings.config_path.display().to_string(),
612                            "ingress_endpoint": bindings.ingress_name,
613                            "egress_endpoint": bindings.egress_name,
614                            "ingress_adapter": bindings.ingress_adapter,
615                            "egress_adapter": bindings.egress_adapter,
616                            "ingress": ingress_label,
617                            "egress": egress_label,
618                            "tuning": tuning_report.clone(),
619                        })
620                    );
621
622                    if let Some(reporter) = runtime_reporter.as_ref() {
623                        if let Err(e) = reporter.report_heartbeat_now() {
624                            eprintln!("agent heartbeat failed: {e}");
625                        }
626                    }
627
628                    let reporting_keepalive = spawn_runtime_reporting_keepalive(
629                        runtime_reporter.as_ref(),
630                        Some(idle_adapter_snapshot_payload(
631                            &bindings.ingress_name,
632                            &bindings.ingress_adapter,
633                            &bindings.egress_name,
634                            &bindings.egress_adapter,
635                        )),
636                    );
637
638                    match byteor_adapters::run_single_ring_adapter_loop_with_options_and_observer(
639                        &spec_v1,
640                        &resolver,
641                        &mut ingress,
642                        &mut egress,
643                        byteor_adapters::AdapterLoopOptions {
644                            max_ingress_messages: adapter_max_messages,
645                        },
646                        |stats| {
647                            if let Some(reporter) = runtime_reporter.as_mut() {
648                                if let Err(e) = reporter.report_snapshot_if_due("single_ring", || {
649                                    Ok(adapter_snapshot_payload(stats, None))
650                                }) {
651                                    eprintln!("agent snapshot failed: {e}");
652                                }
653                            }
654                        },
655                    ) {
656                        Ok(stats) => {
657                            stop_runtime_reporting_keepalive(reporting_keepalive);
658                            report_adapter_snapshot(runtime_reporter.as_ref(), &stats, None);
659                            println!(
660                                "{}",
661                                serde_json::json!({
662                                    "ok": true,
663                                    "stopped": true,
664                                    "model": "single_ring",
665                                    "mode": "adapter_loop",
666                                    "ingress_messages": stats.ingress_messages,
667                                    "egress_messages": stats.egress_messages,
668                                })
669                            );
670                            std::process::exit(0);
671                        }
672                        Err(e) => {
673                            stop_runtime_reporting_keepalive(reporting_keepalive);
674                            report_adapter_snapshot(
675                                runtime_reporter.as_ref(),
676                                &e.stats,
677                                Some(e.to_string()),
678                            );
679                            eprintln!("run failed: {e}");
680                            println!(
681                                "{}",
682                                serde_json::json!({
683                                    "ok": false,
684                                    "error": e.to_string(),
685                                    "model": "single_ring",
686                                    "mode": "adapter_loop",
687                                    "ingress_messages": e.stats.ingress_messages,
688                                    "egress_messages": e.stats.egress_messages,
689                                })
690                            );
691                            std::process::exit(1);
692                        }
693                    }
694                }
695            }
696
697            if spec_v1.single_ring().is_none() {
698                if input_hex.is_some() || input_ascii.is_some() {
699                    eprintln!("--input-* is not supported for lane_graph");
700                    std::process::exit(2);
701                }
702
703                let stop = Arc::new(AtomicBool::new(false));
704                {
705                    let stop2 = stop.clone();
706                    ctrlc::set_handler(move || {
707                        stop2.store(true, Ordering::Relaxed);
708                    })
709                    .unwrap_or_else(|e| {
710                        eprintln!("failed to install ctrlc handler: {e}");
711                        std::process::exit(2);
712                    });
713                }
714
715                let thread_init = byteor_runtime::thread_init_hook_for_tuning(pinning, sched)
716                    .unwrap_or_else(|e| {
717                        eprintln!(
718                            "{}",
719                            byteor_runtime::tuning_host_readiness_error(format!(
720                                "tuning init failed: {e}"
721                            ))
722                        );
723                        std::process::exit(2);
724                    });
725
726                let engine = byteor_engine::Engine::new(byteor_engine::EngineConfig {
727                    stage_failure: byteor_engine::StageFailurePolicy::StopPipeline,
728                    wait,
729                    thread_init,
730                });
731
732                println!(
733                    "{}",
734                    serde_json::json!({
735                        "ok": true,
736                        "running": true,
737                        "model": "lane_graph",
738                        "mode": "sp",
739                        "tuning": tuning_report,
740                    })
741                );
742
743                let rt = engine
744                    .spawn_lane_graph_kv_v1_runtime(&spec, &resolver)
745                    .unwrap_or_else(|e| {
746                        eprintln!("run failed: {e}");
747                        println!(
748                            "{}",
749                            serde_json::json!({"ok": false, "error": e.to_string()})
750                        );
751                        std::process::exit(1);
752                    });
753
754                let lane_root = byteor_runtime::default_lane_graph_shm_dir(&spec);
755
756                while !stop.load(Ordering::Relaxed) {
757                    if let Some(reporter) = runtime_reporter.as_mut() {
758                        if let Err(e) = reporter.report_heartbeat_if_due() {
759                            eprintln!("agent heartbeat failed: {e}");
760                        }
761                        if let Err(e) = reporter.report_snapshot_if_due("lane_graph", || {
762                            let role_activity = rt.role_activity_snapshot();
763                            byteor_runtime::snapshot_lane_graph_json(
764                                &spec,
765                                Some(lane_root.as_path()),
766                                Some(role_activity.as_slice()),
767                            )
768                        }) {
769                            eprintln!("agent snapshot failed: {e}");
770                        }
771                    }
772                    std::thread::sleep(std::time::Duration::from_millis(50));
773                }
774
775                if let Some(reporter) = runtime_reporter.as_ref() {
776                    if let Err(e) = reporter.report_heartbeat_now() {
777                        eprintln!("agent heartbeat failed: {e}");
778                    }
779                    if let Err(e) = reporter.report_snapshot_now("lane_graph", || {
780                        let role_activity = rt.role_activity_snapshot();
781                        byteor_runtime::snapshot_lane_graph_json(
782                            &spec,
783                            Some(lane_root.as_path()),
784                            Some(role_activity.as_slice()),
785                        )
786                    }) {
787                        eprintln!("agent snapshot failed: {e}");
788                    }
789                }
790
791                if let Err(e) = rt.stop_and_join() {
792                    eprintln!("stop/join failed: {e}");
793                    println!(
794                        "{}",
795                        serde_json::json!({"ok": false, "error": e.to_string()})
796                    );
797                    std::process::exit(1);
798                }
799
800                println!("{}", serde_json::json!({"ok": true, "stopped": true}));
801                std::process::exit(0);
802            }
803
804            if !sp {
805                if input_hex.is_some() || input_ascii.is_some() {
806                    eprintln!("--input-* is only supported for --sp");
807                    std::process::exit(2);
808                }
809
810                let stop = Arc::new(AtomicBool::new(false));
811                {
812                    let stop2 = stop.clone();
813                    ctrlc::set_handler(move || {
814                        stop2.store(true, Ordering::Relaxed);
815                    })
816                    .unwrap_or_else(|e| {
817                        eprintln!("failed to install ctrlc handler: {e}");
818                        std::process::exit(2);
819                    });
820                }
821
822                println!(
823                    "{}",
824                    serde_json::json!({
825                        "ok": true,
826                        "running": true,
827                        "model": "single_ring",
828                        "lane_path": lane_path.display().to_string(),
829                    })
830                );
831
832                match byteor_runtime::run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_tuning(
833                    &spec,
834                    lane_path,
835                    wait,
836                    stop,
837                    &resolver,
838                    pinning,
839                    sched,
840                ) {
841                    Ok(()) => {
842                        println!("{}", serde_json::json!({"ok": true, "stopped": true}));
843                        std::process::exit(0);
844                    }
845                    Err(e) => {
846                        eprintln!("run failed: {e}");
847                        println!("{}", serde_json::json!({"ok": false, "error": e}));
848                        std::process::exit(1);
849                    }
850                }
851            }
852
853            let input =
854                match byteor_runtime::parse_run_input(input_hex.as_deref(), input_ascii.as_deref())
855                {
856                    Ok(v) => v,
857                    Err(e) => {
858                        eprintln!("invalid input: {e}");
859                        std::process::exit(2);
860                    }
861                };
862
863            match byteor_runtime::run_sp_kv_v1_single_ring_with_resolver_and_tuning(
864                &spec, lane_path, input, wait, &resolver, pinning, sched,
865            ) {
866                Ok(v) => {
867                    println!("{}", serde_json::to_string_pretty(&v).unwrap());
868                    std::process::exit(0);
869                }
870                Err(e) => {
871                    eprintln!("run failed: {e}");
872                    println!("{}", serde_json::json!({"ok": false, "error": e}));
873                    std::process::exit(1);
874                }
875            }
876        }
877        "doctor" => {
878            let mut tuning = byteor_runtime::TuningCliState::new(byteor_core::paths::shm_parent_from_env());
879
880            for a in args {
881                if let Some(v) = a.strip_prefix("--profile=") {
882                    let profile = byteor_runtime::TuningProfile::parse(v).unwrap_or_else(|| {
883                        eprintln!(
884                            "unknown --profile: {v} (expected default|low-latency|isolated-core)"
885                        );
886                        std::process::exit(2);
887                    });
888                    tuning.set_profile(profile);
889                } else if let Some(v) = a.strip_prefix("--mlockall=") {
890                    let mlockall = match v {
891                        "on" | "1" | "true" | "yes" => byteor_runtime::MlockallPreset::On,
892                        "off" | "0" | "false" | "no" | "none" => byteor_runtime::MlockallPreset::None,
893                        _ => {
894                            eprintln!("unknown --mlockall: {v} (expected on|off)");
895                            std::process::exit(2);
896                        }
897                    };
898                    tuning.set_mlockall(mlockall);
899                } else if let Some(v) = a.strip_prefix("--shm-parent=") {
900                    tuning.set_shm_parent(PathBuf::from(v));
901                } else {
902                    eprintln!("unknown arg: {a}");
903                    std::process::exit(2);
904                }
905            }
906
907            let resolved = tuning.resolve();
908            let report = byteor_runtime::doctor_with_tuning(&resolved);
909            byteor_runtime::eprint_human(&report.checks);
910            println!("{}", serde_json::to_string_pretty(&report).unwrap());
911            std::process::exit(if report.ok { 0 } else { 1 });
912        }
913        "snapshot" => {
914            let mut ring_path: Option<PathBuf> = None;
915            let mut active_gating: usize = 4;
916
917            for a in args {
918                if let Some(v) = a.strip_prefix("--ring-path=") {
919                    ring_path = Some(PathBuf::from(v));
920                } else if let Some(v) = a.strip_prefix("--active-gating=") {
921                    active_gating = v.parse().unwrap_or(4);
922                } else {
923                    eprintln!("unknown arg: {a}");
924                    std::process::exit(2);
925                }
926            }
927
928            let Some(ring_path) = ring_path else {
929                eprintln!("missing --ring-path");
930                std::process::exit(2);
931            };
932
933            match byteor_runtime::snapshot_single_ring_json(ring_path, active_gating) {
934                Ok(v) => {
935                    println!("{}", serde_json::to_string_pretty(&v).unwrap());
936                    std::process::exit(0);
937                }
938                Err(e) => {
939                    eprintln!("snapshot failed: {e}");
940                    std::process::exit(1);
941                }
942            }
943        }
944        "incident-bundle" => {
945            let mut out: Option<PathBuf> = None;
946            let mut spec: Option<PathBuf> = None;
947            let mut config: Option<PathBuf> = None;
948            let mut ring_path: Option<PathBuf> = None;
949
950            for a in args {
951                if let Some(v) = a.strip_prefix("--out=") {
952                    out = Some(PathBuf::from(v));
953                } else if let Some(v) = a.strip_prefix("--spec=") {
954                    spec = Some(PathBuf::from(v));
955                } else if let Some(v) = a.strip_prefix("--config=") {
956                    config = Some(PathBuf::from(v));
957                } else if let Some(v) = a.strip_prefix("--ring-path=") {
958                    ring_path = Some(PathBuf::from(v));
959                } else {
960                    eprintln!("unknown arg: {a}");
961                    std::process::exit(2);
962                }
963            }
964
965            let Some(out) = out else {
966                eprintln!("missing --out");
967                std::process::exit(2);
968            };
969            let Some(spec) = spec else {
970                eprintln!("missing --spec");
971                std::process::exit(2);
972            };
973
974            let res = byteor_runtime::incident_bundle(
975                &out,
976                &spec,
977                config.as_deref(),
978                ring_path.as_deref(),
979            );
980
981            match res {
982                Ok(()) => {
983                    println!(
984                        "{}",
985                        serde_json::json!({"ok": true, "out": out.display().to_string() })
986                    );
987                    std::process::exit(0);
988                }
989                Err(e) => {
990                    eprintln!("incident-bundle failed: {e}");
991                    println!("{}", serde_json::json!({"ok": false, "error": e }));
992                    std::process::exit(1);
993                }
994            }
995        }
996        other => {
997            eprintln!("unknown command: {other}");
998            std::process::exit(2);
999        }
1000    }
1001}