1use std::path::PathBuf;
2use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
3use std::thread::{self, JoinHandle};
4use std::time::Duration;
5
6fn report_adapter_snapshot(
7 reporter: Option<&byteor_app_kit::agent::RuntimeReporter>,
8 stats: &byteor_adapters::AdapterLoopStats,
9 error: Option<String>,
10) {
11 let Some(reporter) = reporter else {
12 return;
13 };
14
15 if let Err(report_error) = reporter.report_snapshot_now("single_ring", || {
16 Ok(adapter_snapshot_payload(stats, error.clone()))
17 }) {
18 eprintln!("agent snapshot failed: {report_error}");
19 }
20 if let Err(report_error) = reporter.report_heartbeat_now() {
21 eprintln!("agent heartbeat failed: {report_error}");
22 }
23}
24
25fn adapter_snapshot_payload(
26 stats: &byteor_adapters::AdapterLoopStats,
27 error: Option<String>,
28) -> serde_json::Value {
29 let mut snapshot = serde_json::json!({
30 "mode": "adapter_loop",
31 "ok": error.is_none(),
32 "ingress_messages": stats.ingress_messages,
33 "egress_messages": stats.egress_messages,
34 });
35 if let Some(error) = error {
36 snapshot["error"] = serde_json::json!(error);
37 }
38 stats.merge_into_snapshot(&mut snapshot);
39 snapshot
40}
41
42fn idle_adapter_snapshot_payload(
43 ingress_name: &str,
44 ingress_transport: &str,
45 egress_name: &str,
46 egress_transport: &str,
47) -> serde_json::Value {
48 serde_json::json!({
49 "mode": "adapter_loop",
50 "ok": true,
51 "ingress_messages": 0,
52 "egress_messages": 0,
53 "adapters": [
54 {
55 "name": ingress_name,
56 "role": "ingress",
57 "transport": ingress_transport,
58 "connection_state": "ready",
59 "message_count": 0,
60 "recent_failures": 0,
61 "last_error": null,
62 "lag": null,
63 "details": {}
64 },
65 {
66 "name": egress_name,
67 "role": "egress",
68 "transport": egress_transport,
69 "connection_state": "ready",
70 "message_count": 0,
71 "recent_failures": 0,
72 "last_error": null,
73 "lag": null,
74 "details": {}
75 }
76 ]
77 })
78}
79
80fn spawn_runtime_reporting_keepalive(
81 reporter: Option<&byteor_app_kit::agent::RuntimeReporter>,
82 idle_snapshot: Option<serde_json::Value>,
83) -> Option<(Arc<AtomicBool>, JoinHandle<()>)> {
84 let mut reporter = reporter?.clone();
85 let stop = Arc::new(AtomicBool::new(false));
86 let stop_signal = Arc::clone(&stop);
87 let handle = thread::spawn(move || {
88 while !stop_signal.load(Ordering::Relaxed) {
89 if let Err(error) = reporter.report_heartbeat_if_due() {
90 eprintln!("agent heartbeat failed: {error}");
91 }
92 if let Some(snapshot) = idle_snapshot.as_ref() {
93 if let Err(error) = reporter.report_snapshot_if_due("single_ring", || Ok(snapshot.clone())) {
94 eprintln!("agent snapshot failed: {error}");
95 }
96 }
97 thread::sleep(Duration::from_millis(250));
98 }
99 });
100 Some((stop, handle))
101}
102
103fn stop_runtime_reporting_keepalive(
104 keepalive: Option<(Arc<AtomicBool>, JoinHandle<()>)>,
105) {
106 let Some((stop, handle)) = keepalive else {
107 return;
108 };
109 stop.store(true, Ordering::Relaxed);
110 let _ = handle.join();
111}
112
113fn main() {
114 let mut args = std::env::args().skip(1);
118 let cmd = args.next().unwrap_or_else(|| "help".to_string());
119
120 match cmd.as_str() {
121 "help" | "-h" | "--help" => {
122 eprintln!(
123 "byteor-edgeplane\n\nUSAGE:\n byteor-edgeplane validate --spec=/path\n byteor-edgeplane describe --spec=/path\n byteor-edgeplane dot --spec=/path\n byteor-edgeplane run --spec=/path [--profile=default|low-latency|isolated-core] [--config=/path] [--lane-path=/path] [--shm-parent=/path] [--mlockall=on|off] [--wait=spin|backoff] [--pin=none|balanced|physical] [--sched=other|fifo|rr] [--rt-prio=N] [--dry-run] [--env=dev|staging|prod] [--approval=<credential>] [--approval-key-set=/path] [--adapter-max-messages=N] # long-running (Ctrl-C to stop unless bounded)\n byteor-edgeplane run --sp --spec=/path [--profile=default|low-latency|isolated-core] [--config=/path] [--lane-path=/path] [--shm-parent=/path] [--mlockall=on|off] [--wait=spin|backoff] [--pin=none|balanced|physical] [--sched=other|fifo|rr] [--rt-prio=N] [--input-hex=..|--input-ascii=..] [--dry-run] [--env=dev|staging|prod] [--approval=<credential>] [--approval-key-set=/path] # one-shot SingleRing or long-running LaneGraph\n byteor-edgeplane doctor [--profile=default|low-latency|isolated-core] [--mlockall=on|off] [--shm-parent=/path]\n byteor-edgeplane snapshot --ring-path=/path [--active-gating=N]\n byteor-edgeplane incident-bundle --out=/path --spec=/path [--config=/path] [--ring-path=/path]\n\nNOTES:\n - ResolverKey stages supported:\n - http_post:<url>\n - exec:<program>|<arg>|...\n"
124 );
125 std::process::exit(2);
126 }
127 "validate" => {
128 let mut spec: Option<PathBuf> = None;
129 for a in args {
130 if let Some(v) = a.strip_prefix("--spec=") {
131 spec = Some(PathBuf::from(v));
132 } else {
133 eprintln!("unknown arg: {a}");
134 std::process::exit(2);
135 }
136 }
137
138 let Some(spec) = spec else {
139 eprintln!("missing --spec");
140 std::process::exit(2);
141 };
142
143 match byteor_runtime::validate_spec_kv_v1(&spec) {
144 Ok(()) => {
145 println!("{}", serde_json::json!({"ok": true}));
146 std::process::exit(0);
147 }
148 Err(e) => {
149 eprintln!("validate failed: {e}");
150 println!("{}", serde_json::json!({"ok": false, "error": e}));
151 std::process::exit(1);
152 }
153 }
154 }
155 "describe" => {
156 let mut spec: Option<PathBuf> = None;
157 for a in args {
158 if let Some(v) = a.strip_prefix("--spec=") {
159 spec = Some(PathBuf::from(v));
160 } else {
161 eprintln!("unknown arg: {a}");
162 std::process::exit(2);
163 }
164 }
165
166 let Some(spec) = spec else {
167 eprintln!("missing --spec");
168 std::process::exit(2);
169 };
170
171 match byteor_runtime::describe_spec_kv_v1(&spec) {
172 Ok(s) => {
173 print!("{s}");
174 std::process::exit(0);
175 }
176 Err(e) => {
177 eprintln!("describe failed: {e}");
178 std::process::exit(1);
179 }
180 }
181 }
182 "dot" => {
183 let mut spec: Option<PathBuf> = None;
184 for a in args {
185 if let Some(v) = a.strip_prefix("--spec=") {
186 spec = Some(PathBuf::from(v));
187 } else {
188 eprintln!("unknown arg: {a}");
189 std::process::exit(2);
190 }
191 }
192
193 let Some(spec) = spec else {
194 eprintln!("missing --spec");
195 std::process::exit(2);
196 };
197
198 match byteor_runtime::dot_spec_kv_v1(&spec) {
199 Ok(s) => {
200 print!("{s}");
201 std::process::exit(0);
202 }
203 Err(e) => {
204 eprintln!("dot failed: {e}");
205 std::process::exit(1);
206 }
207 }
208 }
209 "run" => {
210 let mut sp = false;
211 let mut spec: Option<PathBuf> = None;
212 let mut lane_path: Option<PathBuf> = None;
213 let mut rt_prio: Option<u8> = None;
214 let mut input_hex: Option<String> = None;
215 let mut input_ascii: Option<String> = None;
216 let mut dry_run = false;
217 let mut env = byteor_core::config::Environment::Dev;
218 let mut approval: Option<String> = None;
219 let mut approval_key_sets: Vec<PathBuf> = Vec::new();
220 let mut config_path: Option<PathBuf> = None;
221 let mut adapter_max_messages: Option<u64> = None;
222 let mut tuning = byteor_runtime::TuningCliState::new(byteor_core::paths::shm_parent_from_env());
223
224 for a in args {
225 if a == "--sp" {
226 sp = true;
227 } else if a == "--dry-run" {
228 dry_run = true;
229 } else if let Some(v) = a.strip_prefix("--dry-run=") {
230 dry_run = matches!(v, "on" | "1" | "true" | "yes");
231 } else if let Some(v) = a.strip_prefix("--spec=") {
232 spec = Some(PathBuf::from(v));
233 } else if let Some(v) = a.strip_prefix("--lane-path=") {
234 lane_path = Some(PathBuf::from(v));
235 } else if let Some(v) = a.strip_prefix("--profile=") {
236 let profile = byteor_runtime::TuningProfile::parse(v).unwrap_or_else(|| {
237 eprintln!(
238 "unknown --profile: {v} (expected default|low-latency|isolated-core)"
239 );
240 std::process::exit(2);
241 });
242 tuning.set_profile(profile);
243 } else if let Some(v) = a.strip_prefix("--shm-parent=") {
244 tuning.set_shm_parent(PathBuf::from(v));
245 } else if let Some(v) = a.strip_prefix("--mlockall=") {
246 let preset = match v {
247 "on" | "1" | "true" | "yes" => byteor_runtime::MlockallPreset::On,
248 "off" | "0" | "false" | "no" | "none" => {
249 byteor_runtime::MlockallPreset::None
250 }
251 _ => {
252 eprintln!("unknown --mlockall: {v} (expected on|off)");
253 std::process::exit(2);
254 }
255 };
256 tuning.set_mlockall(preset);
257 } else if let Some(v) = a.strip_prefix("--sched=") {
258 let parsed_sched = match byteor_runtime::SchedPreset::parse_kind(v) {
259 Some(x) => x,
260 None => {
261 eprintln!("unknown --sched: {v} (expected other|fifo|rr)");
262 std::process::exit(2);
263 }
264 };
265 tuning.set_sched(parsed_sched);
266 } else if let Some(v) = a.strip_prefix("--rt-prio=") {
267 rt_prio = Some(v.parse().unwrap_or_else(|_| {
268 eprintln!("invalid --rt-prio: {v}");
269 std::process::exit(2);
270 }));
271 } else if let Some(v) = a.strip_prefix("--env=") {
272 env = match byteor_core::config::Environment::parse(v) {
273 Some(x) => x,
274 None => {
275 eprintln!("unknown --env: {v} (expected dev|staging|prod)");
276 std::process::exit(2);
277 }
278 };
279 } else if let Some(v) = a.strip_prefix("--approval=") {
280 approval = Some(v.to_string());
281 } else if let Some(v) = a.strip_prefix("--approval-key-set=") {
282 approval_key_sets.push(PathBuf::from(v));
283 } else if let Some(v) = a.strip_prefix("--config=") {
284 config_path = Some(PathBuf::from(v));
285 } else if let Some(v) = a.strip_prefix("--adapter-max-messages=") {
286 let parsed = v.parse().unwrap_or_else(|_| {
287 eprintln!("invalid --adapter-max-messages: {v}");
288 std::process::exit(2);
289 });
290 if parsed == 0 {
291 eprintln!("invalid --adapter-max-messages: {v} (expected > 0)");
292 std::process::exit(2);
293 }
294 adapter_max_messages = Some(parsed);
295 } else if let Some(v) = a.strip_prefix("--wait=") {
296 let parsed_wait = byteor_runtime::parse_wait_preset(v).unwrap_or_else(|| {
297 eprintln!("unknown --wait preset: {v}");
298 std::process::exit(2);
299 });
300 tuning.set_wait(parsed_wait);
301 } else if let Some(v) = a.strip_prefix("--pin=") {
302 let parsed_pinning = match byteor_runtime::PinningPreset::parse(v) {
303 Some(p) => p,
304 None => {
305 eprintln!(
306 "unknown --pin preset: {v} (expected none|balanced|physical)"
307 );
308 std::process::exit(2);
309 }
310 };
311 tuning.set_pinning(parsed_pinning);
312 } else if let Some(v) = a.strip_prefix("--input-hex=") {
313 input_hex = Some(v.to_string());
314 } else if let Some(v) = a.strip_prefix("--input-ascii=") {
315 input_ascii = Some(v.to_string());
316 } else {
317 eprintln!("unknown arg: {a}");
318 std::process::exit(2);
319 }
320 }
321 let Some(spec) = spec else {
322 eprintln!("missing --spec");
323 std::process::exit(2);
324 };
325
326 let mut resolved_tuning = tuning.resolve();
327 if let Some(p) = rt_prio {
328 if resolved_tuning.sched == byteor_runtime::SchedPreset::Other {
329 eprintln!("--rt-prio requires --sched=fifo|rr");
330 std::process::exit(2);
331 }
332 if !(1..=99).contains(&p) {
333 eprintln!("invalid --rt-prio: {p} (expected 1..=99)");
334 std::process::exit(2);
335 }
336 resolved_tuning.sched = resolved_tuning.sched.with_rt_prio(p);
337 }
338 let profile = resolved_tuning.profile;
339 let wait = resolved_tuning.wait;
340 let pinning = resolved_tuning.pinning;
341 let mlockall = resolved_tuning.mlockall;
342 let sched = resolved_tuning.sched;
343 let shm_parent = resolved_tuning.shm_parent.clone();
344 let tuning_report = byteor_runtime::effective_tuning_report(profile, &resolved_tuning);
345
346 let lane_path = byteor_core::paths::default_lane_path_for_run(
347 byteor_core::Product::Edgeplane,
348 lane_path,
349 &shm_parent,
350 );
351
352 if dry_run {
353 match byteor_runtime::read_spec_kv_v1(&spec) {
354 Ok(spec_v1) => {
355 let stages = spec_v1.single_ring().map(|r| r.stages.len()).unwrap_or(0);
356 let resolved_config_path = config_path
357 .clone()
358 .or_else(|| byteor_runtime::default_bundle_config_path(&spec));
359 println!(
360 "{}",
361 serde_json::json!({
362 "ok": true,
363 "dry_run": true,
364 "model": format!("{:?}", spec_v1.model()).to_lowercase(),
365 "stages": stages,
366 "lane_path": lane_path.display().to_string(),
367 "config_path": resolved_config_path
368 .as_ref()
369 .map(|path| path.display().to_string()),
370 })
371 );
372 std::process::exit(0);
373 }
374 Err(e) => {
375 eprintln!("dry-run failed: {e}");
376 println!("{}", serde_json::json!({"ok": false, "error": e}));
377 std::process::exit(1);
378 }
379 }
380 }
381
382 if let Err(e) = byteor_runtime::apply_mlockall_preset(mlockall) {
383 eprintln!(
384 "{}",
385 byteor_runtime::tuning_host_readiness_error(format!(
386 "mlockall apply failed: preset={} err={e}",
387 mlockall.as_str()
388 ))
389 );
390 std::process::exit(1);
391 }
392
393 if let Err(e) = byteor_runtime::apply_sched_preset(sched) {
394 eprintln!(
395 "{}",
396 byteor_runtime::tuning_host_readiness_error(format!(
397 "sched apply failed: sched={} prio={:?} err={e}",
398 sched.kind_str(),
399 sched.rt_prio()
400 ))
401 );
402 std::process::exit(1);
403 }
404
405 let canonical_spec_kv = match byteor_runtime::canonical_encode_spec_kv_v1(&spec) {
406 Ok(spec_kv) => spec_kv,
407 Err(e) => {
408 eprintln!("canonical spec failed: {e}");
409 println!("{}", serde_json::json!({"ok": false, "error": e}));
410 std::process::exit(1);
411 }
412 };
413
414 if approval.is_none() {
415 approval =
416 match byteor_app_kit::approval::load_approval_credential_from_bundle(&spec) {
417 Ok(credential) => credential,
418 Err(e) => {
419 eprintln!("approval credential load failed: {e}");
420 println!("{}", serde_json::json!({"ok": false, "error": e}));
421 std::process::exit(1);
422 }
423 };
424 }
425
426 if approval_key_sets.is_empty() {
427 approval_key_sets =
428 byteor_app_kit::approval::discover_trusted_approval_key_paths(&spec);
429 }
430
431 let approval_refresh_bootstrap =
432 match byteor_app_kit::approval::approval_refresh_bootstrap_from_env() {
433 Ok(bootstrap) => bootstrap,
434 Err(e) => {
435 eprintln!("approval key refresh bootstrap failed: {e}");
436 println!("{}", serde_json::json!({"ok": false, "error": e}));
437 std::process::exit(1);
438 }
439 };
440
441 let trusted_approval_cache =
442 match byteor_app_kit::approval::load_runtime_trusted_approval_key_cache(
443 &spec,
444 &approval_key_sets,
445 approval_refresh_bootstrap.as_ref(),
446 ) {
447 Ok(cache) => cache,
448 Err(e) => {
449 eprintln!("approval key set load failed: {e}");
450 println!("{}", serde_json::json!({"ok": false, "error": e}));
451 std::process::exit(1);
452 }
453 };
454
455 let approval_resolution = match byteor_app_kit::approval::resolve_runtime_approval(
456 &canonical_spec_kv,
457 env,
458 approval.as_deref(),
459 &trusted_approval_cache.keys,
460 ) {
461 Ok(resolution) => resolution,
462 Err(byteor_app_kit::approval::ApprovalVerificationError::UnknownKeyId(_))
463 if approval_refresh_bootstrap.is_some() =>
464 {
465 let refreshed_cache =
466 match byteor_app_kit::approval::refresh_runtime_trusted_approval_key_cache(
467 &spec,
468 &approval_key_sets,
469 approval_refresh_bootstrap
470 .as_ref()
471 .expect("refresh bootstrap present"),
472 ) {
473 Ok(cache) => cache,
474 Err(refresh_error) => {
475 let message = format!(
476 "approval verification failed: unknown approval signing key and refresh failed: {refresh_error}"
477 );
478 eprintln!("{message}");
479 println!("{}", serde_json::json!({"ok": false, "error": message}));
480 std::process::exit(1);
481 }
482 };
483
484 match byteor_app_kit::approval::resolve_runtime_approval(
485 &canonical_spec_kv,
486 env,
487 approval.as_deref(),
488 &refreshed_cache.keys,
489 ) {
490 Ok(resolution) => resolution,
491 Err(e) => {
492 let message = format!("approval verification failed: {e}");
493 eprintln!("{message}");
494 println!("{}", serde_json::json!({"ok": false, "error": message}));
495 std::process::exit(1);
496 }
497 }
498 }
499 Err(e) => {
500 let message = format!("approval verification failed: {e}");
501 eprintln!("{message}");
502 println!("{}", serde_json::json!({"ok": false, "error": message}));
503 std::process::exit(1);
504 }
505 };
506
507 let reporting_config = match byteor_app_kit::agent::reporting_config_from_bootstrap(
508 approval_refresh_bootstrap.as_ref(),
509 ) {
510 Ok(config) => config,
511 Err(e) => {
512 eprintln!("agent reporting bootstrap failed: {e}");
513 println!("{}", serde_json::json!({"ok": false, "error": e}));
514 std::process::exit(1);
515 }
516 };
517
518 let approval_trust = match byteor_app_kit::agent::approval_trust_status(
519 &trusted_approval_cache,
520 approval.as_deref(),
521 ) {
522 Ok(status) => Some(status),
523 Err(e) => {
524 eprintln!("agent approval trust export failed: {e}");
525 println!("{}", serde_json::json!({"ok": false, "error": e}));
526 std::process::exit(1);
527 }
528 };
529
530 let mut stage_catalog = byteor_runtime::default_stage_catalog_entries();
531 stage_catalog.extend(byteor_app_kit::actions::action_stage_catalog_entries());
532 let capability_document =
533 byteor_app_kit::capabilities::runtime_capability_document(
534 "byteor-edgeplane",
535 stage_catalog,
536 );
537
538 let mut runtime_reporter = reporting_config.map(|config| {
539 byteor_app_kit::agent::RuntimeReporter::new(
540 config,
541 env!("CARGO_PKG_VERSION").to_string(),
542 approval_resolution.spec_hash.clone(),
543 capability_document,
544 approval_trust.clone(),
545 )
546 });
547
548 let resolver = byteor_app_kit::actions::build_action_stages_default_replay(
551 env,
552 approval_resolution.approval_token_present,
553 false, false, );
556
557 let spec_v1 = match byteor_runtime::read_spec_kv_v1(&spec) {
558 Ok(spec_v1) => spec_v1,
559 Err(e) => {
560 eprintln!("read spec failed: {e}");
561 println!("{}", serde_json::json!({"ok": false, "error": e}));
562 std::process::exit(1);
563 }
564 };
565
566 if spec_v1.single_ring().is_some()
567 && !sp
568 && input_hex.is_none()
569 && input_ascii.is_none()
570 {
571 let bundle_bindings = match byteor_runtime::load_single_ring_bundle_adapter_bindings(
572 &spec,
573 config_path.as_deref(),
574 ) {
575 Ok(bindings) => bindings,
576 Err(e) => {
577 eprintln!("adapter bundle load failed: {e}");
578 println!("{}", serde_json::json!({"ok": false, "error": e}));
579 std::process::exit(1);
580 }
581 };
582
583 if let Some(bindings) = bundle_bindings {
584 let ingress_label = bindings.ingress.label();
585 let egress_label = bindings.egress.label();
586 let mut ingress = match byteor_runtime::open_ingress_endpoint(&bindings.ingress)
587 {
588 Ok(ingress) => ingress,
589 Err(e) => {
590 eprintln!("open ingress failed: {e}");
591 println!("{}", serde_json::json!({"ok": false, "error": e}));
592 std::process::exit(1);
593 }
594 };
595 let mut egress = match byteor_runtime::open_egress_endpoint(&bindings.egress) {
596 Ok(egress) => egress,
597 Err(e) => {
598 eprintln!("open egress failed: {e}");
599 println!("{}", serde_json::json!({"ok": false, "error": e}));
600 std::process::exit(1);
601 }
602 };
603
604 println!(
605 "{}",
606 serde_json::json!({
607 "ok": true,
608 "running": true,
609 "model": "single_ring",
610 "mode": "adapter_loop",
611 "config_path": bindings.config_path.display().to_string(),
612 "ingress_endpoint": bindings.ingress_name,
613 "egress_endpoint": bindings.egress_name,
614 "ingress_adapter": bindings.ingress_adapter,
615 "egress_adapter": bindings.egress_adapter,
616 "ingress": ingress_label,
617 "egress": egress_label,
618 "tuning": tuning_report.clone(),
619 })
620 );
621
622 if let Some(reporter) = runtime_reporter.as_ref() {
623 if let Err(e) = reporter.report_heartbeat_now() {
624 eprintln!("agent heartbeat failed: {e}");
625 }
626 }
627
628 let reporting_keepalive = spawn_runtime_reporting_keepalive(
629 runtime_reporter.as_ref(),
630 Some(idle_adapter_snapshot_payload(
631 &bindings.ingress_name,
632 &bindings.ingress_adapter,
633 &bindings.egress_name,
634 &bindings.egress_adapter,
635 )),
636 );
637
638 match byteor_adapters::run_single_ring_adapter_loop_with_options_and_observer(
639 &spec_v1,
640 &resolver,
641 &mut ingress,
642 &mut egress,
643 byteor_adapters::AdapterLoopOptions {
644 max_ingress_messages: adapter_max_messages,
645 },
646 |stats| {
647 if let Some(reporter) = runtime_reporter.as_mut() {
648 if let Err(e) = reporter.report_snapshot_if_due("single_ring", || {
649 Ok(adapter_snapshot_payload(stats, None))
650 }) {
651 eprintln!("agent snapshot failed: {e}");
652 }
653 }
654 },
655 ) {
656 Ok(stats) => {
657 stop_runtime_reporting_keepalive(reporting_keepalive);
658 report_adapter_snapshot(runtime_reporter.as_ref(), &stats, None);
659 println!(
660 "{}",
661 serde_json::json!({
662 "ok": true,
663 "stopped": true,
664 "model": "single_ring",
665 "mode": "adapter_loop",
666 "ingress_messages": stats.ingress_messages,
667 "egress_messages": stats.egress_messages,
668 })
669 );
670 std::process::exit(0);
671 }
672 Err(e) => {
673 stop_runtime_reporting_keepalive(reporting_keepalive);
674 report_adapter_snapshot(
675 runtime_reporter.as_ref(),
676 &e.stats,
677 Some(e.to_string()),
678 );
679 eprintln!("run failed: {e}");
680 println!(
681 "{}",
682 serde_json::json!({
683 "ok": false,
684 "error": e.to_string(),
685 "model": "single_ring",
686 "mode": "adapter_loop",
687 "ingress_messages": e.stats.ingress_messages,
688 "egress_messages": e.stats.egress_messages,
689 })
690 );
691 std::process::exit(1);
692 }
693 }
694 }
695 }
696
697 if spec_v1.single_ring().is_none() {
698 if input_hex.is_some() || input_ascii.is_some() {
699 eprintln!("--input-* is not supported for lane_graph");
700 std::process::exit(2);
701 }
702
703 let stop = Arc::new(AtomicBool::new(false));
704 {
705 let stop2 = stop.clone();
706 ctrlc::set_handler(move || {
707 stop2.store(true, Ordering::Relaxed);
708 })
709 .unwrap_or_else(|e| {
710 eprintln!("failed to install ctrlc handler: {e}");
711 std::process::exit(2);
712 });
713 }
714
715 let thread_init = byteor_runtime::thread_init_hook_for_tuning(pinning, sched)
716 .unwrap_or_else(|e| {
717 eprintln!(
718 "{}",
719 byteor_runtime::tuning_host_readiness_error(format!(
720 "tuning init failed: {e}"
721 ))
722 );
723 std::process::exit(2);
724 });
725
726 let engine = byteor_engine::Engine::new(byteor_engine::EngineConfig {
727 stage_failure: byteor_engine::StageFailurePolicy::StopPipeline,
728 wait,
729 thread_init,
730 });
731
732 println!(
733 "{}",
734 serde_json::json!({
735 "ok": true,
736 "running": true,
737 "model": "lane_graph",
738 "mode": "sp",
739 "tuning": tuning_report,
740 })
741 );
742
743 let rt = engine
744 .spawn_lane_graph_kv_v1_runtime(&spec, &resolver)
745 .unwrap_or_else(|e| {
746 eprintln!("run failed: {e}");
747 println!(
748 "{}",
749 serde_json::json!({"ok": false, "error": e.to_string()})
750 );
751 std::process::exit(1);
752 });
753
754 let lane_root = byteor_runtime::default_lane_graph_shm_dir(&spec);
755
756 while !stop.load(Ordering::Relaxed) {
757 if let Some(reporter) = runtime_reporter.as_mut() {
758 if let Err(e) = reporter.report_heartbeat_if_due() {
759 eprintln!("agent heartbeat failed: {e}");
760 }
761 if let Err(e) = reporter.report_snapshot_if_due("lane_graph", || {
762 let role_activity = rt.role_activity_snapshot();
763 byteor_runtime::snapshot_lane_graph_json(
764 &spec,
765 Some(lane_root.as_path()),
766 Some(role_activity.as_slice()),
767 )
768 }) {
769 eprintln!("agent snapshot failed: {e}");
770 }
771 }
772 std::thread::sleep(std::time::Duration::from_millis(50));
773 }
774
775 if let Some(reporter) = runtime_reporter.as_ref() {
776 if let Err(e) = reporter.report_heartbeat_now() {
777 eprintln!("agent heartbeat failed: {e}");
778 }
779 if let Err(e) = reporter.report_snapshot_now("lane_graph", || {
780 let role_activity = rt.role_activity_snapshot();
781 byteor_runtime::snapshot_lane_graph_json(
782 &spec,
783 Some(lane_root.as_path()),
784 Some(role_activity.as_slice()),
785 )
786 }) {
787 eprintln!("agent snapshot failed: {e}");
788 }
789 }
790
791 if let Err(e) = rt.stop_and_join() {
792 eprintln!("stop/join failed: {e}");
793 println!(
794 "{}",
795 serde_json::json!({"ok": false, "error": e.to_string()})
796 );
797 std::process::exit(1);
798 }
799
800 println!("{}", serde_json::json!({"ok": true, "stopped": true}));
801 std::process::exit(0);
802 }
803
804 if !sp {
805 if input_hex.is_some() || input_ascii.is_some() {
806 eprintln!("--input-* is only supported for --sp");
807 std::process::exit(2);
808 }
809
810 let stop = Arc::new(AtomicBool::new(false));
811 {
812 let stop2 = stop.clone();
813 ctrlc::set_handler(move || {
814 stop2.store(true, Ordering::Relaxed);
815 })
816 .unwrap_or_else(|e| {
817 eprintln!("failed to install ctrlc handler: {e}");
818 std::process::exit(2);
819 });
820 }
821
822 println!(
823 "{}",
824 serde_json::json!({
825 "ok": true,
826 "running": true,
827 "model": "single_ring",
828 "lane_path": lane_path.display().to_string(),
829 })
830 );
831
832 match byteor_runtime::run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_tuning(
833 &spec,
834 lane_path,
835 wait,
836 stop,
837 &resolver,
838 pinning,
839 sched,
840 ) {
841 Ok(()) => {
842 println!("{}", serde_json::json!({"ok": true, "stopped": true}));
843 std::process::exit(0);
844 }
845 Err(e) => {
846 eprintln!("run failed: {e}");
847 println!("{}", serde_json::json!({"ok": false, "error": e}));
848 std::process::exit(1);
849 }
850 }
851 }
852
853 let input =
854 match byteor_runtime::parse_run_input(input_hex.as_deref(), input_ascii.as_deref())
855 {
856 Ok(v) => v,
857 Err(e) => {
858 eprintln!("invalid input: {e}");
859 std::process::exit(2);
860 }
861 };
862
863 match byteor_runtime::run_sp_kv_v1_single_ring_with_resolver_and_tuning(
864 &spec, lane_path, input, wait, &resolver, pinning, sched,
865 ) {
866 Ok(v) => {
867 println!("{}", serde_json::to_string_pretty(&v).unwrap());
868 std::process::exit(0);
869 }
870 Err(e) => {
871 eprintln!("run failed: {e}");
872 println!("{}", serde_json::json!({"ok": false, "error": e}));
873 std::process::exit(1);
874 }
875 }
876 }
877 "doctor" => {
878 let mut tuning = byteor_runtime::TuningCliState::new(byteor_core::paths::shm_parent_from_env());
879
880 for a in args {
881 if let Some(v) = a.strip_prefix("--profile=") {
882 let profile = byteor_runtime::TuningProfile::parse(v).unwrap_or_else(|| {
883 eprintln!(
884 "unknown --profile: {v} (expected default|low-latency|isolated-core)"
885 );
886 std::process::exit(2);
887 });
888 tuning.set_profile(profile);
889 } else if let Some(v) = a.strip_prefix("--mlockall=") {
890 let mlockall = match v {
891 "on" | "1" | "true" | "yes" => byteor_runtime::MlockallPreset::On,
892 "off" | "0" | "false" | "no" | "none" => byteor_runtime::MlockallPreset::None,
893 _ => {
894 eprintln!("unknown --mlockall: {v} (expected on|off)");
895 std::process::exit(2);
896 }
897 };
898 tuning.set_mlockall(mlockall);
899 } else if let Some(v) = a.strip_prefix("--shm-parent=") {
900 tuning.set_shm_parent(PathBuf::from(v));
901 } else {
902 eprintln!("unknown arg: {a}");
903 std::process::exit(2);
904 }
905 }
906
907 let resolved = tuning.resolve();
908 let report = byteor_runtime::doctor_with_tuning(&resolved);
909 byteor_runtime::eprint_human(&report.checks);
910 println!("{}", serde_json::to_string_pretty(&report).unwrap());
911 std::process::exit(if report.ok { 0 } else { 1 });
912 }
913 "snapshot" => {
914 let mut ring_path: Option<PathBuf> = None;
915 let mut active_gating: usize = 4;
916
917 for a in args {
918 if let Some(v) = a.strip_prefix("--ring-path=") {
919 ring_path = Some(PathBuf::from(v));
920 } else if let Some(v) = a.strip_prefix("--active-gating=") {
921 active_gating = v.parse().unwrap_or(4);
922 } else {
923 eprintln!("unknown arg: {a}");
924 std::process::exit(2);
925 }
926 }
927
928 let Some(ring_path) = ring_path else {
929 eprintln!("missing --ring-path");
930 std::process::exit(2);
931 };
932
933 match byteor_runtime::snapshot_single_ring_json(ring_path, active_gating) {
934 Ok(v) => {
935 println!("{}", serde_json::to_string_pretty(&v).unwrap());
936 std::process::exit(0);
937 }
938 Err(e) => {
939 eprintln!("snapshot failed: {e}");
940 std::process::exit(1);
941 }
942 }
943 }
944 "incident-bundle" => {
945 let mut out: Option<PathBuf> = None;
946 let mut spec: Option<PathBuf> = None;
947 let mut config: Option<PathBuf> = None;
948 let mut ring_path: Option<PathBuf> = None;
949
950 for a in args {
951 if let Some(v) = a.strip_prefix("--out=") {
952 out = Some(PathBuf::from(v));
953 } else if let Some(v) = a.strip_prefix("--spec=") {
954 spec = Some(PathBuf::from(v));
955 } else if let Some(v) = a.strip_prefix("--config=") {
956 config = Some(PathBuf::from(v));
957 } else if let Some(v) = a.strip_prefix("--ring-path=") {
958 ring_path = Some(PathBuf::from(v));
959 } else {
960 eprintln!("unknown arg: {a}");
961 std::process::exit(2);
962 }
963 }
964
965 let Some(out) = out else {
966 eprintln!("missing --out");
967 std::process::exit(2);
968 };
969 let Some(spec) = spec else {
970 eprintln!("missing --spec");
971 std::process::exit(2);
972 };
973
974 let res = byteor_runtime::incident_bundle(
975 &out,
976 &spec,
977 config.as_deref(),
978 ring_path.as_deref(),
979 );
980
981 match res {
982 Ok(()) => {
983 println!(
984 "{}",
985 serde_json::json!({"ok": true, "out": out.display().to_string() })
986 );
987 std::process::exit(0);
988 }
989 Err(e) => {
990 eprintln!("incident-bundle failed: {e}");
991 println!("{}", serde_json::json!({"ok": false, "error": e }));
992 std::process::exit(1);
993 }
994 }
995 }
996 other => {
997 eprintln!("unknown command: {other}");
998 std::process::exit(2);
999 }
1000 }
1001}