1use std::collections::{BTreeMap, BTreeSet};
4
5use crate::error::{platform_err, OpsError, Result};
6
/// Strategy for choosing which CPU a thread gets pinned to.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PinningPreset {
    /// Do not pin at all; the pinning entry points return without touching
    /// the thread.
    None,
    /// Pick from every CPU the current thread is allowed to run on.
    Balanced,
    /// Pick from a pool reduced to one logical CPU per physical core where
    /// that topology can be determined; otherwise behaves like `Balanced`.
    Physical,
}

impl PinningPreset {
    /// Every preset, in declaration order. `parse` searches this list so the
    /// textual names live in exactly one place (`as_str`).
    const ALL: [Self; 3] = [Self::None, Self::Balanced, Self::Physical];

    /// Parses the exact, case-sensitive preset name produced by [`as_str`].
    ///
    /// Returns `None` (the `Option`, not the preset) for any other input,
    /// including names with surrounding whitespace or different casing.
    ///
    /// [`as_str`]: PinningPreset::as_str
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }

    /// Returns the canonical lowercase name of this preset.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::None => "none",
            Self::Balanced => "balanced",
            Self::Physical => "physical",
        }
    }
}
38
39pub fn apply_pinning(preset: PinningPreset) -> Result<()> {
41 apply_pinning_for_role(preset, "")
42}
43
44pub fn apply_pinning_for_role(preset: PinningPreset, role_name: &str) -> Result<()> {
46 match preset {
47 PinningPreset::None => Ok(()),
48 PinningPreset::Balanced | PinningPreset::Physical => {
49 let allowed = indexbus_platform_ops::cpu::current_thread_allowed_cpus()
50 .map_err(|e| platform_err("pinning probe failed", e))?;
51 if allowed.is_empty() {
52 return Err(OpsError::Invalid {
53 what: "pinning probe returned empty allowed CPU set".to_string(),
54 });
55 }
56
57 let pool = match preset {
58 PinningPreset::Balanced => allowed.as_slice(),
59 PinningPreset::Physical => {
60 let reduced = physical_core_cpu_pool_best_effort(&allowed);
61 if reduced.is_empty() {
62 allowed.as_slice()
63 } else {
64 return apply_pinning_for_role_from_pool(role_name, &reduced);
65 }
66 }
67 PinningPreset::None => unreachable!(),
68 };
69
70 apply_pinning_for_role_from_pool(role_name, pool)
71 }
72 }
73}
74
75fn apply_pinning_for_role_from_pool(role_name: &str, pool: &[usize]) -> Result<()> {
76 if pool.is_empty() {
77 return Err(OpsError::Invalid {
78 what: "pinning pool is empty".to_string(),
79 });
80 }
81 let idx = (fnv1a_u64(role_name.as_bytes()) as usize) % pool.len();
82 let cpu = pool[idx];
83 indexbus_platform_ops::cpu::pin_current_thread_to_cpu(cpu)
84 .map_err(|e| platform_err("pinning failed", e))
85}
86
87pub fn pin_current_thread_to_cpu(cpu: usize) -> Result<()> {
89 indexbus_platform_ops::cpu::pin_current_thread_to_cpu(cpu)
90 .map_err(|e| platform_err("pinning failed", e))
91}
92
/// Stateful CPU assigner that gives each role name its own CPU and remembers
/// the reservation, so asking again for the same role returns the same CPU.
#[derive(Debug)]
pub struct PinningAssigner {
    /// Preset chosen at construction; with `PinningPreset::None` no CPU is
    /// ever reserved.
    preset: PinningPreset,
    /// Candidate CPU pool. Empty for `PinningPreset::None`; otherwise the
    /// probed allowed set, or its physical-core reduction for `Physical`.
    allowed: Vec<usize>,
    /// CPUs that have already been reserved for some role.
    used: BTreeSet<usize>,
    /// Role name -> CPU reserved for that role.
    assigned: BTreeMap<String, usize>,
}
101
102impl PinningAssigner {
103 pub fn new(preset: PinningPreset) -> Result<Self> {
105 match preset {
106 PinningPreset::None => Ok(Self {
107 preset,
108 allowed: Vec::new(),
109 used: BTreeSet::new(),
110 assigned: BTreeMap::new(),
111 }),
112 PinningPreset::Balanced | PinningPreset::Physical => {
113 let allowed = indexbus_platform_ops::cpu::current_thread_allowed_cpus()
114 .map_err(|e| platform_err("pinning probe failed", e))?;
115 if allowed.is_empty() {
116 return Err(OpsError::Invalid {
117 what: "pinning probe returned empty allowed CPU set".to_string(),
118 });
119 }
120
121 let allowed = if preset == PinningPreset::Physical {
122 let reduced = physical_core_cpu_pool_best_effort(&allowed);
123 if reduced.is_empty() {
124 allowed
125 } else {
126 reduced
127 }
128 } else {
129 allowed
130 };
131 Ok(Self {
132 preset,
133 allowed,
134 used: BTreeSet::new(),
135 assigned: BTreeMap::new(),
136 })
137 }
138 }
139 }
140
141 pub fn reserve_cpu_for_role(&mut self, role_name: &str) -> Result<Option<usize>> {
143 match self.preset {
144 PinningPreset::None => Ok(None),
145 PinningPreset::Balanced | PinningPreset::Physical => {
146 if let Some(&cpu) = self.assigned.get(role_name) {
147 return Ok(Some(cpu));
148 }
149
150 let start = (fnv1a_u64(role_name.as_bytes()) as usize) % self.allowed.len();
151 for offset in 0..self.allowed.len() {
152 let cpu = self.allowed[(start + offset) % self.allowed.len()];
153 if !self.used.contains(&cpu) {
154 self.used.insert(cpu);
155 self.assigned.insert(role_name.to_string(), cpu);
156 return Ok(Some(cpu));
157 }
158 }
159
160 Err(OpsError::Invalid {
161 what: format!(
162 "not enough CPUs for balanced pinning: role={role_name} allowed={:?} used={:?}",
163 self.allowed, self.used
164 ),
165 })
166 }
167 }
168 }
169
170 pub fn apply_for_role(&mut self, role_name: &str) -> Result<()> {
172 let Some(cpu) = self.reserve_cpu_for_role(role_name)? else {
173 return Ok(());
174 };
175 pin_current_thread_to_cpu(cpu)
176 }
177}
178
/// Computes the 64-bit FNV-1a hash of `bytes`.
///
/// Each byte is XORed into the running hash, which is then multiplied by the
/// FNV prime with wrapping arithmetic. An empty input returns the offset
/// basis unchanged.
fn fnv1a_u64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |hash, &byte| {
        (hash ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
187
188fn physical_core_cpu_pool_best_effort(allowed: &[usize]) -> Vec<usize> {
189 #[cfg(target_os = "linux")]
190 {
191 linux_physical_core_cpu_pool_best_effort(allowed)
192 }
193 #[cfg(not(target_os = "linux"))]
194 {
195 let _ = allowed;
196 Vec::new()
197 }
198}
199
/// Reduces `allowed` to one logical CPU per physical core using sysfs.
///
/// Each CPU is keyed by the `(physical_package_id, core_id)` pair read from
/// `/sys/devices/system/cpu/cpu<N>/topology/`. For every key the lowest CPU
/// number seen in `allowed` is kept. CPUs whose topology files cannot be read
/// or parsed are skipped, so the result may be empty. The returned CPUs are
/// sorted in ascending order.
#[cfg(target_os = "linux")]
fn linux_physical_core_cpu_pool_best_effort(allowed: &[usize]) -> Vec<usize> {
    use std::collections::BTreeMap;

    /// Reads one numeric topology attribute of `cpu`, or `None` on any
    /// I/O or parse failure.
    fn topology_id(cpu: usize, attribute: &str) -> Option<u32> {
        let path = format!("/sys/devices/system/cpu/cpu{cpu}/topology/{attribute}");
        let raw = std::fs::read_to_string(path).ok()?;
        raw.trim().parse::<u32>().ok()
    }

    /// Builds the `(package, core)` key; the core id is only read once the
    /// package id was obtained.
    fn physical_core_of(cpu: usize) -> Option<(u32, u32)> {
        let package = topology_id(cpu, "physical_package_id")?;
        let core = topology_id(cpu, "core_id")?;
        Some((package, core))
    }

    let mut lowest_per_core: BTreeMap<(u32, u32), usize> = BTreeMap::new();
    for &cpu in allowed {
        if let Some(key) = physical_core_of(cpu) {
            let representative = lowest_per_core.entry(key).or_insert(cpu);
            *representative = (*representative).min(cpu);
        }
    }

    // The map iterates in key order, not CPU order, so sort the survivors.
    let mut pool: Vec<usize> = lowest_per_core.into_values().collect();
    pool.sort_unstable();
    pool
}