1use std::time::{Duration, Instant};
2
3use serde::Serialize;
4use serde_json::Value;
5use time::{format_description::well_known::Rfc3339, OffsetDateTime};
6
7use crate::approval::{
8 approval_credential_expires_at, ApprovalRefreshBootstrap, TrustedApprovalKeyCache,
9};
10use crate::capabilities::RuntimeCapabilityDocument;
11
12#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct AgentReportingConfig {
15 pub api_base_url: String,
17 pub agent_id: String,
19 pub agent_api_key: String,
21 pub heartbeat_interval: Duration,
23 pub snapshot_interval: Duration,
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
29#[serde(rename_all = "camelCase")]
30pub struct AgentApprovalTrustStatus {
31 pub key_source: crate::approval::ApprovalTrustSource,
33 #[serde(skip_serializing_if = "Option::is_none")]
35 pub last_refresh_at: Option<String>,
36 #[serde(skip_serializing_if = "Option::is_none")]
38 pub credential_expires_at: Option<String>,
39}
40
41#[derive(Debug, Serialize)]
42#[serde(rename_all = "camelCase")]
43struct AgentHeartbeatRequest<'a> {
44 runtime_version: &'a str,
45 os: &'a str,
46 arch: &'a str,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 hostname: Option<&'a str>,
49 capabilities: &'a RuntimeCapabilityDocument,
50 active_spec_hash: &'a str,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 approval_trust: Option<&'a AgentApprovalTrustStatus>,
53}
54
55#[derive(Debug, Serialize)]
56#[serde(rename_all = "camelCase")]
57struct AgentSnapshotRequest {
58 spec_hash: String,
59 model: String,
60 captured_at: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 single_ring: Option<Value>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 lane_graph: Option<Value>,
65}
66
67#[derive(Debug, Clone)]
69pub struct RuntimeReporter {
70 config: AgentReportingConfig,
71 runtime_version: String,
72 os: String,
73 arch: String,
74 hostname: Option<String>,
75 capabilities: RuntimeCapabilityDocument,
76 active_spec_hash: String,
77 approval_trust: Option<AgentApprovalTrustStatus>,
78 next_heartbeat_at: Instant,
79 next_snapshot_at: Instant,
80}
81
82impl RuntimeReporter {
83 pub fn new(
85 config: AgentReportingConfig,
86 runtime_version: String,
87 active_spec_hash: String,
88 capabilities: RuntimeCapabilityDocument,
89 approval_trust: Option<AgentApprovalTrustStatus>,
90 ) -> Self {
91 let now = Instant::now();
92 Self {
93 config,
94 runtime_version,
95 os: std::env::consts::OS.to_string(),
96 arch: std::env::consts::ARCH.to_string(),
97 hostname: hostname_from_env(),
98 capabilities,
99 active_spec_hash,
100 approval_trust,
101 next_heartbeat_at: now,
102 next_snapshot_at: now,
103 }
104 }
105
106 pub fn report_heartbeat_now(&self) -> Result<(), String> {
108 let payload = AgentHeartbeatRequest {
109 runtime_version: self.runtime_version.as_str(),
110 os: self.os.as_str(),
111 arch: self.arch.as_str(),
112 hostname: self.hostname.as_deref(),
113 capabilities: &self.capabilities,
114 active_spec_hash: self.active_spec_hash.as_str(),
115 approval_trust: self.approval_trust.as_ref(),
116 };
117 post_json(
118 &self.config,
119 format!("/api/v1/agents/{}/heartbeat", self.config.agent_id).as_str(),
120 &payload,
121 )
122 }
123
124 pub fn report_heartbeat_if_due(&mut self) -> Result<(), String> {
126 if !self.heartbeat_due() {
127 return Ok(());
128 }
129 self.report_heartbeat_now()
130 }
131
132 pub fn report_snapshot_now<F>(&self, model: &str, build_snapshot: F) -> Result<(), String>
134 where
135 F: FnOnce() -> Result<Value, String>,
136 {
137 let model = match model {
138 "single_ring" => "SingleRing",
139 "lane_graph" => "LaneGraph",
140 other => return Err(format!("unsupported snapshot model: {other}")),
141 };
142 let snapshot = build_snapshot()?;
143 let captured_at = capture_timestamp()?;
144 let (single_ring, lane_graph) = match model {
145 "SingleRing" => (Some(snapshot), None),
146 "LaneGraph" => (None, Some(snapshot)),
147 _ => unreachable!("snapshot model already validated"),
148 };
149 let payload = AgentSnapshotRequest {
150 spec_hash: self.active_spec_hash.clone(),
151 model: model.to_string(),
152 captured_at,
153 single_ring,
154 lane_graph,
155 };
156 post_json(
157 &self.config,
158 format!("/api/v1/agents/{}/snapshot", self.config.agent_id).as_str(),
159 &payload,
160 )
161 }
162
163 pub fn report_snapshot_if_due<F>(
165 &mut self,
166 model: &str,
167 build_snapshot: F,
168 ) -> Result<(), String>
169 where
170 F: FnOnce() -> Result<Value, String>,
171 {
172 if !self.snapshot_due() {
173 return Ok(());
174 }
175 self.report_snapshot_now(model, build_snapshot)
176 }
177
178 fn heartbeat_due(&mut self) -> bool {
179 let now = Instant::now();
180 if now < self.next_heartbeat_at {
181 return false;
182 }
183 self.next_heartbeat_at = now + self.config.heartbeat_interval;
184 true
185 }
186
187 fn snapshot_due(&mut self) -> bool {
188 let now = Instant::now();
189 if now < self.next_snapshot_at {
190 return false;
191 }
192 self.next_snapshot_at = now + self.config.snapshot_interval;
193 true
194 }
195}
196
197pub fn reporting_config_from_bootstrap(
199 bootstrap: Option<&ApprovalRefreshBootstrap>,
200) -> Result<Option<AgentReportingConfig>, String> {
201 let Some(bootstrap) = bootstrap else {
202 return Ok(None);
203 };
204 Ok(Some(AgentReportingConfig {
205 api_base_url: bootstrap.api_base_url.clone(),
206 agent_id: bootstrap.agent_id.clone(),
207 agent_api_key: bootstrap.agent_api_key.clone(),
208 heartbeat_interval: interval_from_env("BYTEOR_HEARTBEAT_INTERVAL_SECS", 30)?,
209 snapshot_interval: interval_from_env("BYTEOR_SNAPSHOT_INTERVAL_SECS", 10)?,
210 }))
211}
212
213pub fn approval_trust_status(
215 cache: &TrustedApprovalKeyCache,
216 approval_credential: Option<&str>,
217) -> Result<AgentApprovalTrustStatus, String> {
218 let credential_expires_at = match approval_credential {
219 Some(credential) => Some(approval_credential_expires_at(credential)?),
220 None => None,
221 };
222 Ok(AgentApprovalTrustStatus {
223 key_source: cache.source.clone(),
224 last_refresh_at: cache.last_refresh_at.clone(),
225 credential_expires_at,
226 })
227}
228
229fn interval_from_env(name: &str, default_secs: u64) -> Result<Duration, String> {
230 let value = match std::env::var(name) {
231 Ok(raw) => raw
232 .parse::<u64>()
233 .map_err(|_| format!("{name} must be an integer number of seconds"))?,
234 Err(std::env::VarError::NotPresent) => default_secs,
235 Err(std::env::VarError::NotUnicode(_)) => {
236 return Err(format!("{name} must be valid unicode"));
237 }
238 };
239 if value == 0 {
240 return Err(format!("{name} must be greater than zero"));
241 }
242 Ok(Duration::from_secs(value))
243}
244
245fn hostname_from_env() -> Option<String> {
246 std::env::var("HOSTNAME")
247 .ok()
248 .filter(|value| !value.trim().is_empty())
249 .or_else(|| {
250 std::env::var("COMPUTERNAME")
251 .ok()
252 .filter(|value| !value.trim().is_empty())
253 })
254}
255
256fn capture_timestamp() -> Result<String, String> {
257 OffsetDateTime::now_utc()
258 .format(&Rfc3339)
259 .map_err(|error| format!("format runtime report timestamp: {error}"))
260}
261
262fn post_json<T>(config: &AgentReportingConfig, path: &str, payload: &T) -> Result<(), String>
263where
264 T: Serialize,
265{
266 let url = format!("{}{}", config.api_base_url.trim_end_matches('/'), path);
267 ureq::AgentBuilder::new()
268 .timeout(Duration::from_secs(10))
269 .build()
270 .post(&url)
271 .set("authorization", &format!("Bearer {}", config.agent_api_key))
272 .set("accept", "application/json")
273 .set("content-type", "application/json")
274 .set("user-agent", "byteor-agent/0 (api/v1)")
275 .send_json(ureq::json!(payload))
276 .map_err(|error| format!("post runtime report to {url}: {error}"))?;
277 Ok(())
278}
279
280#[cfg(test)]
281mod tests {
282 use super::*;
283
284 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
285 use serde_json::json;
286 use std::io::{Read, Write};
287 use std::net::TcpListener;
288 use std::sync::mpsc;
289 use std::thread;
290
291 fn test_reporting_config(listener: &TcpListener) -> AgentReportingConfig {
292 AgentReportingConfig {
293 api_base_url: format!("http://{}", listener.local_addr().unwrap()),
294 agent_id: "01JAGT000TEST0000000000000".to_string(),
295 agent_api_key: "agent-api-key".to_string(),
296 heartbeat_interval: Duration::from_secs(30),
297 snapshot_interval: Duration::from_secs(10),
298 }
299 }
300
301 fn spawn_http_capture() -> (TcpListener, mpsc::Receiver<String>) {
302 let listener = TcpListener::bind("127.0.0.1:0").expect("bind listener");
303 let acceptor = listener.try_clone().expect("clone listener");
304 let (tx, rx) = mpsc::channel();
305 thread::spawn(move || {
306 let (mut stream, _) = acceptor.accept().expect("accept connection");
307 let mut buffer = Vec::new();
308 let mut chunk = [0u8; 4096];
309 loop {
310 let read = stream.read(&mut chunk).expect("read request");
311 if read == 0 {
312 break;
313 }
314 buffer.extend_from_slice(&chunk[..read]);
315 if let Some(header_end) = buffer.windows(4).position(|window| window == b"\r\n\r\n")
316 {
317 let header_len = header_end + 4;
318 let headers = String::from_utf8_lossy(&buffer[..header_len]);
319 let content_length = headers
320 .lines()
321 .find_map(|line| {
322 let lower = line.to_ascii_lowercase();
323 lower
324 .strip_prefix("content-length:")
325 .and_then(|value| value.trim().parse::<usize>().ok())
326 })
327 .unwrap_or(0);
328 if buffer.len() >= header_len + content_length {
329 break;
330 }
331 }
332 }
333 tx.send(String::from_utf8(buffer).expect("utf8 request"))
334 .expect("send request");
335 stream
336 .write_all(
337 b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 12\r\nConnection: close\r\n\r\n{\"ack\":true}",
338 )
339 .expect("write response");
340 });
341 (listener, rx)
342 }
343
344 fn request_json_body(request: &str) -> serde_json::Value {
345 let body = request
346 .split_once("\r\n\r\n")
347 .map(|(_, body)| body)
348 .expect("request body separator");
349 serde_json::from_str(body).expect("valid json body")
350 }
351
352 #[test]
353 fn approval_trust_status_reports_credential_expiry() {
354 let payload = json!({
355 "v": 1,
356 "kid": "01JKEY000TEST0000000000000",
357 "spec_hash": "sha256:test",
358 "capabilities": ["execute_side_effects"],
359 "environment_posture": "prod",
360 "expires_at": "2099-03-21T14:32:00Z"
361 });
362 let credential = format!("{}.", URL_SAFE_NO_PAD.encode(payload.to_string()));
363 let cache = TrustedApprovalKeyCache {
364 source: crate::approval::ApprovalTrustSource::Refreshed,
365 last_refresh_at: Some("2026-03-20T14:35:00Z".to_string()),
366 keys: Vec::new(),
367 };
368
369 let trust = approval_trust_status(&cache, Some(&credential)).expect("trust status");
370
371 assert_eq!(
372 trust.last_refresh_at.as_deref(),
373 Some("2026-03-20T14:35:00Z")
374 );
375 assert_eq!(
376 trust.credential_expires_at.as_deref(),
377 Some("2099-03-21T14:32:00Z")
378 );
379 }
380
381 #[test]
382 fn runtime_reporter_posts_heartbeat_request() {
383 let (listener, rx) = spawn_http_capture();
384 let reporter = RuntimeReporter::new(
385 test_reporting_config(&listener),
386 "0.12.0".to_string(),
387 "sha256:test-spec".to_string(),
388 test_capabilities(),
389 None,
390 );
391
392 reporter.report_heartbeat_now().expect("heartbeat post");
393
394 let request = rx.recv().expect("captured request");
395 assert!(request
396 .starts_with("POST /api/v1/agents/01JAGT000TEST0000000000000/heartbeat HTTP/1.1"));
397 assert!(
398 request.contains("Authorization: Bearer agent-api-key")
399 || request.contains("authorization: Bearer agent-api-key")
400 );
401 let body = request_json_body(&request);
402 assert_eq!(body["capabilities"]["schemaVersion"], serde_json::json!(1));
403 assert_eq!(body["capabilities"]["product"], serde_json::json!("byteor-runtime"));
404 assert!(!request.contains("stagePacks"));
405 }
406
407 #[test]
408 fn runtime_reporter_posts_lane_graph_snapshot_request() {
409 let (listener, rx) = spawn_http_capture();
410 let reporter = RuntimeReporter::new(
411 test_reporting_config(&listener),
412 "0.12.0".to_string(),
413 "sha256:test-spec".to_string(),
414 test_capabilities(),
415 None,
416 );
417
418 reporter
419 .report_snapshot_now("lane_graph", || Ok(json!({"roles": []})))
420 .expect("snapshot post");
421
422 let request = rx.recv().expect("captured request");
423 assert!(
424 request.starts_with("POST /api/v1/agents/01JAGT000TEST0000000000000/snapshot HTTP/1.1")
425 );
426 assert!(request.contains("\"model\":\"LaneGraph\""));
427 assert!(request.contains("\"laneGraph\":{\"roles\":[]}"));
428 }
429
430 #[test]
431 fn runtime_reporter_rejects_unsupported_snapshot_model_without_building_snapshot() {
432 let (listener, _rx) = spawn_http_capture();
433 let reporter = RuntimeReporter::new(
434 test_reporting_config(&listener),
435 "0.12.0".to_string(),
436 "sha256:test-spec".to_string(),
437 test_capabilities(),
438 None,
439 );
440
441 let mut invoked = false;
442 let err = reporter
443 .report_snapshot_now("adapter", || {
444 invoked = true;
445 Ok(json!({"adapters": []}))
446 })
447 .unwrap_err();
448
449 assert_eq!(err, "unsupported snapshot model: adapter");
450 assert!(!invoked);
451 }
452
453 #[test]
454 fn runtime_reporter_skips_snapshot_builder_when_snapshot_is_not_due() {
455 let (listener, _rx) = spawn_http_capture();
456 let mut reporter = RuntimeReporter::new(
457 test_reporting_config(&listener),
458 "0.12.0".to_string(),
459 "sha256:test-spec".to_string(),
460 test_capabilities(),
461 None,
462 );
463 reporter.next_snapshot_at = Instant::now() + Duration::from_secs(60);
464
465 let mut invoked = false;
466 reporter
467 .report_snapshot_if_due("lane_graph", || {
468 invoked = true;
469 Ok(json!({"roles": []}))
470 })
471 .expect("snapshot skip succeeds");
472
473 assert!(!invoked);
474 }
475
476 fn test_capabilities() -> RuntimeCapabilityDocument {
477 crate::capabilities::runtime_capability_document(
478 "byteor-runtime",
479 vec![crate::capabilities::stable_stage_catalog_entry(
480 "identity",
481 "Built-in identity stage",
482 vec![
483 crate::capabilities::ExecutionTarget::SingleRing,
484 crate::capabilities::ExecutionTarget::LaneGraph,
485 ],
486 crate::capabilities::PolicyClassId::PureTransform,
487 crate::capabilities::ReplaySupport::Allowed,
488 vec![
489 crate::capabilities::RuntimeEnvironment::Dev,
490 crate::capabilities::RuntimeEnvironment::Staging,
491 crate::capabilities::RuntimeEnvironment::Prod,
492 ],
493 Vec::new(),
494 )],
495 )
496 }
497}