1#[cfg(feature = "alloc")]
4use alloc::collections::BTreeMap;
5
6#[cfg(feature = "alloc")]
7use alloc::format;
8
9#[cfg(feature = "alloc")]
10use alloc::string::{String, ToString};
11
12#[cfg(feature = "alloc")]
13use alloc::vec::Vec;
14
15#[cfg(feature = "alloc")]
16use crate::{
17 canonicalize_v1, validate_v1, BridgeRoleCfgV1, EndpointCfgV1, EndpointKindV1, LaneCfgV1,
18 LaneGraphV1, LaneKindV1, MergePolicyV1, MergeRoleCfgV1, OnFullV1, OrderingV1, PipelineSpecV1,
19 RoleCfgV1, RouterRoleCfgV1, ShardKeyV1, SingleRingProducerV1, SingleRingSchedulingV1,
20 SingleRingV1, SpecError, StageOpV1, StageRoleCfgV1, StageV1,
21};
22
#[cfg(feature = "alloc")]
/// Renders a pipeline spec as version-1 `key=value` text, one entry per line.
///
/// The spec is passed through `canonicalize_v1` before anything is written.
/// The output always begins with `v=1` and a `model=` line, followed by the
/// keys of the selected model. Every line, including the last one, ends with
/// `\n`.
pub fn encode_kv_v1(spec: &PipelineSpecV1) -> String {
    // Appends `key=value\n`.
    fn put(buf: &mut String, key: &str, value: &str) {
        buf.push_str(key);
        buf.push('=');
        buf.push_str(value);
        buf.push('\n');
    }

    // Appends `<scope><field>=value\n`; `scope` already ends with a dot.
    fn put_in(buf: &mut String, scope: &str, field: &str, value: &str) {
        buf.push_str(scope);
        put(buf, field, value);
    }

    let canonical = canonicalize_v1(spec);
    let mut out = String::new();
    put(&mut out, "v", "1");

    match &canonical {
        PipelineSpecV1::LaneGraph(graph) => {
            put(&mut out, "model", "lane_graph");

            let on_full = match graph.on_full {
                OnFullV1::Block => "block",
                OnFullV1::Drop => "drop",
            };
            put(&mut out, "lane_graph.on_full", on_full);

            // Endpoints: a length line, then name/kind/lane per entry.
            put(
                &mut out,
                "lane_graph.endpoints.len",
                &graph.endpoints.len().to_string(),
            );
            for (idx, endpoint) in graph.endpoints.iter().enumerate() {
                let scope = format!("lane_graph.endpoints.{idx}.");
                let kind = match endpoint.kind {
                    EndpointKindV1::Ingress => "ingress",
                    EndpointKindV1::Egress => "egress",
                };
                put_in(&mut out, &scope, "name", &endpoint.name);
                put_in(&mut out, &scope, "kind", kind);
                put_in(&mut out, &scope, "lane", &endpoint.lane);
            }

            // Lanes: the kind line comes first, then any kind-specific fields.
            put(
                &mut out,
                "lane_graph.lanes.len",
                &graph.lanes.len().to_string(),
            );
            for (idx, lane) in graph.lanes.iter().enumerate() {
                let scope = format!("lane_graph.lanes.{idx}.");
                put_in(&mut out, &scope, "name", &lane.name);
                match lane.kind {
                    LaneKindV1::Events => put_in(&mut out, &scope, "kind", "events"),
                    LaneKindV1::Journal => put_in(&mut out, &scope, "kind", "journal"),
                    LaneKindV1::SequencedSlots { capacity, gating } => {
                        put_in(&mut out, &scope, "kind", "sequenced_slots");
                        put_in(&mut out, &scope, "capacity", &capacity.to_string());
                        put_in(&mut out, &scope, "gating", &gating.to_string());
                    }
                    LaneKindV1::FanoutBroadcast { consumers } => {
                        put_in(&mut out, &scope, "kind", "fanout_broadcast");
                        put_in(&mut out, &scope, "consumers", &consumers.to_string());
                    }
                }
            }

            // Roles: each variant writes `kind` and `name` first, then its own fields.
            put(
                &mut out,
                "lane_graph.roles.len",
                &graph.roles.len().to_string(),
            );
            for (idx, role) in graph.roles.iter().enumerate() {
                let scope = format!("lane_graph.roles.{idx}.");
                match role {
                    RoleCfgV1::Stage(stage_cfg) => {
                        put_in(&mut out, &scope, "kind", "stage");
                        put_in(&mut out, &scope, "name", &stage_cfg.name);
                        put_in(&mut out, &scope, "stage", &stage_cfg.stage);
                        put_in(&mut out, &scope, "rx", &stage_cfg.rx);
                        put_in(&mut out, &scope, "tx", &stage_cfg.tx);
                    }
                    RoleCfgV1::Bridge(bridge_cfg) => {
                        put_in(&mut out, &scope, "kind", "bridge");
                        put_in(&mut out, &scope, "name", &bridge_cfg.name);
                        put_in(&mut out, &scope, "rx", &bridge_cfg.rx);
                        put_in(&mut out, &scope, "tx", &bridge_cfg.tx);
                    }
                    RoleCfgV1::Router(router_cfg) => {
                        put_in(&mut out, &scope, "kind", "router");
                        put_in(&mut out, &scope, "name", &router_cfg.name);
                        put_in(&mut out, &scope, "rx", &router_cfg.rx);
                        put_in(&mut out, &scope, "tx.len", &router_cfg.tx.len().to_string());
                        for (slot, target) in router_cfg.tx.iter().enumerate() {
                            put_in(&mut out, &scope, &format!("tx.{slot}"), target);
                        }
                    }
                    RoleCfgV1::Merge(merge_cfg) => {
                        let policy = match merge_cfg.policy {
                            MergePolicyV1::RoundRobin => "round_robin",
                        };
                        put_in(&mut out, &scope, "kind", "merge");
                        put_in(&mut out, &scope, "name", &merge_cfg.name);
                        put_in(&mut out, &scope, "policy", policy);
                        put_in(&mut out, &scope, "tx", &merge_cfg.tx);
                        put_in(&mut out, &scope, "rx.len", &merge_cfg.rx.len().to_string());
                        for (slot, source) in merge_cfg.rx.iter().enumerate() {
                            put_in(&mut out, &scope, &format!("rx.{slot}"), source);
                        }
                    }
                }
            }
        }
        PipelineSpecV1::SingleRing(ring) => {
            put(&mut out, "model", "single_ring");
            put(&mut out, "single_ring.shards", &ring.shards.to_string());

            let ordering = match ring.ordering {
                OrderingV1::Strict => "strict",
                OrderingV1::PerKey => "per_key",
                OrderingV1::Unordered => "unordered",
            };
            put(&mut out, "single_ring.ordering", ordering);

            let producer = match ring.producer {
                SingleRingProducerV1::Single => "single",
                SingleRingProducerV1::Mpmc => "mpmc",
            };
            put(&mut out, "single_ring.producer", producer);

            let scheduling = match ring.scheduling {
                SingleRingSchedulingV1::Dedicated => "dedicated",
                SingleRingSchedulingV1::WorkQueue => "work_queue",
            };
            put(&mut out, "single_ring.scheduling", scheduling);

            let shard_key = match ring.shard_key {
                ShardKeyV1::FirstByte => "first_byte",
            };
            put(&mut out, "single_ring.shard_key", shard_key);

            // `single_ring.stages` holds the stage count; the stages themselves
            // live under the top-level `stage.<i>.` scope.
            put(
                &mut out,
                "single_ring.stages",
                &ring.stages.len().to_string(),
            );
            for (idx, stage_cfg) in ring.stages.iter().enumerate() {
                let scope = format!("stage.{idx}.");
                match &stage_cfg.op {
                    StageOpV1::Identity => put_in(&mut out, &scope, "op", "identity"),
                    StageOpV1::AddU8 { delta } => {
                        put_in(&mut out, &scope, "op", "add_u8");
                        put_in(&mut out, &scope, "delta", &delta.to_string());
                    }
                    #[cfg(feature = "alloc")]
                    StageOpV1::ResolverKey { stage } => {
                        put_in(&mut out, &scope, "op", "resolver_key");
                        put_in(&mut out, &scope, "stage", stage);
                    }
                }

                // Dependencies are comma-separated; an empty list gives an empty value.
                let deps = stage_cfg
                    .depends_on
                    .iter()
                    .map(|dep| dep.to_string())
                    .collect::<Vec<_>>()
                    .join(",");
                put_in(&mut out, &scope, "depends_on", &deps);
            }
        }
    }

    out
}
291
#[cfg(feature = "alloc")]
/// Parses version-1 `key=value` text into a validated pipeline spec.
///
/// Each line is trimmed; blank lines and lines starting with `#` are skipped.
/// Every remaining line is split at its first `=`. If a key appears more than
/// once, the last occurrence wins.
///
/// Defaults applied when a key is absent:
/// - `v`: not required, but must equal `1` if present.
/// - `model`: `lane_graph`.
/// - `single_ring.shards`: `1`.
/// - `single_ring.ordering`: `per_key` when `shards > 1`, otherwise `strict`.
/// - `single_ring.producer`: `single`.
/// - `single_ring.scheduling`: `dedicated`.
/// - `single_ring.shard_key`: `first_byte`.
/// - `stage.<i>.depends_on`: empty list.
///
/// # Errors
///
/// Returns a `SpecError` when a line has no `=`, a key is empty, the version
/// or model is unsupported, a required key is missing, a value fails to parse,
/// or `validate_v1` rejects the decoded spec.
pub fn decode_kv_v1(input: &str) -> Result<PipelineSpecV1, SpecError> {
    let mut map: BTreeMap<String, String> = BTreeMap::new();

    for raw_line in input.lines() {
        let line = raw_line.trim();
        // Blank lines and `#` comments carry no data.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        let Some((k, v)) = line.split_once('=') else {
            return Err(SpecError::new("invalid kv line"));
        };
        if k.is_empty() {
            return Err(SpecError::new("invalid kv key"));
        }
        // A repeated key replaces the earlier value.
        map.insert(k.to_string(), v.to_string());
    }

    if let Some(v) = map.get("v") {
        if v != "1" {
            return Err(SpecError::new("unsupported spec version"));
        }
    }

    let model = match map.get("model").map(|s| s.as_str()) {
        Some("lane_graph") | None => "lane_graph",
        Some("single_ring") => "single_ring",
        _ => return Err(SpecError::new("unknown execution model")),
    };

    let spec = if model == "lane_graph" {
        let lg = decode_lane_graph_v1(&map)?;
        PipelineSpecV1::LaneGraph(lg)
    } else {
        let shards: u32 = map
            .get("single_ring.shards")
            .map(|s| s.as_str())
            .unwrap_or("1")
            .parse()
            .map_err(|_| SpecError::new("invalid single_ring.shards"))?;

        let ordering = match map.get("single_ring.ordering").map(|s| s.as_str()) {
            // With no explicit ordering the default follows the shard count.
            None => {
                if shards > 1 {
                    OrderingV1::PerKey
                } else {
                    OrderingV1::Strict
                }
            }
            Some("strict") => OrderingV1::Strict,
            Some("per_key") => OrderingV1::PerKey,
            Some("unordered") => OrderingV1::Unordered,
            Some(_) => return Err(SpecError::new("unknown single_ring.ordering")),
        };

        let producer = match map.get("single_ring.producer").map(|s| s.as_str()) {
            None => SingleRingProducerV1::Single,
            Some("single") => SingleRingProducerV1::Single,
            Some("mpmc") => SingleRingProducerV1::Mpmc,
            Some(_) => return Err(SpecError::new("unknown single_ring.producer")),
        };

        let scheduling = match map.get("single_ring.scheduling").map(|s| s.as_str()) {
            None => SingleRingSchedulingV1::Dedicated,
            Some("dedicated") => SingleRingSchedulingV1::Dedicated,
            Some("work_queue") => SingleRingSchedulingV1::WorkQueue,
            Some(_) => return Err(SpecError::new("unknown single_ring.scheduling")),
        };

        let shard_key = match map.get("single_ring.shard_key").map(|s| s.as_str()) {
            None => ShardKeyV1::FirstByte,
            Some("first_byte") => ShardKeyV1::FirstByte,
            Some(_) => return Err(SpecError::new("unknown single_ring.shard_key")),
        };

        let n: usize = map
            .get("single_ring.stages")
            .ok_or_else(|| SpecError::new("missing single_ring.stages"))?
            .parse()
            .map_err(|_| SpecError::new("invalid stage count"))?;

        // `n` comes straight from the input, so it must not drive the
        // allocation unchecked: a huge count would panic or abort inside
        // `with_capacity`. Every stage needs its own `stage.<i>.op` key, so a
        // decodable input never has more stages than the map has entries.
        let mut stages = Vec::with_capacity(n.min(map.len()));
        for i in 0..n {
            let op_key = format!("stage.{i}.op");
            let op_val = map
                .get(&op_key)
                .ok_or_else(|| SpecError::new("missing stage op"))?;
            let op = match op_val.as_str() {
                "identity" => StageOpV1::Identity,
                "add_u8" => {
                    let delta_key = format!("stage.{i}.delta");
                    let delta: u8 = map
                        .get(&delta_key)
                        .ok_or_else(|| SpecError::new("missing stage delta"))?
                        .parse()
                        .map_err(|_| SpecError::new("invalid stage delta"))?;
                    StageOpV1::AddU8 { delta }
                }
                // `resolver` is accepted as an alternative spelling of `resolver_key`.
                #[cfg(feature = "alloc")]
                "resolver" | "resolver_key" => {
                    let stage_key = format!("stage.{i}.stage");
                    let stage = map
                        .get(&stage_key)
                        .ok_or_else(|| SpecError::new("missing stage identity"))?
                        .to_string();
                    StageOpV1::ResolverKey { stage }
                }
                _ => return Err(SpecError::new("unknown stage op")),
            };

            // A missing or blank `depends_on` means "no dependencies".
            let dep_key = format!("stage.{i}.depends_on");
            let dep_val = map.get(&dep_key).map(|s| s.as_str()).unwrap_or("");
            let mut depends_on = Vec::new();
            let dep_val = dep_val.trim();
            if !dep_val.is_empty() {
                for part in dep_val.split(',') {
                    let d: u32 = part
                        .parse()
                        .map_err(|_| SpecError::new("invalid stage dependency"))?;
                    depends_on.push(d);
                }
            }

            stages.push(StageV1 { op, depends_on });
        }

        PipelineSpecV1::SingleRing(SingleRingV1 {
            shards,
            ordering,
            producer,
            scheduling,
            shard_key,
            stages,
        })
    };

    validate_v1(&spec)?;
    Ok(spec)
}
435
#[cfg(feature = "alloc")]
/// Builds a `LaneGraphV1` from an already-parsed key/value map.
///
/// Reads `lane_graph.on_full`, then the `endpoints`, `lanes` and `roles`
/// lists. Each list is stored as a `<list>.len` count plus indexed
/// `<list>.<i>.<field>` keys.
///
/// # Errors
///
/// Returns a `SpecError` when a required key is missing, an integer fails to
/// parse, or an enum-like value (on-full policy, endpoint kind, lane kind,
/// role kind, merge policy) is not recognised.
fn decode_lane_graph_v1(map: &BTreeMap<String, String>) -> Result<LaneGraphV1, SpecError> {
    // Looks up `k` and parses it as `u32`.
    fn take_u32(map: &BTreeMap<String, String>, k: &str) -> Result<u32, SpecError> {
        map.get(k)
            .ok_or_else(|| SpecError::new("missing lane_graph key"))?
            .parse()
            .map_err(|_| SpecError::new("invalid lane_graph integer"))
    }
    // Looks up `k` and parses it as `usize`.
    fn take_usize(map: &BTreeMap<String, String>, k: &str) -> Result<usize, SpecError> {
        map.get(k)
            .ok_or_else(|| SpecError::new("missing lane_graph key"))?
            .parse()
            .map_err(|_| SpecError::new("invalid lane_graph integer"))
    }
    // Looks up `k` and borrows its value.
    fn take_str<'a>(map: &'a BTreeMap<String, String>, k: &str) -> Result<&'a str, SpecError> {
        map.get(k)
            .map(|s| s.as_str())
            .ok_or_else(|| SpecError::new("missing lane_graph key"))
    }

    // Every `*.len` value below comes from the input. Each list element needs
    // at least one key of its own, so a decodable input never declares more
    // elements than the map has entries. Capping the reservation at that bound
    // stops a bogus count from panicking or aborting inside `with_capacity`;
    // the per-element lookups still report the missing key as an error.
    let alloc_cap = map.len();

    let on_full = match map.get("lane_graph.on_full").map(|s| s.as_str()) {
        Some("block") => OnFullV1::Block,
        Some("drop") => OnFullV1::Drop,
        Some(_) => return Err(SpecError::new("unknown lane_graph.on_full")),
        None => return Err(SpecError::new("missing lane_graph config")),
    };

    let endpoints_len = take_usize(map, "lane_graph.endpoints.len")?;
    let mut endpoints = Vec::with_capacity(endpoints_len.min(alloc_cap));
    for i in 0..endpoints_len {
        let p = format!("lane_graph.endpoints.{i}.");
        let name = take_str(map, &format!("{p}name"))?.to_string();
        let kind = match take_str(map, &format!("{p}kind"))? {
            "ingress" => EndpointKindV1::Ingress,
            "egress" => EndpointKindV1::Egress,
            _ => return Err(SpecError::new("unknown endpoint kind")),
        };
        let lane = take_str(map, &format!("{p}lane"))?.to_string();
        endpoints.push(EndpointCfgV1 { name, kind, lane });
    }

    let lanes_len = take_usize(map, "lane_graph.lanes.len")?;
    let mut lanes = Vec::with_capacity(lanes_len.min(alloc_cap));
    for i in 0..lanes_len {
        let p = format!("lane_graph.lanes.{i}.");
        let name = take_str(map, &format!("{p}name"))?.to_string();
        let kind_s = take_str(map, &format!("{p}kind"))?;
        let kind = match kind_s {
            "events" => LaneKindV1::Events,
            "journal" => LaneKindV1::Journal,
            "sequenced_slots" => {
                let capacity = take_u32(map, &format!("{p}capacity"))?;
                let gating = take_u32(map, &format!("{p}gating"))?;
                LaneKindV1::SequencedSlots { capacity, gating }
            }
            "fanout_broadcast" => {
                let consumers = take_u32(map, &format!("{p}consumers"))?;
                LaneKindV1::FanoutBroadcast { consumers }
            }
            _ => return Err(SpecError::new("unknown lane kind")),
        };
        lanes.push(LaneCfgV1 { name, kind });
    }

    let roles_len = take_usize(map, "lane_graph.roles.len")?;
    let mut roles = Vec::with_capacity(roles_len.min(alloc_cap));
    for i in 0..roles_len {
        let p = format!("lane_graph.roles.{i}.");
        // `kind` and `name` are read for every role before the kind is matched.
        let kind_s = take_str(map, &format!("{p}kind"))?;
        let name = take_str(map, &format!("{p}name"))?.to_string();

        let role = match kind_s {
            "stage" => {
                let stage = take_str(map, &format!("{p}stage"))?.to_string();
                let rx = take_str(map, &format!("{p}rx"))?.to_string();
                let tx = take_str(map, &format!("{p}tx"))?.to_string();
                RoleCfgV1::Stage(StageRoleCfgV1 {
                    name,
                    stage,
                    rx,
                    tx,
                })
            }
            "bridge" => {
                let rx = take_str(map, &format!("{p}rx"))?.to_string();
                let tx = take_str(map, &format!("{p}tx"))?.to_string();
                RoleCfgV1::Bridge(BridgeRoleCfgV1 { name, rx, tx })
            }
            "router" => {
                let rx = take_str(map, &format!("{p}rx"))?.to_string();
                let tx_len = take_usize(map, &format!("{p}tx.len"))?;
                let mut tx = Vec::with_capacity(tx_len.min(alloc_cap));
                for j in 0..tx_len {
                    tx.push(take_str(map, &format!("{p}tx.{j}"))?.to_string());
                }
                RoleCfgV1::Router(RouterRoleCfgV1 { name, rx, tx })
            }
            "merge" => {
                let policy = match take_str(map, &format!("{p}policy"))? {
                    "round_robin" => MergePolicyV1::RoundRobin,
                    _ => return Err(SpecError::new("unknown merge policy")),
                };
                let tx = take_str(map, &format!("{p}tx"))?.to_string();
                let rx_len = take_usize(map, &format!("{p}rx.len"))?;
                let mut rx = Vec::with_capacity(rx_len.min(alloc_cap));
                for j in 0..rx_len {
                    rx.push(take_str(map, &format!("{p}rx.{j}"))?.to_string());
                }
                RoleCfgV1::Merge(MergeRoleCfgV1 {
                    name,
                    rx,
                    tx,
                    policy,
                })
            }
            _ => return Err(SpecError::new("unknown role kind")),
        };
        roles.push(role);
    }

    Ok(LaneGraphV1 {
        endpoints,
        lanes,
        roles,
        on_full,
    })
}
563