1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4
5use crate::host::shm_runtime_context;
6use crate::{MlockallPreset, PinningPreset, SchedPreset, WaitPreset};
7
/// Named bundle of runtime tuning defaults, expanded into concrete settings
/// by [`profile_defaults`].
///
/// Serialised in snake_case (`"default"`, `"low_latency"`, `"isolated_core"`),
/// which matches [`TuningProfile::as_str`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TuningProfile {
    /// `WaitPreset::StdBackoff`, no pinning, no mlockall, `SchedPreset::Other`.
    Default,
    /// `WaitPreset::MaxPerfSpin`, `PinningPreset::Balanced`, mlockall on,
    /// `SchedPreset::Other`.
    LowLatency,
    /// `WaitPreset::MaxPerfSpin`, `PinningPreset::Physical`, mlockall on,
    /// `SchedPreset::Fifo { prio: 80 }`.
    IsolatedCore,
}
15
16impl TuningProfile {
17 pub fn parse(s: &str) -> Option<Self> {
18 match s {
19 "default" => Some(Self::Default),
20 "low-latency" | "low_latency" => Some(Self::LowLatency),
21 "isolated-core" | "isolated_core" => Some(Self::IsolatedCore),
22 _ => None,
23 }
24 }
25
26 pub fn as_str(self) -> &'static str {
27 match self {
28 Self::Default => "default",
29 Self::LowLatency => "low_latency",
30 Self::IsolatedCore => "isolated_core",
31 }
32 }
33}
34
/// Concrete tuning settings: a profile's defaults (see [`profile_defaults`])
/// with any explicit overrides layered on top (see
/// [`TuningCliState::resolve`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedTuning {
    /// Profile whose defaults formed the starting point.
    pub profile: TuningProfile,
    /// Wait strategy.
    pub wait: WaitPreset,
    /// CPU-pinning preset; `PinningPreset::None` means no pinning hook is installed.
    pub pinning: PinningPreset,
    /// Whether memory locking is requested (`MlockallPreset::On`) or not.
    pub mlockall: MlockallPreset,
    /// Scheduler preset; `SchedPreset::Other` means no scheduler change.
    pub sched: SchedPreset,
    /// Parent path for shared memory. It is probed by `shm_runtime_context`
    /// for its mount type and hugetlbfs status.
    pub shm_parent: PathBuf,
}
44
/// Mutable accumulator for a tuning profile plus per-setting overrides
/// (presumably filled in from command-line flags; TODO confirm at call site).
///
/// Call [`TuningCliState::resolve`] to combine the profile's defaults with
/// whichever overrides were set.
#[derive(Clone, Debug)]
pub struct TuningCliState {
    // Profile whose defaults are the starting point for `resolve`.
    profile: TuningProfile,
    // shm parent used when no `shm_parent_override` is set.
    default_shm_parent: PathBuf,
    // Each `*_override` is `Some` only when explicitly set; `None` keeps the
    // profile default.
    wait_override: Option<WaitPreset>,
    pinning_override: Option<PinningPreset>,
    mlockall_override: Option<MlockallPreset>,
    sched_override: Option<SchedPreset>,
    shm_parent_override: Option<PathBuf>,
}
55
56impl TuningCliState {
57 pub fn new(default_shm_parent: PathBuf) -> Self {
58 Self {
59 profile: TuningProfile::Default,
60 default_shm_parent,
61 wait_override: None,
62 pinning_override: None,
63 mlockall_override: None,
64 sched_override: None,
65 shm_parent_override: None,
66 }
67 }
68
69 pub fn set_profile(&mut self, profile: TuningProfile) {
70 self.profile = profile;
71 }
72
73 pub fn profile(&self) -> TuningProfile {
74 self.profile
75 }
76
77 pub fn set_wait(&mut self, wait: WaitPreset) {
78 self.wait_override = Some(wait);
79 }
80
81 pub fn set_pinning(&mut self, pinning: PinningPreset) {
82 self.pinning_override = Some(pinning);
83 }
84
85 pub fn set_mlockall(&mut self, mlockall: MlockallPreset) {
86 self.mlockall_override = Some(mlockall);
87 }
88
89 pub fn set_sched(&mut self, sched: SchedPreset) {
90 self.sched_override = Some(sched);
91 }
92
93 pub fn set_shm_parent(&mut self, shm_parent: PathBuf) {
94 self.shm_parent_override = Some(shm_parent);
95 }
96
97 pub fn resolve(&self) -> ResolvedTuning {
98 let mut resolved = profile_defaults(self.profile, self.default_shm_parent.clone());
99 if let Some(wait) = self.wait_override {
100 resolved.wait = wait;
101 }
102 if let Some(pinning) = self.pinning_override {
103 resolved.pinning = pinning;
104 }
105 if let Some(mlockall) = self.mlockall_override {
106 resolved.mlockall = mlockall;
107 }
108 if let Some(sched) = self.sched_override {
109 resolved.sched = sched;
110 }
111 if let Some(shm_parent) = &self.shm_parent_override {
112 resolved.shm_parent = shm_parent.clone();
113 }
114 resolved
115 }
116}
117
118#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
119pub struct EffectiveTuningSettings {
120 #[serde(default, skip_serializing_if = "Option::is_none", rename = "wait_preset")]
121 pub wait_preset: Option<String>,
122 #[serde(default, skip_serializing_if = "Option::is_none", rename = "cpu_set")]
123 pub cpu_set: Option<String>,
124 #[serde(default, skip_serializing_if = "Option::is_none", rename = "pinning_mode")]
125 pub pinning_mode: Option<String>,
126 #[serde(default, skip_serializing_if = "Option::is_none")]
127 pub mlockall: Option<bool>,
128 #[serde(default, skip_serializing_if = "Option::is_none", rename = "sched_preset")]
129 pub sched_preset: Option<String>,
130 #[serde(default, skip_serializing_if = "Option::is_none", rename = "rt_prio")]
131 pub rt_prio: Option<u8>,
132 #[serde(default, skip_serializing_if = "Option::is_none", rename = "shm_parent")]
133 pub shm_parent: Option<String>,
134}
135
/// Facts about the host relevant to whether the requested tuning can take
/// effect.
///
/// Every field is omitted from serialised output when empty/`None`. The
/// local report built by `effective_tuning_report` fills only `notes` and the
/// `shm_*` fields (except `shm_hugepage_size`); the remaining fields are
/// presumably populated by other producers (TODO confirm).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct EffectiveTuningHostObservations {
    // Free-form advisory notes about host prerequisites for the tuning.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub notes: Vec<String>,
    // When the observations were taken; format not fixed by this file
    // (NOTE(review): presumably a timestamp string — confirm).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checked_at: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cpu_pinning_supported: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mlockall_supported: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub realtime_scheduler_supported: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub observed_cpu_set: Option<String>,
    // Path of the shm parent as reported by `shm_runtime_context`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shm_path: Option<String>,
    // Filesystem type of the mount holding the shm parent, when known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shm_mount_fstype: Option<String>,
    // Whether the shm parent is on hugetlbfs.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shm_hugetlbfs: Option<bool>,
    // Huge page size; units are not established here (NOTE(review): presumably
    // bytes — confirm).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shm_hugepage_size: Option<u64>,
}
159
/// Tuning status of a single agent inside an [`EffectiveTuningReport`].
///
/// In the local report built by `effective_tuning_report` there is exactly
/// one entry (`agent_id` `"local-runtime"`). `refresh_effective_tuning_rollup`
/// overwrites its status, summary, settings, observations and failure
/// reasons with the report-level values.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct EffectiveTuningAgentStatus {
    /// Agent identifier, e.g. `"local-runtime"`.
    pub agent_id: String,
    /// Human-readable agent name, e.g. `"Local runtime"`.
    pub agent_name: String,
    /// Rollup status; the local rollup sets `"applied"` or `"degraded"`.
    pub status: String,
    /// One-line human-readable summary of `status`.
    pub summary: String,
    /// Settings the agent was asked to apply.
    pub requested: EffectiveTuningSettings,
    /// Settings the agent reports as applied.
    pub applied: EffectiveTuningSettings,
    /// Settings reported as degraded (NOTE(review): exact semantics depend on
    /// the producer — confirm).
    pub degraded: EffectiveTuningSettings,
    /// Host observations associated with this agent, if any.
    pub host_observations: Option<EffectiveTuningHostObservations>,
    /// Reasons the tuning was not fully applied; the local rollup treats a
    /// non-empty list as degraded.
    pub failure_reasons: Vec<String>,
}
172
/// Top-level report describing which tuning was requested, what was applied,
/// and per-agent status.
///
/// `status`, `summary` and the `*_agent_ids` lists are derived data;
/// `refresh_effective_tuning_rollup` recomputes them from `failure_reasons`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct EffectiveTuningReport {
    /// Profile name as produced by `TuningProfile::as_str`.
    pub profile: String,
    /// Settings the profile would have produced with no overrides.
    pub profile_defaults: EffectiveTuningSettings,
    /// Rollup status: `"applied"` or `"degraded"` after a refresh.
    pub status: String,
    /// One-line human-readable summary of `status`.
    pub summary: String,
    /// Settings requested after applying overrides.
    pub requested: EffectiveTuningSettings,
    /// Settings reported as applied.
    pub applied: EffectiveTuningSettings,
    /// Settings reported as degraded (NOTE(review): semantics depend on the
    /// producer — confirm).
    pub degraded: EffectiveTuningSettings,
    /// Reasons the tuning was not fully applied; non-empty means degraded.
    pub failure_reasons: Vec<String>,
    /// Host observations gathered while building the report, if any.
    pub host_observations: Option<EffectiveTuningHostObservations>,
    /// Per-agent status entries.
    pub agents: Vec<EffectiveTuningAgentStatus>,
    /// Ids of agents whose tuning applied cleanly.
    pub applied_agent_ids: Vec<String>,
    /// Ids of agents whose tuning applied with degradation.
    pub degraded_agent_ids: Vec<String>,
    /// Ids of agents that have not reported yet (emptied by the local rollup).
    pub pending_agent_ids: Vec<String>,
}
189
190pub fn parse_wait_preset(s: &str) -> Option<WaitPreset> {
191 match s {
192 "spin" | "SpinWait" => Some(WaitPreset::MaxPerfSpin),
193 "backoff" | "StdBackoff" => Some(WaitPreset::StdBackoff),
194 "Yield" | "yield" => Some(WaitPreset::Yield),
195 _ => None,
196 }
197}
198
199pub fn profile_defaults(profile: TuningProfile, shm_parent: PathBuf) -> ResolvedTuning {
200 let (wait, pinning, mlockall, sched) = match profile {
201 TuningProfile::Default => (
202 WaitPreset::StdBackoff,
203 PinningPreset::None,
204 MlockallPreset::None,
205 SchedPreset::Other,
206 ),
207 TuningProfile::LowLatency => (
208 WaitPreset::MaxPerfSpin,
209 PinningPreset::Balanced,
210 MlockallPreset::On,
211 SchedPreset::Other,
212 ),
213 TuningProfile::IsolatedCore => (
214 WaitPreset::MaxPerfSpin,
215 PinningPreset::Physical,
216 MlockallPreset::On,
217 SchedPreset::Fifo { prio: 80 },
218 ),
219 };
220
221 ResolvedTuning {
222 profile,
223 wait,
224 pinning,
225 mlockall,
226 sched,
227 shm_parent,
228 }
229}
230
231pub fn effective_tuning_report(
232 profile: TuningProfile,
233 tuning: &ResolvedTuning,
234) -> EffectiveTuningReport {
235 let defaults = profile_defaults(profile, tuning.shm_parent.clone());
236 let shm = shm_runtime_context(&tuning.shm_parent);
237 let mut notes = Vec::new();
238 if tuning.pinning != PinningPreset::None {
239 notes.push(
240 "pinning is applied best-effort per runtime thread; failed thread-level pinning is logged and execution continues".to_string(),
241 );
242 }
243 if tuning.sched.is_realtime() {
244 notes.push(
245 "realtime scheduler presets require CAP_SYS_NICE or a sufficient realtime priority rlimit".to_string(),
246 );
247 }
248 if tuning.mlockall == MlockallPreset::On {
249 notes.push(
250 "mlockall requires CAP_IPC_LOCK or a sufficient memlock rlimit".to_string(),
251 );
252 }
253 if shm.hugetlbfs {
254 notes.push(
255 "shm_parent is on hugetlbfs; HugeTLB page reservation remains a host-readiness prerequisite".to_string(),
256 );
257 }
258
259 let requested = effective_tuning_settings(tuning);
260 let host_observations = EffectiveTuningHostObservations {
261 notes,
262 checked_at: None,
263 cpu_pinning_supported: None,
264 mlockall_supported: None,
265 realtime_scheduler_supported: None,
266 observed_cpu_set: None,
267 shm_path: Some(shm.path.clone()),
268 shm_mount_fstype: shm.mount_fstype.clone(),
269 shm_hugetlbfs: Some(shm.hugetlbfs),
270 shm_hugepage_size: None,
271 };
272
273 let mut report = EffectiveTuningReport {
274 profile: profile.as_str().to_string(),
275 profile_defaults: effective_tuning_settings(&defaults),
276 status: String::new(),
277 summary: String::new(),
278 requested: requested.clone(),
279 applied: requested.clone(),
280 degraded: EffectiveTuningSettings::default(),
281 failure_reasons: Vec::new(),
282 host_observations: Some(host_observations),
283 agents: vec![EffectiveTuningAgentStatus {
284 agent_id: "local-runtime".to_string(),
285 agent_name: "Local runtime".to_string(),
286 status: String::new(),
287 summary: String::new(),
288 requested: requested.clone(),
289 applied: requested,
290 degraded: EffectiveTuningSettings::default(),
291 host_observations: None,
292 failure_reasons: Vec::new(),
293 }],
294 applied_agent_ids: Vec::new(),
295 degraded_agent_ids: Vec::new(),
296 pending_agent_ids: Vec::new(),
297 };
298 refresh_effective_tuning_rollup(&mut report);
299 report
300}
301
/// Formats a configuration error message: `"CONFIG ERROR: <detail>"`.
pub fn tuning_config_error(detail: impl Into<String>) -> String {
    let mut message = String::from("CONFIG ERROR: ");
    message.push_str(&detail.into());
    message
}
305
/// Formats a host-readiness error message: `"HOST READINESS ERROR: <detail>"`.
pub fn tuning_host_readiness_error(detail: impl Into<String>) -> String {
    let mut message = String::from("HOST READINESS ERROR: ");
    message.push_str(&detail.into());
    message
}
309
310fn effective_tuning_settings(tuning: &ResolvedTuning) -> EffectiveTuningSettings {
311 EffectiveTuningSettings {
312 wait_preset: Some(wait_preset_name(tuning.wait).to_string()),
313 cpu_set: None,
314 pinning_mode: (tuning.pinning != PinningPreset::None).then(|| tuning.pinning.as_str().to_string()),
315 mlockall: Some(tuning.mlockall == MlockallPreset::On),
316 sched_preset: sched_preset_name(tuning.sched),
317 rt_prio: tuning.sched.rt_prio(),
318 shm_parent: Some(tuning.shm_parent.display().to_string()),
319 }
320}
321
322pub(crate) fn refresh_effective_tuning_rollup(report: &mut EffectiveTuningReport) {
323 let degraded = !report.failure_reasons.is_empty();
324 report.status = if degraded { "degraded" } else { "applied" }.to_string();
325 report.summary = if degraded {
326 format!(
327 "Local runtime applied the requested tuning with degradation: {}",
328 report
329 .failure_reasons
330 .first()
331 .cloned()
332 .unwrap_or_else(|| "host readiness constraints were reported".to_string())
333 )
334 } else {
335 "Local runtime applied the requested tuning.".to_string()
336 };
337
338 report.applied_agent_ids = if degraded {
339 Vec::new()
340 } else {
341 vec!["local-runtime".to_string()]
342 };
343 report.degraded_agent_ids = if degraded {
344 vec!["local-runtime".to_string()]
345 } else {
346 Vec::new()
347 };
348 report.pending_agent_ids = Vec::new();
349
350 if let Some(agent) = report.agents.first_mut() {
351 agent.status = report.status.clone();
352 agent.summary = report.summary.clone();
353 agent.requested = report.requested.clone();
354 agent.applied = report.applied.clone();
355 agent.degraded = report.degraded.clone();
356 agent.host_observations = report.host_observations.clone();
357 agent.failure_reasons = report.failure_reasons.clone();
358 }
359}
360
361fn sched_preset_name(sched: SchedPreset) -> Option<String> {
362 match sched {
363 SchedPreset::Other => None,
364 SchedPreset::Fifo { prio } => Some(format!("Fifo{prio}")),
365 SchedPreset::Rr { prio } => Some(format!("RoundRobin{prio}")),
366 }
367}
368
369fn wait_preset_name(wait: WaitPreset) -> &'static str {
370 match wait {
371 WaitPreset::MaxPerfSpin => "SpinWait",
372 WaitPreset::StdBackoff => "StdBackoff",
373 WaitPreset::Yield => "Yield",
374 }
375}
376
377pub fn thread_init_hook_for_pinning_best_effort(
381 preset: PinningPreset,
382) -> Result<Option<byteor_pipeline_exec::ThreadInitHook>, String> {
383 if preset == PinningPreset::None {
384 return Ok(None);
385 }
386
387 let mut assigner = byteor_ops::PinningAssigner::new(preset)
388 .map_err(|e| format!("pinning init failed: {e}"))?;
389 let _ = assigner.reserve_cpu_for_role("");
390
391 let assigner = std::sync::Arc::new(std::sync::Mutex::new(assigner));
392 let preset_s = preset.as_str();
393
394 Ok(Some(std::sync::Arc::new(move |ctx| {
395 let mut g = assigner
396 .lock()
397 .map_err(|_| byteor_pipeline_exec::ExecError::new("pinning assigner poisoned"))?;
398 if let Err(e) = g.apply_for_role(ctx.role_name.as_str()) {
399 eprintln!(
400 "WARN pinning failed: preset={} role={} err={}",
401 preset_s, ctx.role_name, e
402 );
403 }
404 Ok(())
405 })))
406}
407
408pub fn thread_init_hook_for_tuning(
413 pinning: PinningPreset,
414 sched: SchedPreset,
415) -> Result<Option<byteor_pipeline_exec::ThreadInitHook>, String> {
416 if pinning == PinningPreset::None && sched == SchedPreset::Other {
417 return Ok(None);
418 }
419
420 let assigner = if pinning == PinningPreset::None {
421 None
422 } else {
423 let mut a = byteor_ops::PinningAssigner::new(pinning)
424 .map_err(|e| format!("pinning init failed: {e}"))?;
425 let _ = a.reserve_cpu_for_role("");
426 Some(std::sync::Arc::new(std::sync::Mutex::new(a)))
427 };
428
429 let pinning_s = pinning.as_str();
430 let sched_kind = sched.kind_str();
431 let sched_prio = sched.rt_prio();
432
433 Ok(Some(std::sync::Arc::new(move |ctx| {
434 if let Some(assigner) = &assigner {
435 let mut g = assigner
436 .lock()
437 .map_err(|_| byteor_pipeline_exec::ExecError::new("pinning assigner poisoned"))?;
438 if let Err(e) = g.apply_for_role(ctx.role_name.as_str()) {
439 eprintln!(
440 "WARN pinning failed: preset={} role={} err={}",
441 pinning_s, ctx.role_name, e
442 );
443 }
444 }
445
446 if sched != SchedPreset::Other {
447 if let Err(e) = byteor_ops::apply_sched(sched) {
448 return Err(byteor_pipeline_exec::ExecError::with_context(
449 "sched apply failed",
450 format!(
451 "sched={sched_kind} prio={:?} role={}: {e}",
452 sched_prio, ctx.role_name
453 ),
454 ));
455 }
456 }
457
458 Ok(())
459 })))
460}