1use std::path::{Path, PathBuf};
2use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc};
3
4use crate::spec::engine_err_to_string;
5use crate::{policy_audit_record_for_spec, stage, thread_init_hook_for_tuning};
6use crate::{PinningPreset, SchedPreset, WaitPreset};
7
/// Decodes a hexadecimal string into raw bytes.
///
/// Leading and trailing whitespace is trimmed and one optional `0x` / `0X`
/// prefix is skipped. The remaining text must consist of an even number of
/// hex digits (upper or lower case); every pair of digits yields one byte.
/// An empty digit string decodes to an empty vector.
///
/// # Errors
///
/// * `"hex input must have even length"` when the digit count is odd.
/// * `"invalid hex"` when any character is not a hexadecimal digit.
fn parse_hex_bytes(s: &str) -> Result<Vec<u8>, String> {
    let trimmed = s.trim();
    // Digits are matched case-insensitively, so accept the prefix in either case too.
    let digits = trimmed
        .strip_prefix("0x")
        .or_else(|| trimmed.strip_prefix("0X"))
        .unwrap_or(trimmed);

    if digits.len() % 2 != 0 {
        return Err("hex input must have even length".to_string());
    }

    // Work on bytes: a non-ASCII byte never maps to a hex digit, so multi-byte
    // UTF-8 input is rejected as "invalid hex" instead of panicking on a slice.
    digits
        .as_bytes()
        .chunks_exact(2)
        .map(|pair| {
            let hi = char::from(pair[0]).to_digit(16);
            let lo = char::from(pair[1]).to_digit(16);
            match (hi, lo) {
                // Both nibbles are < 16, so the combined value always fits in a u8.
                (Some(hi), Some(lo)) => Ok(((hi << 4) | lo) as u8),
                _ => Err("invalid hex".to_string()),
            }
        })
        .collect()
}
31
32pub fn run_sp_kv_v1_single_ring(
38 spec_path: &Path,
39 lane_path: PathBuf,
40 input: Vec<u8>,
41 wait: WaitPreset,
42) -> Result<serde_json::Value, String> {
43 run_sp_kv_v1_single_ring_and_pinning(spec_path, lane_path, input, wait, PinningPreset::None)
44}
45
46pub fn run_sp_kv_v1_single_ring_and_pinning(
49 spec_path: &Path,
50 lane_path: PathBuf,
51 input: Vec<u8>,
52 wait: WaitPreset,
53 pinning: PinningPreset,
54) -> Result<serde_json::Value, String> {
55 run_sp_kv_v1_single_ring_and_tuning(
56 spec_path,
57 lane_path,
58 input,
59 wait,
60 pinning,
61 SchedPreset::Other,
62 )
63}
64
65pub fn run_sp_kv_v1_single_ring_and_tuning(
70 spec_path: &Path,
71 lane_path: PathBuf,
72 input: Vec<u8>,
73 wait: WaitPreset,
74 pinning: PinningPreset,
75 sched: SchedPreset,
76) -> Result<serde_json::Value, String> {
77 let thread_init = thread_init_hook_for_tuning(pinning, sched)?;
78
79 let mut v = byteor_engine::run_sp_kv_v1_single_ring_with_resolver_and_thread_init(
80 spec_path,
81 lane_path.clone(),
82 input,
83 wait,
84 None,
85 thread_init,
86 )
87 .map_err(engine_err_to_string)?;
88 if let serde_json::Value::Object(obj) = &mut v {
89 obj.insert(
90 "policy".to_string(),
91 policy_audit_record_for_spec(spec_path, "run_sp", Some(&lane_path), Some(wait)),
92 );
93 }
94 Ok(v)
95}
96
97pub fn run_sp_kv_v1_single_ring_with_resolver(
100 spec_path: &Path,
101 lane_path: PathBuf,
102 input: Vec<u8>,
103 wait: WaitPreset,
104 resolver: &dyn stage::StageResolver,
105) -> Result<serde_json::Value, String> {
106 run_sp_kv_v1_single_ring_with_resolver_and_pinning(
107 spec_path,
108 lane_path,
109 input,
110 wait,
111 resolver,
112 PinningPreset::None,
113 )
114}
115
116pub fn run_sp_kv_v1_single_ring_with_resolver_and_pinning(
120 spec_path: &Path,
121 lane_path: PathBuf,
122 input: Vec<u8>,
123 wait: WaitPreset,
124 resolver: &dyn stage::StageResolver,
125 pinning: PinningPreset,
126) -> Result<serde_json::Value, String> {
127 run_sp_kv_v1_single_ring_with_resolver_and_tuning(
128 spec_path,
129 lane_path,
130 input,
131 wait,
132 resolver,
133 pinning,
134 SchedPreset::Other,
135 )
136}
137
138pub fn run_sp_kv_v1_single_ring_with_resolver_and_tuning(
143 spec_path: &Path,
144 lane_path: PathBuf,
145 input: Vec<u8>,
146 wait: WaitPreset,
147 resolver: &dyn stage::StageResolver,
148 pinning: PinningPreset,
149 sched: SchedPreset,
150) -> Result<serde_json::Value, String> {
151 let thread_init = thread_init_hook_for_tuning(pinning, sched)?;
152
153 let mut v = byteor_engine::run_sp_kv_v1_single_ring_with_resolver_and_thread_init(
154 spec_path,
155 lane_path.clone(),
156 input,
157 wait,
158 Some(resolver),
159 thread_init,
160 )
161 .map_err(engine_err_to_string)?;
162
163 if let serde_json::Value::Object(obj) = &mut v {
164 obj.insert(
165 "policy".to_string(),
166 policy_audit_record_for_spec(spec_path, "run_sp", Some(&lane_path), Some(wait)),
167 );
168 }
169 Ok(v)
170}
171
172pub fn run_kv_v1_single_ring_supervised_until_stop(
177 spec_path: &Path,
178 lane_path: PathBuf,
179 wait: WaitPreset,
180 stop: Arc<AtomicBool>,
181) -> Result<(), String> {
182 run_kv_v1_single_ring_supervised_until_stop_and_pinning(
183 spec_path,
184 lane_path,
185 wait,
186 stop,
187 PinningPreset::None,
188 )
189}
190
191pub fn run_kv_v1_single_ring_supervised_until_stop_and_pinning(
194 spec_path: &Path,
195 lane_path: PathBuf,
196 wait: WaitPreset,
197 stop: Arc<AtomicBool>,
198 pinning: PinningPreset,
199) -> Result<(), String> {
200 run_kv_v1_single_ring_supervised_until_stop_and_tuning(
201 spec_path,
202 lane_path,
203 wait,
204 stop,
205 pinning,
206 SchedPreset::Other,
207 )
208}
209
210pub fn run_kv_v1_single_ring_supervised_until_stop_and_tuning(
214 spec_path: &Path,
215 lane_path: PathBuf,
216 wait: WaitPreset,
217 stop: Arc<AtomicBool>,
218 pinning: PinningPreset,
219 sched: SchedPreset,
220) -> Result<(), String> {
221 let thread_init = thread_init_hook_for_tuning(pinning, sched)?;
222
223 let eng = byteor_engine::Engine::new(byteor_engine::EngineConfig {
224 stage_failure: byteor_engine::StageFailurePolicy::StopPipeline,
225 wait,
226 thread_init,
227 });
228
229 let rt = eng
230 .spawn_single_ring_kv_v1_runtime(spec_path, lane_path)
231 .map_err(engine_err_to_string)?;
232
233 while !stop.load(Ordering::Relaxed) {
234 if rt.any_stage_finished() {
235 rt.request_stop();
236 return rt.stop_and_join().map_err(|e| format!("stage exited: {e}"));
237 }
238 std::thread::sleep(std::time::Duration::from_millis(20));
239 }
240
241 rt.stop_and_join()
242 .map_err(|e| format!("stop/join failed: {e}"))
243}
244
245pub fn run_kv_v1_single_ring_supervised_until_stop_with_resolver(
247 spec_path: &Path,
248 lane_path: PathBuf,
249 wait: WaitPreset,
250 stop: Arc<AtomicBool>,
251 resolver: &dyn stage::StageResolver,
252) -> Result<(), String> {
253 run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_pinning(
254 spec_path,
255 lane_path,
256 wait,
257 stop,
258 resolver,
259 PinningPreset::None,
260 )
261}
262
263pub fn run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_pinning(
266 spec_path: &Path,
267 lane_path: PathBuf,
268 wait: WaitPreset,
269 stop: Arc<AtomicBool>,
270 resolver: &dyn stage::StageResolver,
271 pinning: PinningPreset,
272) -> Result<(), String> {
273 run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_tuning(
274 spec_path,
275 lane_path,
276 wait,
277 stop,
278 resolver,
279 pinning,
280 SchedPreset::Other,
281 )
282}
283
284pub fn run_kv_v1_single_ring_supervised_until_stop_with_resolver_and_tuning(
289 spec_path: &Path,
290 lane_path: PathBuf,
291 wait: WaitPreset,
292 stop: Arc<AtomicBool>,
293 resolver: &dyn stage::StageResolver,
294 pinning: PinningPreset,
295 sched: SchedPreset,
296) -> Result<(), String> {
297 let thread_init = thread_init_hook_for_tuning(pinning, sched)?;
298
299 let eng = byteor_engine::Engine::new(byteor_engine::EngineConfig {
300 stage_failure: byteor_engine::StageFailurePolicy::StopPipeline,
301 wait,
302 thread_init,
303 });
304
305 let rt = eng
306 .spawn_single_ring_kv_v1_runtime_with_resolver(spec_path, lane_path, resolver)
307 .map_err(engine_err_to_string)?;
308
309 while !stop.load(Ordering::Relaxed) {
310 if rt.any_stage_finished() {
311 rt.request_stop();
312 return rt.stop_and_join().map_err(|e| format!("stage exited: {e}"));
313 }
314 std::thread::sleep(std::time::Duration::from_millis(20));
315 }
316
317 rt.stop_and_join()
318 .map_err(|e| format!("stop/join failed: {e}"))
319}
320
321pub fn parse_run_input(
323 input_hex: Option<&str>,
324 input_ascii: Option<&str>,
325) -> Result<Vec<u8>, String> {
326 match (input_hex, input_ascii) {
327 (Some(_), Some(_)) => Err("provide only one of --input-hex or --input-ascii".to_string()),
328 (Some(h), None) => parse_hex_bytes(h),
329 (None, Some(s)) => Ok(s.as_bytes().to_vec()),
330 (None, None) => Ok(b"hello".to_vec()),
331 }
332}