byteor_ops/
pinning.rs

1//! CPU pinning presets and helpers.
2
3use std::collections::{BTreeMap, BTreeSet};
4
5use crate::error::{platform_err, OpsError, Result};
6
/// Named strategies for pinning a thread to a CPU.
///
/// Each variant has a stable lowercase string form (see [`PinningPreset::as_str`]) that
/// [`PinningPreset::parse`] accepts back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PinningPreset {
    /// Do not pin.
    None,
    /// Pin to a stable CPU chosen from the allowed CPU mask.
    Balanced,
    /// Prefer one logical CPU per physical core (best-effort).
    Physical,
}

impl PinningPreset {
    /// Parse a pinning preset from a CLI/env string.
    ///
    /// Matching is exact: only `"none"`, `"balanced"` and `"physical"` are recognised (no
    /// trimming, no case folding). Any other input yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        // Every variant, so that parsing is by construction the inverse of `as_str`.
        const ALL: [PinningPreset; 3] = [
            PinningPreset::None,
            PinningPreset::Balanced,
            PinningPreset::Physical,
        ];
        ALL.iter().copied().find(|candidate| candidate.as_str() == s)
    }

    /// Convert a pinning preset to its stable string form.
    ///
    /// The returned string is the one [`PinningPreset::parse`] maps back to `self`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::None => "none",
            Self::Balanced => "balanced",
            Self::Physical => "physical",
        }
    }
}
38
39/// Apply a pinning preset to the current thread.
40pub fn apply_pinning(preset: PinningPreset) -> Result<()> {
41    apply_pinning_for_role(preset, "")
42}
43
44/// Apply a pinning preset to the current thread, using a role name as a stable distribution key.
45pub fn apply_pinning_for_role(preset: PinningPreset, role_name: &str) -> Result<()> {
46    match preset {
47        PinningPreset::None => Ok(()),
48        PinningPreset::Balanced | PinningPreset::Physical => {
49            let allowed = indexbus_platform_ops::cpu::current_thread_allowed_cpus()
50                .map_err(|e| platform_err("pinning probe failed", e))?;
51            if allowed.is_empty() {
52                return Err(OpsError::Invalid {
53                    what: "pinning probe returned empty allowed CPU set".to_string(),
54                });
55            }
56
57            let pool = match preset {
58                PinningPreset::Balanced => allowed.as_slice(),
59                PinningPreset::Physical => {
60                    let reduced = physical_core_cpu_pool_best_effort(&allowed);
61                    if reduced.is_empty() {
62                        allowed.as_slice()
63                    } else {
64                        return apply_pinning_for_role_from_pool(role_name, &reduced);
65                    }
66                }
67                PinningPreset::None => unreachable!(),
68            };
69
70            apply_pinning_for_role_from_pool(role_name, pool)
71        }
72    }
73}
74
75fn apply_pinning_for_role_from_pool(role_name: &str, pool: &[usize]) -> Result<()> {
76    if pool.is_empty() {
77        return Err(OpsError::Invalid {
78            what: "pinning pool is empty".to_string(),
79        });
80    }
81    let idx = (fnv1a_u64(role_name.as_bytes()) as usize) % pool.len();
82    let cpu = pool[idx];
83    indexbus_platform_ops::cpu::pin_current_thread_to_cpu(cpu)
84        .map_err(|e| platform_err("pinning failed", e))
85}
86
87/// Pin the current thread to the given CPU.
88pub fn pin_current_thread_to_cpu(cpu: usize) -> Result<()> {
89    indexbus_platform_ops::cpu::pin_current_thread_to_cpu(cpu)
90        .map_err(|e| platform_err("pinning failed", e))
91}
92
93/// A per-process pinning assigner.
94#[derive(Debug)]
95pub struct PinningAssigner {
96    preset: PinningPreset,
97    allowed: Vec<usize>,
98    used: BTreeSet<usize>,
99    assigned: BTreeMap<String, usize>,
100}
101
102impl PinningAssigner {
103    /// Create a new assigner using the current thread's allowed CPU mask.
104    pub fn new(preset: PinningPreset) -> Result<Self> {
105        match preset {
106            PinningPreset::None => Ok(Self {
107                preset,
108                allowed: Vec::new(),
109                used: BTreeSet::new(),
110                assigned: BTreeMap::new(),
111            }),
112            PinningPreset::Balanced | PinningPreset::Physical => {
113                let allowed = indexbus_platform_ops::cpu::current_thread_allowed_cpus()
114                    .map_err(|e| platform_err("pinning probe failed", e))?;
115                if allowed.is_empty() {
116                    return Err(OpsError::Invalid {
117                        what: "pinning probe returned empty allowed CPU set".to_string(),
118                    });
119                }
120
121                let allowed = if preset == PinningPreset::Physical {
122                    let reduced = physical_core_cpu_pool_best_effort(&allowed);
123                    if reduced.is_empty() {
124                        allowed
125                    } else {
126                        reduced
127                    }
128                } else {
129                    allowed
130                };
131                Ok(Self {
132                    preset,
133                    allowed,
134                    used: BTreeSet::new(),
135                    assigned: BTreeMap::new(),
136                })
137            }
138        }
139    }
140
141    /// Choose (and reserve) a CPU for a role thread.
142    pub fn reserve_cpu_for_role(&mut self, role_name: &str) -> Result<Option<usize>> {
143        match self.preset {
144            PinningPreset::None => Ok(None),
145            PinningPreset::Balanced | PinningPreset::Physical => {
146                if let Some(&cpu) = self.assigned.get(role_name) {
147                    return Ok(Some(cpu));
148                }
149
150                let start = (fnv1a_u64(role_name.as_bytes()) as usize) % self.allowed.len();
151                for offset in 0..self.allowed.len() {
152                    let cpu = self.allowed[(start + offset) % self.allowed.len()];
153                    if !self.used.contains(&cpu) {
154                        self.used.insert(cpu);
155                        self.assigned.insert(role_name.to_string(), cpu);
156                        return Ok(Some(cpu));
157                    }
158                }
159
160                Err(OpsError::Invalid {
161                    what: format!(
162                        "not enough CPUs for balanced pinning: role={role_name} allowed={:?} used={:?}",
163                        self.allowed, self.used
164                    ),
165                })
166            }
167        }
168    }
169
170    /// Apply the reserved CPU (if any) for `role_name` to the current thread.
171    pub fn apply_for_role(&mut self, role_name: &str) -> Result<()> {
172        let Some(cpu) = self.reserve_cpu_for_role(role_name)? else {
173            return Ok(());
174        };
175        pin_current_thread_to_cpu(cpu)
176    }
177}
178
/// Compute the 64-bit FNV-1a hash of `bytes`.
///
/// Starts from the FNV offset basis and, for each byte, XORs it in and then multiplies by the
/// FNV prime with wrapping arithmetic. An empty input hashes to the offset basis.
fn fnv1a_u64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
187
188fn physical_core_cpu_pool_best_effort(allowed: &[usize]) -> Vec<usize> {
189    #[cfg(target_os = "linux")]
190    {
191        linux_physical_core_cpu_pool_best_effort(allowed)
192    }
193    #[cfg(not(target_os = "linux"))]
194    {
195        let _ = allowed;
196        Vec::new()
197    }
198}
199
/// Linux implementation of the physical-core pool probe.
///
/// For each CPU in `allowed`, reads `physical_package_id` and `core_id` from
/// `/sys/devices/system/cpu/cpu<N>/topology` and keeps the lowest-numbered CPU seen for every
/// distinct `(package, core)` pair. CPUs whose topology files cannot be read or do not parse as
/// `u32` are skipped, so the result may be empty. The returned CPUs are sorted ascending.
#[cfg(target_os = "linux")]
fn linux_physical_core_cpu_pool_best_effort(allowed: &[usize]) -> Vec<usize> {
    use std::collections::BTreeMap;

    // Identify the physical core a logical CPU sits on; `None` when sysfs cannot tell us.
    let topology_key = |cpu: usize| -> Option<(u32, u32)> {
        let dir = format!("/sys/devices/system/cpu/cpu{cpu}/topology");
        let read_id = |file: &str| -> Option<u32> {
            let text = std::fs::read_to_string(format!("{dir}/{file}")).ok()?;
            text.trim().parse::<u32>().ok()
        };
        let package = read_id("physical_package_id")?;
        let core = read_id("core_id")?;
        Some((package, core))
    };

    let mut lowest_per_core: BTreeMap<(u32, u32), usize> = BTreeMap::new();
    for &cpu in allowed {
        if let Some(key) = topology_key(cpu) {
            let slot = lowest_per_core.entry(key).or_insert(cpu);
            *slot = (*slot).min(cpu);
        }
    }

    // The map iterates in (package, core) order, which need not be CPU-number order.
    let mut pool: Vec<usize> = lowest_per_core.into_values().collect();
    pool.sort_unstable();
    pool
}
231}