byteor_app_kit/
agent.rs

1use std::time::{Duration, Instant};
2
3use serde::Serialize;
4use serde_json::Value;
5use time::{format_description::well_known::Rfc3339, OffsetDateTime};
6
7use crate::approval::{
8    approval_credential_expires_at, ApprovalRefreshBootstrap, TrustedApprovalKeyCache,
9};
10use crate::capabilities::RuntimeCapabilityDocument;
11
/// Parsed control-plane reporting configuration for an enrolled runtime.
///
/// `Debug` is implemented by hand so that the agent API key never ends up in
/// logs or panic messages; every other field is rendered as-is.
#[derive(Clone, PartialEq, Eq)]
pub struct AgentReportingConfig {
    /// Control plane API base URL.
    pub api_base_url: String,
    /// Runtime agent id.
    pub agent_id: String,
    /// Runtime agent API key (sent as a bearer token; redacted in `Debug`).
    pub agent_api_key: String,
    /// Heartbeat cadence.
    pub heartbeat_interval: Duration,
    /// Snapshot cadence.
    pub snapshot_interval: Duration,
}

impl std::fmt::Debug for AgentReportingConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AgentReportingConfig")
            .field("api_base_url", &self.api_base_url)
            .field("agent_id", &self.agent_id)
            // The API key is a credential: never print it.
            .field("agent_api_key", &format_args!("<redacted>"))
            .field("heartbeat_interval", &self.heartbeat_interval)
            .field("snapshot_interval", &self.snapshot_interval)
            .finish()
    }
}
26
27/// Approval trust metadata reported alongside agent heartbeats.
28#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
29#[serde(rename_all = "camelCase")]
30pub struct AgentApprovalTrustStatus {
31    /// Whether trust was refreshed online or loaded from cache.
32    pub key_source: crate::approval::ApprovalTrustSource,
33    /// RFC3339 timestamp for the last successful key refresh, when known.
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub last_refresh_at: Option<String>,
36    /// RFC3339 credential expiry timestamp, when available.
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub credential_expires_at: Option<String>,
39}
40
/// JSON body POSTed to `/api/v1/agents/{agent_id}/heartbeat`.
///
/// Field names serialize in camelCase and `None` optionals are omitted. All
/// fields borrow from the `RuntimeReporter` that builds the request, so no
/// data is copied per heartbeat. Field declaration order is the JSON key order.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AgentHeartbeatRequest<'a> {
    /// Runtime version string supplied to `RuntimeReporter::new`.
    runtime_version: &'a str,
    /// Operating system name captured at reporter construction.
    os: &'a str,
    /// CPU architecture captured at reporter construction.
    arch: &'a str,
    /// Hostname from the environment, omitted when unknown.
    #[serde(skip_serializing_if = "Option::is_none")]
    hostname: Option<&'a str>,
    /// Capability document advertised by this runtime.
    capabilities: &'a RuntimeCapabilityDocument,
    /// Hash of the spec the runtime is currently executing.
    active_spec_hash: &'a str,
    /// Approval trust status, omitted when the reporter has none.
    #[serde(skip_serializing_if = "Option::is_none")]
    approval_trust: Option<&'a AgentApprovalTrustStatus>,
}
54
/// JSON body POSTed to `/api/v1/agents/{agent_id}/snapshot`.
///
/// Field names serialize in camelCase and `None` optionals are omitted. As
/// built by `RuntimeReporter::report_snapshot_now`, `model` is either
/// `"SingleRing"` or `"LaneGraph"` and exactly one of `single_ring` /
/// `lane_graph` is populated to match it.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AgentSnapshotRequest {
    /// Hash of the spec the snapshot was taken against.
    spec_hash: String,
    /// Wire name of the execution model.
    model: String,
    /// RFC3339 capture timestamp (UTC).
    captured_at: String,
    /// Snapshot payload when `model` is `"SingleRing"`.
    #[serde(skip_serializing_if = "Option::is_none")]
    single_ring: Option<Value>,
    /// Snapshot payload when `model` is `"LaneGraph"`.
    #[serde(skip_serializing_if = "Option::is_none")]
    lane_graph: Option<Value>,
}
66
67/// Lightweight scheduler and transport for agent heartbeat/snapshot reporting.
68#[derive(Debug, Clone)]
69pub struct RuntimeReporter {
70    config: AgentReportingConfig,
71    runtime_version: String,
72    os: String,
73    arch: String,
74    hostname: Option<String>,
75    capabilities: RuntimeCapabilityDocument,
76    active_spec_hash: String,
77    approval_trust: Option<AgentApprovalTrustStatus>,
78    next_heartbeat_at: Instant,
79    next_snapshot_at: Instant,
80}
81
82impl RuntimeReporter {
83    /// Construct a runtime reporter from explicit config.
84    pub fn new(
85        config: AgentReportingConfig,
86        runtime_version: String,
87        active_spec_hash: String,
88        capabilities: RuntimeCapabilityDocument,
89        approval_trust: Option<AgentApprovalTrustStatus>,
90    ) -> Self {
91        let now = Instant::now();
92        Self {
93            config,
94            runtime_version,
95            os: std::env::consts::OS.to_string(),
96            arch: std::env::consts::ARCH.to_string(),
97            hostname: hostname_from_env(),
98            capabilities,
99            active_spec_hash,
100            approval_trust,
101            next_heartbeat_at: now,
102            next_snapshot_at: now,
103        }
104    }
105
106    /// Send a heartbeat immediately.
107    pub fn report_heartbeat_now(&self) -> Result<(), String> {
108        let payload = AgentHeartbeatRequest {
109            runtime_version: self.runtime_version.as_str(),
110            os: self.os.as_str(),
111            arch: self.arch.as_str(),
112            hostname: self.hostname.as_deref(),
113            capabilities: &self.capabilities,
114            active_spec_hash: self.active_spec_hash.as_str(),
115            approval_trust: self.approval_trust.as_ref(),
116        };
117        post_json(
118            &self.config,
119            format!("/api/v1/agents/{}/heartbeat", self.config.agent_id).as_str(),
120            &payload,
121        )
122    }
123
124    /// Send a heartbeat when its interval has elapsed.
125    pub fn report_heartbeat_if_due(&mut self) -> Result<(), String> {
126        if !self.heartbeat_due() {
127            return Ok(());
128        }
129        self.report_heartbeat_now()
130    }
131
132    /// Send a snapshot immediately.
133    pub fn report_snapshot_now<F>(&self, model: &str, build_snapshot: F) -> Result<(), String>
134    where
135        F: FnOnce() -> Result<Value, String>,
136    {
137        let model = match model {
138            "single_ring" => "SingleRing",
139            "lane_graph" => "LaneGraph",
140            other => return Err(format!("unsupported snapshot model: {other}")),
141        };
142        let snapshot = build_snapshot()?;
143        let captured_at = capture_timestamp()?;
144        let (single_ring, lane_graph) = match model {
145            "SingleRing" => (Some(snapshot), None),
146            "LaneGraph" => (None, Some(snapshot)),
147            _ => unreachable!("snapshot model already validated"),
148        };
149        let payload = AgentSnapshotRequest {
150            spec_hash: self.active_spec_hash.clone(),
151            model: model.to_string(),
152            captured_at,
153            single_ring,
154            lane_graph,
155        };
156        post_json(
157            &self.config,
158            format!("/api/v1/agents/{}/snapshot", self.config.agent_id).as_str(),
159            &payload,
160        )
161    }
162
163    /// Send a snapshot when its interval has elapsed.
164    pub fn report_snapshot_if_due<F>(
165        &mut self,
166        model: &str,
167        build_snapshot: F,
168    ) -> Result<(), String>
169    where
170        F: FnOnce() -> Result<Value, String>,
171    {
172        if !self.snapshot_due() {
173            return Ok(());
174        }
175        self.report_snapshot_now(model, build_snapshot)
176    }
177
178    fn heartbeat_due(&mut self) -> bool {
179        let now = Instant::now();
180        if now < self.next_heartbeat_at {
181            return false;
182        }
183        self.next_heartbeat_at = now + self.config.heartbeat_interval;
184        true
185    }
186
187    fn snapshot_due(&mut self) -> bool {
188        let now = Instant::now();
189        if now < self.next_snapshot_at {
190            return false;
191        }
192        self.next_snapshot_at = now + self.config.snapshot_interval;
193        true
194    }
195}
196
197/// Build reporting config from the enrolled-agent bootstrap env if present.
198pub fn reporting_config_from_bootstrap(
199    bootstrap: Option<&ApprovalRefreshBootstrap>,
200) -> Result<Option<AgentReportingConfig>, String> {
201    let Some(bootstrap) = bootstrap else {
202        return Ok(None);
203    };
204    Ok(Some(AgentReportingConfig {
205        api_base_url: bootstrap.api_base_url.clone(),
206        agent_id: bootstrap.agent_id.clone(),
207        agent_api_key: bootstrap.agent_api_key.clone(),
208        heartbeat_interval: interval_from_env("BYTEOR_HEARTBEAT_INTERVAL_SECS", 30)?,
209        snapshot_interval: interval_from_env("BYTEOR_SNAPSHOT_INTERVAL_SECS", 10)?,
210    }))
211}
212
213/// Build approval trust metadata suitable for heartbeat payloads.
214pub fn approval_trust_status(
215    cache: &TrustedApprovalKeyCache,
216    approval_credential: Option<&str>,
217) -> Result<AgentApprovalTrustStatus, String> {
218    let credential_expires_at = match approval_credential {
219        Some(credential) => Some(approval_credential_expires_at(credential)?),
220        None => None,
221    };
222    Ok(AgentApprovalTrustStatus {
223        key_source: cache.source.clone(),
224        last_refresh_at: cache.last_refresh_at.clone(),
225        credential_expires_at,
226    })
227}
228
/// Read a positive reporting interval, in whole seconds, from env var `name`.
///
/// Falls back to `default_secs` when the variable is unset. Surrounding
/// whitespace in the value (e.g. a trailing newline from a mounted file) is
/// ignored.
///
/// # Errors
/// Returns an error when the value is not valid Unicode, is not an unsigned
/// integer, or when the resulting interval is zero.
fn interval_from_env(name: &str, default_secs: u64) -> Result<Duration, String> {
    let secs = match std::env::var(name) {
        Ok(raw) => raw
            .trim()
            .parse::<u64>()
            .map_err(|_| format!("{name} must be an integer number of seconds"))?,
        Err(std::env::VarError::NotPresent) => default_secs,
        Err(std::env::VarError::NotUnicode(_)) => {
            return Err(format!("{name} must be valid unicode"));
        }
    };
    if secs == 0 {
        return Err(format!("{name} must be greater than zero"));
    }
    Ok(Duration::from_secs(secs))
}
244
/// Best-effort hostname lookup from the environment.
///
/// Checks `HOSTNAME` first, then `COMPUTERNAME`. The first variable that is
/// set, valid Unicode, and not blank wins, and its value is returned with
/// surrounding whitespace removed. Returns `None` when neither qualifies.
fn hostname_from_env() -> Option<String> {
    ["HOSTNAME", "COMPUTERNAME"].iter().find_map(|&name| {
        std::env::var(name)
            .ok()
            // Return the trimmed value; blankness is judged on it too.
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty())
    })
}
255
256fn capture_timestamp() -> Result<String, String> {
257    OffsetDateTime::now_utc()
258        .format(&Rfc3339)
259        .map_err(|error| format!("format runtime report timestamp: {error}"))
260}
261
262fn post_json<T>(config: &AgentReportingConfig, path: &str, payload: &T) -> Result<(), String>
263where
264    T: Serialize,
265{
266    let url = format!("{}{}", config.api_base_url.trim_end_matches('/'), path);
267    ureq::AgentBuilder::new()
268        .timeout(Duration::from_secs(10))
269        .build()
270        .post(&url)
271        .set("authorization", &format!("Bearer {}", config.agent_api_key))
272        .set("accept", "application/json")
273        .set("content-type", "application/json")
274        .set("user-agent", "byteor-agent/0 (api/v1)")
275        .send_json(ureq::json!(payload))
276        .map_err(|error| format!("post runtime report to {url}: {error}"))?;
277    Ok(())
278}
279
#[cfg(test)]
mod tests {
    // Unit tests for the reporter. HTTP behavior is exercised against a
    // minimal one-shot TCP server (`spawn_http_capture`) that records the raw
    // request bytes, rather than a mock HTTP client.
    use super::*;

    use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
    use serde_json::json;
    use std::io::{Read, Write};
    use std::net::TcpListener;
    use std::sync::mpsc;
    use std::thread;

    /// Reporting config whose API base URL points at `listener`'s local address.
    fn test_reporting_config(listener: &TcpListener) -> AgentReportingConfig {
        AgentReportingConfig {
            api_base_url: format!("http://{}", listener.local_addr().unwrap()),
            agent_id: "01JAGT000TEST0000000000000".to_string(),
            agent_api_key: "agent-api-key".to_string(),
            heartbeat_interval: Duration::from_secs(30),
            snapshot_interval: Duration::from_secs(10),
        }
    }

    /// Start a one-shot HTTP server on an ephemeral 127.0.0.1 port.
    ///
    /// A background thread accepts exactly one connection. It reads until the
    /// header terminator plus `Content-Length` body bytes have arrived (or EOF),
    /// sends the raw request text over the returned channel, and answers with a
    /// fixed `200 OK` / `{"ack":true}` response. The returned listener keeps the
    /// bound address available for `test_reporting_config`. If no client ever
    /// connects, the thread stays blocked in `accept` until the test process exits.
    fn spawn_http_capture() -> (TcpListener, mpsc::Receiver<String>) {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind listener");
        let acceptor = listener.try_clone().expect("clone listener");
        let (tx, rx) = mpsc::channel();
        thread::spawn(move || {
            let (mut stream, _) = acceptor.accept().expect("accept connection");
            let mut buffer = Vec::new();
            let mut chunk = [0u8; 4096];
            loop {
                let read = stream.read(&mut chunk).expect("read request");
                if read == 0 {
                    break;
                }
                buffer.extend_from_slice(&chunk[..read]);
                // Once the blank line ending the headers has arrived, stop as soon
                // as the full body (per Content-Length, default 0) is buffered.
                if let Some(header_end) = buffer.windows(4).position(|window| window == b"\r\n\r\n")
                {
                    // +4 includes the "\r\n\r\n" terminator itself.
                    let header_len = header_end + 4;
                    let headers = String::from_utf8_lossy(&buffer[..header_len]);
                    let content_length = headers
                        .lines()
                        .find_map(|line| {
                            // Header names are case-insensitive.
                            let lower = line.to_ascii_lowercase();
                            lower
                                .strip_prefix("content-length:")
                                .and_then(|value| value.trim().parse::<usize>().ok())
                        })
                        .unwrap_or(0);
                    if buffer.len() >= header_len + content_length {
                        break;
                    }
                }
            }
            tx.send(String::from_utf8(buffer).expect("utf8 request"))
                .expect("send request");
            // Content-Length 12 is the byte length of `{"ack":true}`.
            stream
                .write_all(
                    b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 12\r\nConnection: close\r\n\r\n{\"ack\":true}",
                )
                .expect("write response");
        });
        (listener, rx)
    }

    /// Parse everything after the first blank line of a raw HTTP request as JSON.
    ///
    /// Panics if the request has no header/body separator or the body is not JSON.
    fn request_json_body(request: &str) -> serde_json::Value {
        let body = request
            .split_once("\r\n\r\n")
            .map(|(_, body)| body)
            .expect("request body separator");
        serde_json::from_str(body).expect("valid json body")
    }

    #[test]
    fn approval_trust_status_reports_credential_expiry() {
        // Credential is `<base64url(payload)>.` with an empty signature segment;
        // this relies on `approval_credential_expires_at` reading the payload
        // without verifying the signature.
        let payload = json!({
            "v": 1,
            "kid": "01JKEY000TEST0000000000000",
            "spec_hash": "sha256:test",
            "capabilities": ["execute_side_effects"],
            "environment_posture": "prod",
            "expires_at": "2099-03-21T14:32:00Z"
        });
        let credential = format!("{}.", URL_SAFE_NO_PAD.encode(payload.to_string()));
        let cache = TrustedApprovalKeyCache {
            source: crate::approval::ApprovalTrustSource::Refreshed,
            last_refresh_at: Some("2026-03-20T14:35:00Z".to_string()),
            keys: Vec::new(),
        };

        let trust = approval_trust_status(&cache, Some(&credential)).expect("trust status");

        assert_eq!(
            trust.last_refresh_at.as_deref(),
            Some("2026-03-20T14:35:00Z")
        );
        assert_eq!(
            trust.credential_expires_at.as_deref(),
            Some("2099-03-21T14:32:00Z")
        );
    }

    #[test]
    fn runtime_reporter_posts_heartbeat_request() {
        let (listener, rx) = spawn_http_capture();
        let reporter = RuntimeReporter::new(
            test_reporting_config(&listener),
            "0.12.0".to_string(),
            "sha256:test-spec".to_string(),
            test_capabilities(),
            None,
        );

        reporter.report_heartbeat_now().expect("heartbeat post");

        let request = rx.recv().expect("captured request");
        assert!(request
            .starts_with("POST /api/v1/agents/01JAGT000TEST0000000000000/heartbeat HTTP/1.1"));
        // Accept either capitalization of the header name.
        assert!(
            request.contains("Authorization: Bearer agent-api-key")
                || request.contains("authorization: Bearer agent-api-key")
        );
        let body = request_json_body(&request);
        assert_eq!(body["capabilities"]["schemaVersion"], serde_json::json!(1));
        assert_eq!(body["capabilities"]["product"], serde_json::json!("byteor-runtime"));
        // The test capability document is built with no stage packs.
        assert!(!request.contains("stagePacks"));
    }

    #[test]
    fn runtime_reporter_posts_lane_graph_snapshot_request() {
        let (listener, rx) = spawn_http_capture();
        let reporter = RuntimeReporter::new(
            test_reporting_config(&listener),
            "0.12.0".to_string(),
            "sha256:test-spec".to_string(),
            test_capabilities(),
            None,
        );

        reporter
            .report_snapshot_now("lane_graph", || Ok(json!({"roles": []})))
            .expect("snapshot post");

        let request = rx.recv().expect("captured request");
        assert!(
            request.starts_with("POST /api/v1/agents/01JAGT000TEST0000000000000/snapshot HTTP/1.1")
        );
        // Caller-facing "lane_graph" maps to wire model "LaneGraph" and the
        // camelCase `laneGraph` payload field.
        assert!(request.contains("\"model\":\"LaneGraph\""));
        assert!(request.contains("\"laneGraph\":{\"roles\":[]}"));
    }

    #[test]
    fn runtime_reporter_rejects_unsupported_snapshot_model_without_building_snapshot() {
        // No request is expected; the capture server only supplies an address.
        let (listener, _rx) = spawn_http_capture();
        let reporter = RuntimeReporter::new(
            test_reporting_config(&listener),
            "0.12.0".to_string(),
            "sha256:test-spec".to_string(),
            test_capabilities(),
            None,
        );

        let mut invoked = false;
        let err = reporter
            .report_snapshot_now("adapter", || {
                invoked = true;
                Ok(json!({"adapters": []}))
            })
            .unwrap_err();

        assert_eq!(err, "unsupported snapshot model: adapter");
        assert!(!invoked);
    }

    #[test]
    fn runtime_reporter_skips_snapshot_builder_when_snapshot_is_not_due() {
        // No request is expected; the capture server only supplies an address.
        let (listener, _rx) = spawn_http_capture();
        let mut reporter = RuntimeReporter::new(
            test_reporting_config(&listener),
            "0.12.0".to_string(),
            "sha256:test-spec".to_string(),
            test_capabilities(),
            None,
        );
        // Push the snapshot deadline into the future so the next check is "not due".
        reporter.next_snapshot_at = Instant::now() + Duration::from_secs(60);

        let mut invoked = false;
        reporter
            .report_snapshot_if_due("lane_graph", || {
                invoked = true;
                Ok(json!({"roles": []}))
            })
            .expect("snapshot skip succeeds");

        assert!(!invoked);
    }

    /// Capability document with a single built-in `identity` stage that is
    /// available on both execution targets in every environment.
    fn test_capabilities() -> RuntimeCapabilityDocument {
        crate::capabilities::runtime_capability_document(
            "byteor-runtime",
            vec![crate::capabilities::stable_stage_catalog_entry(
                "identity",
                "Built-in identity stage",
                vec![
                    crate::capabilities::ExecutionTarget::SingleRing,
                    crate::capabilities::ExecutionTarget::LaneGraph,
                ],
                crate::capabilities::PolicyClassId::PureTransform,
                crate::capabilities::ReplaySupport::Allowed,
                vec![
                    crate::capabilities::RuntimeEnvironment::Dev,
                    crate::capabilities::RuntimeEnvironment::Staging,
                    crate::capabilities::RuntimeEnvironment::Prod,
                ],
                Vec::new(),
            )],
        )
    }
}