byteor_runtime/
runner.rs

1use std::path::{Path, PathBuf};
2use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
3
4use crate::spec::engine_err_to_string;
5use crate::{policy_audit_record_for_spec, stage, thread_init_hook_for_tuning};
6use crate::{PinningPreset, SchedPreset, WaitPreset};
7
/// Decode a hexadecimal string into raw bytes.
///
/// Surrounding whitespace is trimmed and a single leading lowercase `0x` prefix is dropped
/// before decoding. Digits may be upper- or lowercase. An empty digit string decodes to an
/// empty vector.
///
/// # Errors
///
/// - `"hex input must have even length"` when the digit string has an odd byte length.
/// - `"invalid hex"` when any byte is not an ASCII hexadecimal digit.
fn parse_hex_bytes(s: &str) -> Result<Vec<u8>, String> {
    let trimmed = s.trim();
    let digits = trimmed.strip_prefix("0x").unwrap_or(trimmed);
    if digits.len() % 2 != 0 {
        return Err("hex input must have even length".to_string());
    }

    // The length is even, so `chunks_exact(2)` covers every byte with no remainder.
    digits
        .as_bytes()
        .chunks_exact(2)
        .map(|pair| {
            // Each byte is widened to a `char` on its own, so bytes >= 0x80 (pieces of a
            // multi-byte UTF-8 sequence) never match a hex digit and are rejected.
            let high = char::from(pair[0]).to_digit(16);
            let low = char::from(pair[1]).to_digit(16);
            match (high, low) {
                (Some(high), Some(low)) => Ok(((high << 4) | low) as u8),
                _ => Err("invalid hex".to_string()),
            }
        })
        .collect()
}
31
32/// Validate and execute a v1 `spec.kv` file using the OSS bring-up SingleRing executor.
33///
34/// Notes:
35/// - Only `model=single_ring` is supported today.
36/// - Execution uses a temporary in-process producer+subscriber over SHM.
37pub fn run_sp_kv_v1_single_ring(
38    spec_path: &Path,
39    lane_path: PathBuf,
40    input: Vec<u8>,
41    wait: WaitPreset,
42) -> Result<serde_json::Value, String> {
43    run_sp_kv_v1_single_ring_and_pinning(spec_path, lane_path, input, wait, PinningPreset::None)
44}
45
46/// Validate and execute a v1 `spec.kv` file using the OSS bring-up SingleRing executor,
47/// applying a pinning preset to stage threads (best-effort).
48pub fn run_sp_kv_v1_single_ring_and_pinning(
49    spec_path: &Path,
50    lane_path: PathBuf,
51    input: Vec<u8>,
52    wait: WaitPreset,
53    pinning: PinningPreset,
54) -> Result<serde_json::Value, String> {
55    run_sp_kv_v1_single_ring_and_tuning(
56        spec_path,
57        lane_path,
58        input,
59        wait,
60        pinning,
61        SchedPreset::Other,
62    )
63}
64
65/// Validate and execute a v1 `spec.kv` file using the OSS bring-up SingleRing executor,
66/// applying tuning to stage threads.
67///
68/// Pinning is best-effort; scheduler changes are enforced when requested.
69pub fn run_sp_kv_v1_single_ring_and_tuning(
70    spec_path: &Path,
71    lane_path: PathBuf,
72    input: Vec<u8>,
73    wait: WaitPreset,
74    pinning: PinningPreset,
75    sched: SchedPreset,
76) -> Result<serde_json::Value, String> {
77    let thread_init = thread_init_hook_for_tuning(pinning, sched)?;
78
79    let mut v = byteor_engine::run_sp_kv_v1_single_ring_with_resolver_and_thread_init(
80        spec_path,
81        lane_path.clone(),
82        input,
83        wait,
84        None,
85        thread_init,
86    )
87    .map_err(engine_err_to_string)?;
88    if let serde_json::Value::Object(obj) = &mut v {
89        obj.insert(
90            "policy".to_string(),
91            policy_audit_record_for_spec(spec_path, "run_sp", Some(&lane_path), Some(wait)),
92        );
93    }
94    Ok(v)
95}
96
97/// Validate and execute a v1 `spec.kv` file using the bring-up SingleRing executor,
98/// resolving any `StageOpV1::ResolverKey` stages using `resolver`.
99pub fn run_sp_kv_v1_single_ring_with_resolver(
100    spec_path: &Path,
101    lane_path: PathBuf,
102    input: Vec<u8>,
103    wait: WaitPreset,
104    resolver: &dyn stage::StageResolver,
105) -> Result<serde_json::Value, String> {
106    run_sp_kv_v1_single_ring_with_resolver_and_pinning(
107        spec_path,
108        lane_path,
109        input,
110        wait,
111        resolver,
112        PinningPreset::None,
113    )
114}
115
116/// Validate and execute a v1 `spec.kv` file using the bring-up SingleRing executor,
117/// resolving any `StageOpV1::ResolverKey` stages using `resolver` and applying pinning
118/// (best-effort) to stage threads.
119pub fn run_sp_kv_v1_single_ring_with_resolver_and_pinning(
120    spec_path: &Path,
121    lane_path: PathBuf,
122    input: Vec<u8>,
123    wait: WaitPreset,
124    resolver: &dyn stage::StageResolver,
125    pinning: PinningPreset,
126) -> Result<serde_json::Value, String> {
127    run_sp_kv_v1_single_ring_with_resolver_and_tuning(
128        spec_path,
129        lane_path,
130        input,
131        wait,
132        resolver,
133        pinning,
134        SchedPreset::Other,
135    )
136}
137
138/// Validate and execute a v1 `spec.kv` file using the bring-up SingleRing executor,
139/// resolving any `StageOpV1::ResolverKey` stages using `resolver` and applying tuning.
140///
141/// Pinning is best-effort; scheduler changes are enforced when requested.
142pub fn run_sp_kv_v1_single_ring_with_resolver_and_tuning(
143    spec_path: &Path,
144    lane_path: PathBuf,
145    input: Vec<u8>,
146    wait: WaitPreset,
147    resolver: &dyn stage::StageResolver,
148    pinning: PinningPreset,
149    sched: SchedPreset,
150) -> Result<serde_json::Value, String> {
151    let thread_init = thread_init_hook_for_tuning(pinning, sched)?;
152
153    let mut v = byteor_engine::run_sp_kv_v1_single_ring_with_resolver_and_thread_init(
154        spec_path,
155        lane_path.clone(),
156        input,
157        wait,
158        Some(resolver),
159        thread_init,
160    )
161    .map_err(engine_err_to_string)?;
162
163    if let serde_json::Value::Object(obj) = &mut v {
164        obj.insert(
165            "policy".to_string(),
166            policy_audit_record_for_spec(spec_path, "run_sp", Some(&lane_path), Some(wait)),
167        );
168    }
169    Ok(v)
170}
171
172/// Run a long-lived v1 SingleRing runtime until `stop` is set.
173///
174/// Supervision policy (v1): if any stage thread exits unexpectedly, request stop and return an
175/// error after joining.
176pub fn run_kv_v1_single_ring_supervised_until_stop(
177    spec_path: &Path,
178    lane_path: PathBuf,
179    wait: WaitPreset,
180    stop: Arc<AtomicBool>,
181) -> Result<(), String> {
182    run_kv_v1_single_ring_supervised_until_stop_and_pinning(
183        spec_path,
184        lane_path,
185        wait,
186        stop,
187        PinningPreset::None,
188    )
189}
190
191/// Run a long-lived v1 SingleRing runtime until `stop` is set, applying pinning to stage threads
192/// (best-effort).
193pub fn run_kv_v1_single_ring_supervised_until_stop_and_pinning(
194    spec_path: &Path,
195    lane_path: PathBuf,
196    wait: WaitPreset,
197    stop: Arc<AtomicBool>,
198    pinning: PinningPreset,
199) -> Result<(), String> {
200    run_kv_v1_single_ring_supervised_until_stop_and_tuning(
201        spec_path,
202        lane_path,
203        wait,
204        stop,
205        pinning,
206        SchedPreset::Other,
207    )
208}
209
210/// Run a long-lived v1 SingleRing runtime until `stop` is set, applying tuning to stage threads.
211///
212/// Pinning is best-effort; scheduler changes are enforced when requested.
213pub fn run_kv_v1_single_ring_supervised_until_stop_and_tuning(
214    spec_path: &Path,
215    lane_path: PathBuf,
216    wait: WaitPreset,
217    stop: Arc<AtomicBool>,
218    pinning: PinningPreset,
219    sched: SchedPreset,
220) -> Result<(), String> {
221    let thread_init = thread_init_hook_for_tuning(pinning, sched)?;
222
223    let eng = byteor_engine::Engine::new(byteor_engine::EngineConfig {
224        stage_failure: byteor_engine::StageFailurePolicy::StopPipeline,
225        wait,
226        thread_init,
227    });
228
229    let rt = eng
230        .spawn_single_ring_kv_v1_runtime(spec_path, lane_path)
231        .map_err(engine_err_to_string)?;
232
233    while !stop.load(Ordering::Relaxed) {
234        if rt.any_stage_finished() {
235            rt.request_stop();
236            return rt.stop_and_join().map_err(|e| format!("stage exited: {e}"));
237        }
238        std::thread::sleep(std::time::Duration::from_millis(20));
239    }
240
241    rt.stop_and_join()
242        .map_err(|e| format!("stop/join failed: {e}"))
243}
244
245/// Run a long-lived v1 SingleRing runtime until `stop` is set, using a stage resolver.
246pub fn run_kv_v1_single_ring_supervised_until_stop_with_resolver(
247    spec_path: &Path,
248    lane_path: PathBuf,
249    wait: WaitPreset,
250    stop: Arc<AtomicBool>,
251    resolver: &dyn stage::StageResolver,
252) -> Result<(), String> {
253    run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_pinning(
254        spec_path,
255        lane_path,
256        wait,
257        stop,
258        resolver,
259        PinningPreset::None,
260    )
261}
262
263/// Run a long-lived v1 SingleRing runtime until `stop` is set, using a stage resolver and
264/// applying pinning to stage threads (best-effort).
265pub fn run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_pinning(
266    spec_path: &Path,
267    lane_path: PathBuf,
268    wait: WaitPreset,
269    stop: Arc<AtomicBool>,
270    resolver: &dyn stage::StageResolver,
271    pinning: PinningPreset,
272) -> Result<(), String> {
273    run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_tuning(
274        spec_path,
275        lane_path,
276        wait,
277        stop,
278        resolver,
279        pinning,
280        SchedPreset::Other,
281    )
282}
283
284/// Run a long-lived v1 SingleRing runtime until `stop` is set, using a stage resolver and
285/// applying tuning.
286///
287/// Pinning is best-effort; scheduler changes are enforced when requested.
288pub fn run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_tuning(
289    spec_path: &Path,
290    lane_path: PathBuf,
291    wait: WaitPreset,
292    stop: Arc<AtomicBool>,
293    resolver: &dyn stage::StageResolver,
294    pinning: PinningPreset,
295    sched: SchedPreset,
296) -> Result<(), String> {
297    let thread_init = thread_init_hook_for_tuning(pinning, sched)?;
298
299    let eng = byteor_engine::Engine::new(byteor_engine::EngineConfig {
300        stage_failure: byteor_engine::StageFailurePolicy::StopPipeline,
301        wait,
302        thread_init,
303    });
304
305    let rt = eng
306        .spawn_single_ring_kv_v1_runtime_with_resolver(spec_path, lane_path, resolver)
307        .map_err(engine_err_to_string)?;
308
309    while !stop.load(Ordering::Relaxed) {
310        if rt.any_stage_finished() {
311            rt.request_stop();
312            return rt.stop_and_join().map_err(|e| format!("stage exited: {e}"));
313        }
314        std::thread::sleep(std::time::Duration::from_millis(20));
315    }
316
317    rt.stop_and_join()
318        .map_err(|e| format!("stop/join failed: {e}"))
319}
320
321/// Parse input payload selection for `run --sp`.
322pub fn parse_run_input(
323    input_hex: Option<&str>,
324    input_ascii: Option<&str>,
325) -> Result<Vec<u8>, String> {
326    match (input_hex, input_ascii) {
327        (Some(_), Some(_)) => Err("provide only one of --input-hex or --input-ascii".to_string()),
328        (Some(h), None) => parse_hex_bytes(h),
329        (None, Some(s)) => Ok(s.as_bytes().to_vec()),
330        (None, None) => Ok(b"hello".to_vec()),
331    }
332}