byteor_runtime/
tuning.rs

1use std::path::PathBuf;
2
3use serde::{Deserialize, Serialize};
4
5use crate::host::shm_runtime_context;
6use crate::{MlockallPreset, PinningPreset, SchedPreset, WaitPreset};
7
/// Named tuning profile selecting a bundle of runtime defaults.
///
/// The concrete wait/pinning/mlockall/scheduler values for each profile are produced by
/// `profile_defaults`. Serialized in snake_case (`default`, `low_latency`, `isolated_core`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TuningProfile {
    /// `StdBackoff` waiting, no pinning, no mlockall, `SchedPreset::Other`.
    Default,
    /// `MaxPerfSpin` waiting, `Balanced` pinning, mlockall on, `SchedPreset::Other`.
    LowLatency,
    /// `MaxPerfSpin` waiting, `Physical` pinning, mlockall on, `SchedPreset::Fifo` at priority 80.
    IsolatedCore,
}
15
16impl TuningProfile {
17    pub fn parse(s: &str) -> Option<Self> {
18        match s {
19            "default" => Some(Self::Default),
20            "low-latency" | "low_latency" => Some(Self::LowLatency),
21            "isolated-core" | "isolated_core" => Some(Self::IsolatedCore),
22            _ => None,
23        }
24    }
25
26    pub fn as_str(self) -> &'static str {
27        match self {
28            Self::Default => "default",
29            Self::LowLatency => "low_latency",
30            Self::IsolatedCore => "isolated_core",
31        }
32    }
33}
34
/// Fully resolved tuning: a profile's defaults with any explicit overrides applied.
///
/// Built by `profile_defaults` and by `TuningCliState::resolve`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedTuning {
    /// Profile whose defaults formed the base of this resolution.
    pub profile: TuningProfile,
    /// Wait strategy preset.
    pub wait: WaitPreset,
    /// CPU pinning preset; `PinningPreset::None` means no pinning hook is installed.
    pub pinning: PinningPreset,
    /// mlockall request (`MlockallPreset::On` requests locking).
    pub mlockall: MlockallPreset,
    /// Thread scheduler preset.
    pub sched: SchedPreset,
    /// Parent path for shared memory; inspected via `shm_runtime_context` when reporting.
    pub shm_parent: PathBuf,
}
44
/// Accumulates tuning choices (a base profile plus per-knob overrides) and resolves them.
///
/// `resolve` always applies overrides on top of the selected profile's defaults. The order in
/// which `set_profile` and the override setters were called therefore does not matter.
#[derive(Clone, Debug)]
pub struct TuningCliState {
    /// Selected base profile; starts as `TuningProfile::Default`.
    profile: TuningProfile,
    /// shm parent used when no `shm_parent_override` is set.
    default_shm_parent: PathBuf,
    // Per-knob overrides: `None` means "use the profile default".
    wait_override: Option<WaitPreset>,
    pinning_override: Option<PinningPreset>,
    mlockall_override: Option<MlockallPreset>,
    sched_override: Option<SchedPreset>,
    shm_parent_override: Option<PathBuf>,
}
55
56impl TuningCliState {
57    pub fn new(default_shm_parent: PathBuf) -> Self {
58        Self {
59            profile: TuningProfile::Default,
60            default_shm_parent,
61            wait_override: None,
62            pinning_override: None,
63            mlockall_override: None,
64            sched_override: None,
65            shm_parent_override: None,
66        }
67    }
68
69    pub fn set_profile(&mut self, profile: TuningProfile) {
70        self.profile = profile;
71    }
72
73    pub fn profile(&self) -> TuningProfile {
74        self.profile
75    }
76
77    pub fn set_wait(&mut self, wait: WaitPreset) {
78        self.wait_override = Some(wait);
79    }
80
81    pub fn set_pinning(&mut self, pinning: PinningPreset) {
82        self.pinning_override = Some(pinning);
83    }
84
85    pub fn set_mlockall(&mut self, mlockall: MlockallPreset) {
86        self.mlockall_override = Some(mlockall);
87    }
88
89    pub fn set_sched(&mut self, sched: SchedPreset) {
90        self.sched_override = Some(sched);
91    }
92
93    pub fn set_shm_parent(&mut self, shm_parent: PathBuf) {
94        self.shm_parent_override = Some(shm_parent);
95    }
96
97    pub fn resolve(&self) -> ResolvedTuning {
98        let mut resolved = profile_defaults(self.profile, self.default_shm_parent.clone());
99        if let Some(wait) = self.wait_override {
100            resolved.wait = wait;
101        }
102        if let Some(pinning) = self.pinning_override {
103            resolved.pinning = pinning;
104        }
105        if let Some(mlockall) = self.mlockall_override {
106            resolved.mlockall = mlockall;
107        }
108        if let Some(sched) = self.sched_override {
109            resolved.sched = sched;
110        }
111        if let Some(shm_parent) = &self.shm_parent_override {
112            resolved.shm_parent = shm_parent.clone();
113        }
114        resolved
115    }
116}
117
118#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
119pub struct EffectiveTuningSettings {
120    #[serde(default, skip_serializing_if = "Option::is_none", rename = "wait_preset")]
121    pub wait_preset: Option<String>,
122    #[serde(default, skip_serializing_if = "Option::is_none", rename = "cpu_set")]
123    pub cpu_set: Option<String>,
124    #[serde(default, skip_serializing_if = "Option::is_none", rename = "pinning_mode")]
125    pub pinning_mode: Option<String>,
126    #[serde(default, skip_serializing_if = "Option::is_none")]
127    pub mlockall: Option<bool>,
128    #[serde(default, skip_serializing_if = "Option::is_none", rename = "sched_preset")]
129    pub sched_preset: Option<String>,
130    #[serde(default, skip_serializing_if = "Option::is_none", rename = "rt_prio")]
131    pub rt_prio: Option<u8>,
132    #[serde(default, skip_serializing_if = "Option::is_none", rename = "shm_parent")]
133    pub shm_parent: Option<String>,
134}
135
/// Host-level caveats and observations attached to an effective-tuning report.
///
/// `effective_tuning_report` fills `notes` and the `shm_path` / `shm_mount_fstype` /
/// `shm_hugetlbfs` fields and leaves the others `None`. Empty and `None` fields are omitted
/// when serialized.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct EffectiveTuningHostObservations {
    /// Human-readable caveats, e.g. capabilities or rlimits a requested setting needs.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub notes: Vec<String>,
    /// When the observations were taken; string format not established here — TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub checked_at: Option<String>,
    /// Whether CPU pinning is supported on the host; not set by this module.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cpu_pinning_supported: Option<bool>,
    /// Whether mlockall is supported on the host; not set by this module.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mlockall_supported: Option<bool>,
    /// Whether realtime scheduling is supported on the host; not set by this module.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub realtime_scheduler_supported: Option<bool>,
    /// CPU set observed on the host; not set by this module.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub observed_cpu_set: Option<String>,
    /// shm path as reported by `shm_runtime_context`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shm_path: Option<String>,
    /// Presumably the filesystem type of the mount backing `shm_path`, when known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shm_mount_fstype: Option<String>,
    /// Whether the shm path is on hugetlbfs.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shm_hugetlbfs: Option<bool>,
    /// Huge page size; unit not established here (presumably bytes) — TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub shm_hugepage_size: Option<u64>,
}
159
/// Per-agent tuning status inside an `EffectiveTuningReport`.
///
/// For the local runtime, `refresh_effective_tuning_rollup` copies the report-level status,
/// summary, settings, observations, and failure reasons onto this entry.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct EffectiveTuningAgentStatus {
    /// Agent identifier; the local runtime uses `"local-runtime"`.
    pub agent_id: String,
    /// Display name; the local runtime uses `"Local runtime"`.
    pub agent_name: String,
    /// Rollup status (`"applied"` or `"degraded"` when set by the rollup).
    pub status: String,
    /// One-line human-readable summary of `status`.
    pub summary: String,
    /// Settings that were requested.
    pub requested: EffectiveTuningSettings,
    /// Settings considered in effect.
    pub applied: EffectiveTuningSettings,
    /// Presumably the settings that could not be applied as requested; default when none.
    pub degraded: EffectiveTuningSettings,
    /// Host observations relevant to this agent, if any.
    pub host_observations: Option<EffectiveTuningHostObservations>,
    /// Reasons this agent's tuning is degraded; non-empty implies `status == "degraded"`.
    pub failure_reasons: Vec<String>,
}
172
/// Effective-tuning report: requested, applied, and degraded settings, host observations, and
/// per-agent rollups.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct EffectiveTuningReport {
    /// snake_case profile name (from `TuningProfile::as_str`).
    pub profile: String,
    /// Settings from the profile's defaults (built with the resolved shm parent).
    pub profile_defaults: EffectiveTuningSettings,
    /// Rollup status (`"applied"` or `"degraded"` when set by the rollup).
    pub status: String,
    /// Human-readable summary of `status`.
    pub summary: String,
    /// Settings requested after resolving profile defaults and overrides.
    pub requested: EffectiveTuningSettings,
    /// Settings considered in effect; `effective_tuning_report` initialises this to `requested`.
    pub applied: EffectiveTuningSettings,
    /// Degraded settings; `effective_tuning_report` initialises this to the default (empty).
    pub degraded: EffectiveTuningSettings,
    /// Failure reasons; when degraded, the first one is quoted in `summary`.
    pub failure_reasons: Vec<String>,
    /// Host-level notes and shm observations.
    pub host_observations: Option<EffectiveTuningHostObservations>,
    /// Per-agent status; `effective_tuning_report` creates a single `local-runtime` entry.
    pub agents: Vec<EffectiveTuningAgentStatus>,
    /// Ids of agents whose tuning applied cleanly.
    pub applied_agent_ids: Vec<String>,
    /// Ids of agents whose tuning is degraded.
    pub degraded_agent_ids: Vec<String>,
    /// Ids of agents still pending; the local rollup always leaves this empty.
    pub pending_agent_ids: Vec<String>,
}
189
190pub fn parse_wait_preset(s: &str) -> Option<WaitPreset> {
191    match s {
192        "spin" | "SpinWait" => Some(WaitPreset::MaxPerfSpin),
193        "backoff" | "StdBackoff" => Some(WaitPreset::StdBackoff),
194        "Yield" | "yield" => Some(WaitPreset::Yield),
195        _ => None,
196    }
197}
198
199pub fn profile_defaults(profile: TuningProfile, shm_parent: PathBuf) -> ResolvedTuning {
200    let (wait, pinning, mlockall, sched) = match profile {
201        TuningProfile::Default => (
202            WaitPreset::StdBackoff,
203            PinningPreset::None,
204            MlockallPreset::None,
205            SchedPreset::Other,
206        ),
207        TuningProfile::LowLatency => (
208            WaitPreset::MaxPerfSpin,
209            PinningPreset::Balanced,
210            MlockallPreset::On,
211            SchedPreset::Other,
212        ),
213        TuningProfile::IsolatedCore => (
214            WaitPreset::MaxPerfSpin,
215            PinningPreset::Physical,
216            MlockallPreset::On,
217            SchedPreset::Fifo { prio: 80 },
218        ),
219    };
220
221    ResolvedTuning {
222        profile,
223        wait,
224        pinning,
225        mlockall,
226        sched,
227        shm_parent,
228    }
229}
230
231pub fn effective_tuning_report(
232    profile: TuningProfile,
233    tuning: &ResolvedTuning,
234) -> EffectiveTuningReport {
235    let defaults = profile_defaults(profile, tuning.shm_parent.clone());
236    let shm = shm_runtime_context(&tuning.shm_parent);
237    let mut notes = Vec::new();
238    if tuning.pinning != PinningPreset::None {
239        notes.push(
240            "pinning is applied best-effort per runtime thread; failed thread-level pinning is logged and execution continues".to_string(),
241        );
242    }
243    if tuning.sched.is_realtime() {
244        notes.push(
245            "realtime scheduler presets require CAP_SYS_NICE or a sufficient realtime priority rlimit".to_string(),
246        );
247    }
248    if tuning.mlockall == MlockallPreset::On {
249        notes.push(
250            "mlockall requires CAP_IPC_LOCK or a sufficient memlock rlimit".to_string(),
251        );
252    }
253    if shm.hugetlbfs {
254        notes.push(
255            "shm_parent is on hugetlbfs; HugeTLB page reservation remains a host-readiness prerequisite".to_string(),
256        );
257    }
258
259    let requested = effective_tuning_settings(tuning);
260    let host_observations = EffectiveTuningHostObservations {
261        notes,
262        checked_at: None,
263        cpu_pinning_supported: None,
264        mlockall_supported: None,
265        realtime_scheduler_supported: None,
266        observed_cpu_set: None,
267        shm_path: Some(shm.path.clone()),
268        shm_mount_fstype: shm.mount_fstype.clone(),
269        shm_hugetlbfs: Some(shm.hugetlbfs),
270        shm_hugepage_size: None,
271    };
272
273    let mut report = EffectiveTuningReport {
274        profile: profile.as_str().to_string(),
275        profile_defaults: effective_tuning_settings(&defaults),
276        status: String::new(),
277        summary: String::new(),
278        requested: requested.clone(),
279        applied: requested.clone(),
280        degraded: EffectiveTuningSettings::default(),
281        failure_reasons: Vec::new(),
282        host_observations: Some(host_observations),
283        agents: vec![EffectiveTuningAgentStatus {
284            agent_id: "local-runtime".to_string(),
285            agent_name: "Local runtime".to_string(),
286            status: String::new(),
287            summary: String::new(),
288            requested: requested.clone(),
289            applied: requested,
290            degraded: EffectiveTuningSettings::default(),
291            host_observations: None,
292            failure_reasons: Vec::new(),
293        }],
294        applied_agent_ids: Vec::new(),
295        degraded_agent_ids: Vec::new(),
296        pending_agent_ids: Vec::new(),
297    };
298    refresh_effective_tuning_rollup(&mut report);
299    report
300}
301
/// Formats `detail` as a configuration error message prefixed with `CONFIG ERROR: `.
pub fn tuning_config_error(detail: impl Into<String>) -> String {
    let detail: String = detail.into();
    ["CONFIG ERROR: ", detail.as_str()].concat()
}
305
/// Formats `detail` as a host-readiness error message prefixed with `HOST READINESS ERROR: `.
pub fn tuning_host_readiness_error(detail: impl Into<String>) -> String {
    let detail: String = detail.into();
    ["HOST READINESS ERROR: ", detail.as_str()].concat()
}
309
310fn effective_tuning_settings(tuning: &ResolvedTuning) -> EffectiveTuningSettings {
311    EffectiveTuningSettings {
312        wait_preset: Some(wait_preset_name(tuning.wait).to_string()),
313        cpu_set: None,
314        pinning_mode: (tuning.pinning != PinningPreset::None).then(|| tuning.pinning.as_str().to_string()),
315        mlockall: Some(tuning.mlockall == MlockallPreset::On),
316        sched_preset: sched_preset_name(tuning.sched),
317        rt_prio: tuning.sched.rt_prio(),
318        shm_parent: Some(tuning.shm_parent.display().to_string()),
319    }
320}
321
322pub(crate) fn refresh_effective_tuning_rollup(report: &mut EffectiveTuningReport) {
323    let degraded = !report.failure_reasons.is_empty();
324    report.status = if degraded { "degraded" } else { "applied" }.to_string();
325    report.summary = if degraded {
326        format!(
327            "Local runtime applied the requested tuning with degradation: {}",
328            report
329                .failure_reasons
330                .first()
331                .cloned()
332                .unwrap_or_else(|| "host readiness constraints were reported".to_string())
333        )
334    } else {
335        "Local runtime applied the requested tuning.".to_string()
336    };
337
338    report.applied_agent_ids = if degraded {
339        Vec::new()
340    } else {
341        vec!["local-runtime".to_string()]
342    };
343    report.degraded_agent_ids = if degraded {
344        vec!["local-runtime".to_string()]
345    } else {
346        Vec::new()
347    };
348    report.pending_agent_ids = Vec::new();
349
350    if let Some(agent) = report.agents.first_mut() {
351        agent.status = report.status.clone();
352        agent.summary = report.summary.clone();
353        agent.requested = report.requested.clone();
354        agent.applied = report.applied.clone();
355        agent.degraded = report.degraded.clone();
356        agent.host_observations = report.host_observations.clone();
357        agent.failure_reasons = report.failure_reasons.clone();
358    }
359}
360
361fn sched_preset_name(sched: SchedPreset) -> Option<String> {
362    match sched {
363        SchedPreset::Other => None,
364        SchedPreset::Fifo { prio } => Some(format!("Fifo{prio}")),
365        SchedPreset::Rr { prio } => Some(format!("RoundRobin{prio}")),
366    }
367}
368
369fn wait_preset_name(wait: WaitPreset) -> &'static str {
370    match wait {
371        WaitPreset::MaxPerfSpin => "SpinWait",
372        WaitPreset::StdBackoff => "StdBackoff",
373        WaitPreset::Yield => "Yield",
374    }
375}
376
377/// Build a per-thread init hook that applies pinning to executor threads.
378///
379/// This is intentionally best-effort: if pinning fails, we log a warning and continue unpinned.
380pub fn thread_init_hook_for_pinning_best_effort(
381    preset: PinningPreset,
382) -> Result<Option<byteor_pipeline_exec::ThreadInitHook>, String> {
383    if preset == PinningPreset::None {
384        return Ok(None);
385    }
386
387    let mut assigner = byteor_ops::PinningAssigner::new(preset)
388        .map_err(|e| format!("pinning init failed: {e}"))?;
389    let _ = assigner.reserve_cpu_for_role("");
390
391    let assigner = std::sync::Arc::new(std::sync::Mutex::new(assigner));
392    let preset_s = preset.as_str();
393
394    Ok(Some(std::sync::Arc::new(move |ctx| {
395        let mut g = assigner
396            .lock()
397            .map_err(|_| byteor_pipeline_exec::ExecError::new("pinning assigner poisoned"))?;
398        if let Err(e) = g.apply_for_role(ctx.role_name.as_str()) {
399            eprintln!(
400                "WARN pinning failed: preset={} role={} err={}",
401                preset_s, ctx.role_name, e
402            );
403        }
404        Ok(())
405    })))
406}
407
408/// Build a per-thread init hook that applies tuning to executor threads.
409///
410/// - Pinning is best-effort: failures warn and continue.
411/// - Scheduler changes are enforced when requested: failures return an error.
412pub fn thread_init_hook_for_tuning(
413    pinning: PinningPreset,
414    sched: SchedPreset,
415) -> Result<Option<byteor_pipeline_exec::ThreadInitHook>, String> {
416    if pinning == PinningPreset::None && sched == SchedPreset::Other {
417        return Ok(None);
418    }
419
420    let assigner = if pinning == PinningPreset::None {
421        None
422    } else {
423        let mut a = byteor_ops::PinningAssigner::new(pinning)
424            .map_err(|e| format!("pinning init failed: {e}"))?;
425        let _ = a.reserve_cpu_for_role("");
426        Some(std::sync::Arc::new(std::sync::Mutex::new(a)))
427    };
428
429    let pinning_s = pinning.as_str();
430    let sched_kind = sched.kind_str();
431    let sched_prio = sched.rt_prio();
432
433    Ok(Some(std::sync::Arc::new(move |ctx| {
434        if let Some(assigner) = &assigner {
435            let mut g = assigner
436                .lock()
437                .map_err(|_| byteor_pipeline_exec::ExecError::new("pinning assigner poisoned"))?;
438            if let Err(e) = g.apply_for_role(ctx.role_name.as_str()) {
439                eprintln!(
440                    "WARN pinning failed: preset={} role={} err={}",
441                    pinning_s, ctx.role_name, e
442                );
443            }
444        }
445
446        if sched != SchedPreset::Other {
447            if let Err(e) = byteor_ops::apply_sched(sched) {
448                return Err(byteor_pipeline_exec::ExecError::with_context(
449                    "sched apply failed",
450                    format!(
451                        "sched={sched_kind} prio={:?} role={}: {e}",
452                        sched_prio, ctx.role_name
453                    ),
454                ));
455            }
456        }
457
458        Ok(())
459    })))
460}